common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which tables
22//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two "unified"
//! managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain all the managers defined above; it's recommended to use only these two.
75//!
//! The whole picture of flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{OptionExt, ResultExt, ensure};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::DatanodeId;
155use crate::error::{self, Result, SerdeJsonSnafu};
156use crate::key::flow::flow_state::FlowStateValue;
157use crate::key::node_address::NodeAddressValue;
158use crate::key::table_route::TableRouteKey;
159use crate::key::topic_region::TopicRegionValue;
160use crate::key::txn_helper::TxnOpGetResponseSet;
161use crate::kv_backend::KvBackendRef;
162use crate::kv_backend::txn::{Txn, TxnOp};
163use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
164use crate::rpc::store::BatchDeleteRequest;
165use crate::state_store::PoisonValue;
166
/// Regex fragment for a catalog, schema or table name, as embedded in the name key patterns.
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
/// Regex fragment for a topic name; unlike [NAME_PATTERN], the first character may be a digit.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy location of the maintenance switch (superseded by [MAINTENANCE_KEY], judging by name).
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Switch key for maintenance mode.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Switch key for pausing procedures.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Switch key for recovery mode.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";
173
/// Prefix of `__dn_table/{datanode_id}/{table_id}` keys.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of `__table_info/{table_id}` keys.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of `__view_info/{view_id}` keys.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of `__table_name/{catalog}/{schema}/{table}` keys.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of `__catalog_name/{catalog}` keys.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of `__schema_name/{catalog}/{schema}` keys.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of `__table_route/{table_id}` keys.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of `__topic_name/kafka/{topic_name}` keys.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Legacy prefix under which previous versions stored the names of created Kafka WAL topics.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of `__topic_region/{topic_name}/{region_id}` keys.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
186
/// Key used for the metasrv leader election.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root key (note the trailing `/`) under which metasrv election candidates are registered.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
191
192/// The keys with these prefixes will be loaded into the cache when the leader starts.
193pub const CACHE_KEY_PREFIXES: [&str; 5] = [
194    TABLE_NAME_KEY_PREFIX,
195    CATALOG_NAME_KEY_PREFIX,
196    SCHEMA_NAME_KEY_PREFIX,
197    TABLE_ROUTE_PREFIX,
198    NODE_ADDRESS_PREFIX,
199];
200
201/// A set of regions with the same role.
202#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
203pub struct RegionRoleSet {
204    /// Leader regions.
205    pub leader_regions: Vec<RegionNumber>,
206    /// Follower regions.
207    pub follower_regions: Vec<RegionNumber>,
208}
209
210impl<'de> Deserialize<'de> for RegionRoleSet {
211    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
212    where
213        D: serde::Deserializer<'de>,
214    {
215        #[derive(Deserialize)]
216        #[serde(untagged)]
217        enum RegionRoleSetOrLeaderOnly {
218            Full {
219                leader_regions: Vec<RegionNumber>,
220                follower_regions: Vec<RegionNumber>,
221            },
222            LeaderOnly(Vec<RegionNumber>),
223        }
224        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
225            RegionRoleSetOrLeaderOnly::Full {
226                leader_regions,
227                follower_regions,
228            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
229            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
230                Ok(RegionRoleSet::new(leader_regions, vec![]))
231            }
232        }
233    }
234}
235
236impl RegionRoleSet {
237    /// Create a new region role set.
238    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
239        Self {
240            leader_regions,
241            follower_regions,
242        }
243    }
244
245    /// Add a leader region to the set.
246    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
247        self.leader_regions.push(region_number);
248    }
249
250    /// Add a follower region to the set.
251    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
252        self.follower_regions.push(region_number);
253    }
254
255    /// Sort the regions.
256    pub fn sort(&mut self) {
257        self.follower_regions.sort();
258        self.leader_regions.sort();
259    }
260}
261
262/// The distribution of regions.
263///
264/// The key is the datanode id, the value is the region role set.
265pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
266
/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a partition within a flow.
pub type FlowPartitionId = u32;
271
272lazy_static! {
273    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
274}
275
276lazy_static! {
277    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
278}
279
280lazy_static! {
281    static ref TABLE_INFO_KEY_PATTERN: Regex =
282        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
283}
284
285lazy_static! {
286    static ref VIEW_INFO_KEY_PATTERN: Regex =
287        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
288}
289
290lazy_static! {
291    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
292        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
293}
294
295lazy_static! {
296    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
297        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
298}
299
300lazy_static! {
301    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
302        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
303    ))
304    .unwrap();
305}
306
307lazy_static! {
308    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
309    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
310        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
311    ))
312        .unwrap();
313}
314
315lazy_static! {
316    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
317    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
318        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
319    ))
320        .unwrap();
321}
322
323lazy_static! {
324    static ref NODE_ADDRESS_PATTERN: Regex =
325        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
326}
327
328lazy_static! {
329    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
330        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
331}
332
333lazy_static! {
334    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
335        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
336    ))
337    .unwrap();
338}
339
340/// The key of metadata.
341pub trait MetadataKey<'a, T> {
342    fn to_bytes(&self) -> Vec<u8>;
343
344    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
345}
346
/// A metadata key kept as opaque raw bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `bytes` without copying.
    fn from(bytes: Vec<u8>) -> Self {
        BytesAdapter(bytes)
    }
}
355
356impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
357    fn to_bytes(&self) -> Vec<u8> {
358        self.0.clone()
359    }
360
361    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
362        Ok(BytesAdapter(bytes.to_vec()))
363    }
364}
365
366pub(crate) trait MetadataKeyGetTxnOp {
367    fn build_get_op(
368        &self,
369    ) -> (
370        TxnOp,
371        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
372    );
373}
374
375pub trait MetadataValue {
376    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
377    where
378        Self: Sized;
379
380    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
381}
382
383pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
384
385pub struct TableMetadataManager {
386    table_name_manager: TableNameManager,
387    table_info_manager: TableInfoManager,
388    view_info_manager: ViewInfoManager,
389    datanode_table_manager: DatanodeTableManager,
390    catalog_manager: CatalogManager,
391    schema_manager: SchemaManager,
392    table_route_manager: TableRouteManager,
393    tombstone_manager: TombstoneManager,
394    topic_name_manager: TopicNameManager,
395    topic_region_manager: TopicRegionManager,
396    kv_backend: KvBackendRef,
397}
398
/// Returns early with an `error::UnexpectedSnafu` error when `$got != $expected_value`.
///
/// `$name` describes the operation being checked; it is embedded in the error message together
/// with the `Debug` output of both values, so both operands must implement `PartialEq`/`Debug`.
///
/// NOTE(review): the expansion uses `ensure!` and `error::UnexpectedSnafu` unqualified, so both
/// must be in scope at every call site (this file imports `snafu::ensure` and `crate::error`).
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
413
414/// A struct containing a deserialized value(`inner`) and an original bytes.
415///
416/// - Serialize behaviors:
417///
418/// The `inner` field will be ignored.
419///
420/// - Deserialize behaviors:
421///
422/// The `inner` field will be deserialized from the `bytes` field.
423pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
424    // The original bytes of the inner.
425    bytes: Bytes,
426    // The value was deserialized from the original bytes.
427    inner: T,
428}
429
430impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
431    type Target = T;
432
433    fn deref(&self) -> &Self::Target {
434        &self.inner
435    }
436}
437
438impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
439    fn deref_mut(&mut self) -> &mut Self::Target {
440        &mut self.inner
441    }
442}
443
444impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        write!(
447            f,
448            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
449            self.inner, self.bytes
450        )
451    }
452}
453
454impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
455    /// - Serialize behaviors:
456    ///
457    /// The `inner` field will be ignored.
458    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
459    where
460        S: serde::Serializer,
461    {
462        // Safety: The original bytes are always JSON encoded.
463        // It's more efficiently than `serialize_bytes`.
464        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
465    }
466}
467
468impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
469    for DeserializedValueWithBytes<T>
470{
471    /// - Deserialize behaviors:
472    ///
473    /// The `inner` field will be deserialized from the `bytes` field.
474    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
475    where
476        D: serde::Deserializer<'de>,
477    {
478        let buf = String::deserialize(deserializer)?;
479        let bytes = Bytes::from(buf);
480
481        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
482            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
483
484        Ok(value)
485    }
486}
487
488impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
489    fn clone(&self) -> Self {
490        Self {
491            bytes: self.bytes.clone(),
492            inner: self.inner.clone(),
493        }
494    }
495}
496
497impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
498    /// Returns a struct containing a deserialized value and an original `bytes`.
499    /// It accepts original bytes of inner.
500    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
501        let inner = T::try_from_raw_value(&bytes)?;
502        Ok(Self { bytes, inner })
503    }
504
505    /// Returns a struct containing a deserialized value and an original `bytes`.
506    /// It accepts original bytes of inner.
507    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
508        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
509    }
510
511    pub fn into_inner(self) -> T {
512        self.inner
513    }
514
515    pub fn get_inner_ref(&self) -> &T {
516        &self.inner
517    }
518
519    /// Returns original `bytes`
520    pub fn get_raw_bytes(&self) -> Vec<u8> {
521        self.bytes.to_vec()
522    }
523
524    #[cfg(any(test, feature = "testing"))]
525    pub fn from_inner(inner: T) -> Self {
526        let bytes = serde_json::to_vec(&inner).unwrap();
527
528        Self {
529            bytes: Bytes::from(bytes),
530            inner,
531        }
532    }
533}
534
535impl TableMetadataManager {
536    pub fn new(kv_backend: KvBackendRef) -> Self {
537        TableMetadataManager {
538            table_name_manager: TableNameManager::new(kv_backend.clone()),
539            table_info_manager: TableInfoManager::new(kv_backend.clone()),
540            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
541            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
542            catalog_manager: CatalogManager::new(kv_backend.clone()),
543            schema_manager: SchemaManager::new(kv_backend.clone()),
544            table_route_manager: TableRouteManager::new(kv_backend.clone()),
545            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
546            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
547            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
548            kv_backend,
549        }
550    }
551
552    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
553    pub fn new_with_custom_tombstone_prefix(
554        kv_backend: KvBackendRef,
555        tombstone_prefix: &str,
556    ) -> Self {
557        Self {
558            table_name_manager: TableNameManager::new(kv_backend.clone()),
559            table_info_manager: TableInfoManager::new(kv_backend.clone()),
560            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
561            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
562            catalog_manager: CatalogManager::new(kv_backend.clone()),
563            schema_manager: SchemaManager::new(kv_backend.clone()),
564            table_route_manager: TableRouteManager::new(kv_backend.clone()),
565            tombstone_manager: TombstoneManager::new_with_prefix(
566                kv_backend.clone(),
567                tombstone_prefix,
568            ),
569            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
570            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
571            kv_backend,
572        }
573    }
574
575    pub async fn init(&self) -> Result<()> {
576        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
577
578        self.catalog_manager().create(catalog_name, true).await?;
579
580        let internal_schemas = [
581            DEFAULT_SCHEMA_NAME,
582            INFORMATION_SCHEMA_NAME,
583            DEFAULT_PRIVATE_SCHEMA_NAME,
584        ];
585
586        for schema_name in internal_schemas {
587            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
588
589            self.schema_manager().create(schema_key, None, true).await?;
590        }
591
592        Ok(())
593    }
594
595    pub fn table_name_manager(&self) -> &TableNameManager {
596        &self.table_name_manager
597    }
598
599    pub fn table_info_manager(&self) -> &TableInfoManager {
600        &self.table_info_manager
601    }
602
603    pub fn view_info_manager(&self) -> &ViewInfoManager {
604        &self.view_info_manager
605    }
606
607    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
608        &self.datanode_table_manager
609    }
610
611    pub fn catalog_manager(&self) -> &CatalogManager {
612        &self.catalog_manager
613    }
614
615    pub fn schema_manager(&self) -> &SchemaManager {
616        &self.schema_manager
617    }
618
619    pub fn table_route_manager(&self) -> &TableRouteManager {
620        &self.table_route_manager
621    }
622
623    pub fn topic_name_manager(&self) -> &TopicNameManager {
624        &self.topic_name_manager
625    }
626
627    pub fn topic_region_manager(&self) -> &TopicRegionManager {
628        &self.topic_region_manager
629    }
630
631    pub fn kv_backend(&self) -> &KvBackendRef {
632        &self.kv_backend
633    }
634
635    pub async fn get_full_table_info(
636        &self,
637        table_id: TableId,
638    ) -> Result<(
639        Option<DeserializedValueWithBytes<TableInfoValue>>,
640        Option<DeserializedValueWithBytes<TableRouteValue>>,
641    )> {
642        let table_info_key = TableInfoKey::new(table_id);
643        let table_route_key = TableRouteKey::new(table_id);
644        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
645        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
646
647        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
648        let mut res = self.kv_backend.txn(txn).await?;
649        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
650        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
651        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
652        Ok((table_info_value, table_route_value))
653    }
654
655    /// Creates metadata for view and returns an error if different metadata exists.
656    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
657    /// Parameters include:
658    /// - `view_info`: the encoded logical plan
659    /// - `table_names`: the resolved fully table names in logical plan
660    /// - `columns`: the view columns
661    /// - `plan_columns`: the original plan columns
662    /// - `definition`: The SQL to create the view
663    ///
664    pub async fn create_view_metadata(
665        &self,
666        view_info: RawTableInfo,
667        raw_logical_plan: Vec<u8>,
668        table_names: HashSet<TableName>,
669        columns: Vec<String>,
670        plan_columns: Vec<String>,
671        definition: String,
672    ) -> Result<()> {
673        let view_id = view_info.ident.table_id;
674
675        // Creates view name.
676        let view_name = TableNameKey::new(
677            &view_info.catalog_name,
678            &view_info.schema_name,
679            &view_info.name,
680        );
681        let create_table_name_txn = self
682            .table_name_manager()
683            .build_create_txn(&view_name, view_id)?;
684
685        // Creates table info.
686        let table_info_value = TableInfoValue::new(view_info);
687
688        let (create_table_info_txn, on_create_table_info_failure) = self
689            .table_info_manager()
690            .build_create_txn(view_id, &table_info_value)?;
691
692        // Creates view info
693        let view_info_value = ViewInfoValue::new(
694            raw_logical_plan,
695            table_names,
696            columns,
697            plan_columns,
698            definition,
699        );
700        let (create_view_info_txn, on_create_view_info_failure) = self
701            .view_info_manager()
702            .build_create_txn(view_id, &view_info_value)?;
703
704        let txn = Txn::merge_all(vec![
705            create_table_name_txn,
706            create_table_info_txn,
707            create_view_info_txn,
708        ]);
709
710        let mut r = self.kv_backend.txn(txn).await?;
711
712        // Checks whether metadata was already created.
713        if !r.succeeded {
714            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
715            let remote_table_info = on_create_table_info_failure(&mut set)?
716                .context(error::UnexpectedSnafu {
717                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
718                })?
719                .into_inner();
720
721            let remote_view_info = on_create_view_info_failure(&mut set)?
722                .context(error::UnexpectedSnafu {
723                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
724                })?
725                .into_inner();
726
727            let op_name = "the creating view metadata";
728            ensure_values!(remote_table_info, table_info_value, op_name);
729            ensure_values!(remote_view_info, view_info_value, op_name);
730        }
731
732        Ok(())
733    }
734
    /// Creates metadata for table and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// All of the following are merged into a single transaction:
    /// - the table name key;
    /// - the table info, with `meta.region_numbers` overwritten by the route's region numbers;
    /// - the table route;
    /// - the topic-region entries built from `region_wal_options`;
    /// - for `TableRouteValue::Physical` routes only, the datanode table entries built from the
    ///   route's region distribution.
    ///
    /// If the transaction does not succeed, the stored table info and table route are read back
    /// and compared with the ones being created; `Ok(())` is returned only when both match.
    ///
    /// `region_wal_options` maps region numbers to their WAL options encoded as strings (the
    /// encoding is not visible here — TODO confirm).
    pub async fn create_table_metadata(
        &self,
        mut table_info: RawTableInfo,
        table_route_value: TableRouteValue,
        region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        // Keep the table info's region numbers consistent with the route being stored.
        let region_numbers = table_route_value.region_numbers();
        table_info.meta.region_numbers = region_numbers;
        let table_id = table_info.ident.table_id;
        // Cloned before `table_info` is moved into `TableInfoValue` below.
        let engine = table_info.meta.engine.clone();

        // Creates table name.
        let table_name = TableNameKey::new(
            &table_info.catalog_name,
            &table_info.schema_name,
            &table_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&table_name, table_id)?;

        // Must be computed before `table_info` is consumed by `TableInfoValue::new`.
        let region_options = table_info.to_region_options();
        // Creates table info.
        let table_info_value = TableInfoValue::new(table_info);
        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(table_id, &table_info_value)?;

        let (create_table_route_txn, on_create_table_route_failure) = self
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(table_id, &table_route_value)?;

        let create_topic_region_txn = self
            .topic_region_manager
            .build_create_txn(table_id, &region_wal_options)?;

        let mut txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_table_route_txn,
            create_topic_region_txn,
        ]);

        // Datanode table entries are only created for physical routes.
        if let TableRouteValue::Physical(x) = &table_route_value {
            let region_storage_path = table_info_value.region_storage_path();
            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
                table_id,
                &engine,
                &region_storage_path,
                region_options,
                region_wal_options,
                region_distribution(&x.region_routes),
            )?;
            txn = txn.merge(create_datanode_table_txn);
        }

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_table_route = on_create_table_route_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                })?
                .into_inner();

            // Succeed only if the already-stored metadata equals what we tried to create.
            let op_name = "the creating table metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_table_route, table_route_value, op_name);
        }

        Ok(())
    }
818
819    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
820        // The batch size is max_txn_size / 3 because the size of the `tables_data`
821        // is 3 times the size of the `tables_data`.
822        self.kv_backend.max_txn_ops() / 3
823    }
824
825    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
826    pub async fn create_logical_tables_metadata(
827        &self,
828        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
829    ) -> Result<()> {
830        let len = tables_data.len();
831        let mut txns = Vec::with_capacity(3 * len);
832        struct OnFailure<F1, R1, F2, R2>
833        where
834            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
835            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
836        {
837            table_info_value: TableInfoValue,
838            on_create_table_info_failure: F1,
839            table_route_value: TableRouteValue,
840            on_create_table_route_failure: F2,
841        }
842        let mut on_failures = Vec::with_capacity(len);
843        for (mut table_info, table_route_value) in tables_data {
844            table_info.meta.region_numbers = table_route_value.region_numbers();
845            let table_id = table_info.ident.table_id;
846
847            // Creates table name.
848            let table_name = TableNameKey::new(
849                &table_info.catalog_name,
850                &table_info.schema_name,
851                &table_info.name,
852            );
853            let create_table_name_txn = self
854                .table_name_manager()
855                .build_create_txn(&table_name, table_id)?;
856            txns.push(create_table_name_txn);
857
858            // Creates table info.
859            let table_info_value = TableInfoValue::new(table_info);
860            let (create_table_info_txn, on_create_table_info_failure) =
861                self.table_info_manager()
862                    .build_create_txn(table_id, &table_info_value)?;
863            txns.push(create_table_info_txn);
864
865            let (create_table_route_txn, on_create_table_route_failure) = self
866                .table_route_manager()
867                .table_route_storage()
868                .build_create_txn(table_id, &table_route_value)?;
869            txns.push(create_table_route_txn);
870
871            on_failures.push(OnFailure {
872                table_info_value,
873                on_create_table_info_failure,
874                table_route_value,
875                on_create_table_route_failure,
876            });
877        }
878
879        let txn = Txn::merge_all(txns);
880        let mut r = self.kv_backend.txn(txn).await?;
881
882        // Checks whether metadata was already created.
883        if !r.succeeded {
884            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
885            for on_failure in on_failures {
886                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
887                    .context(error::UnexpectedSnafu {
888                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
889                    })?
890                    .into_inner();
891
892                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
893                    .context(error::UnexpectedSnafu {
894                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
895                    })?
896                    .into_inner();
897
898                let op_name = "the creating logical tables metadata";
899                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
900                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
901            }
902        }
903
904        Ok(())
905    }
906
907    fn table_metadata_keys(
908        &self,
909        table_id: TableId,
910        table_name: &TableName,
911        table_route_value: &TableRouteValue,
912        region_wal_options: &HashMap<RegionNumber, WalOptions>,
913    ) -> Result<Vec<Vec<u8>>> {
914        // Builds keys
915        let datanode_ids = if table_route_value.is_physical() {
916            region_distribution(table_route_value.region_routes()?)
917                .into_keys()
918                .collect()
919        } else {
920            vec![]
921        };
922        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
923        let table_name = TableNameKey::new(
924            &table_name.catalog_name,
925            &table_name.schema_name,
926            &table_name.table_name,
927        );
928        let table_info_key = TableInfoKey::new(table_id);
929        let table_route_key = TableRouteKey::new(table_id);
930        let datanode_table_keys = datanode_ids
931            .into_iter()
932            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
933            .collect::<HashSet<_>>();
934        let topic_region_map = self
935            .topic_region_manager
936            .get_topic_region_mapping(table_id, region_wal_options);
937        let topic_region_keys = topic_region_map
938            .iter()
939            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
940            .collect::<Vec<_>>();
941        keys.push(table_name.to_bytes());
942        keys.push(table_info_key.to_bytes());
943        keys.push(table_route_key.to_bytes());
944        for key in &datanode_table_keys {
945            keys.push(key.to_bytes());
946        }
947        for key in topic_region_keys {
948            keys.push(key.to_bytes());
949        }
950        Ok(keys)
951    }
952
953    /// Deletes metadata for table **logically**.
954    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
955    pub async fn delete_table_metadata(
956        &self,
957        table_id: TableId,
958        table_name: &TableName,
959        table_route_value: &TableRouteValue,
960        region_wal_options: &HashMap<RegionNumber, WalOptions>,
961    ) -> Result<()> {
962        let keys =
963            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
964        self.tombstone_manager.create(keys).await.map(|_| ())
965    }
966
967    /// Deletes metadata tombstone for table **permanently**.
968    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
969    pub async fn delete_table_metadata_tombstone(
970        &self,
971        table_id: TableId,
972        table_name: &TableName,
973        table_route_value: &TableRouteValue,
974        region_wal_options: &HashMap<RegionNumber, WalOptions>,
975    ) -> Result<()> {
976        let table_metadata_keys =
977            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
978        self.tombstone_manager
979            .delete(table_metadata_keys)
980            .await
981            .map(|_| ())
982    }
983
984    /// Restores metadata for table.
985    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
986    pub async fn restore_table_metadata(
987        &self,
988        table_id: TableId,
989        table_name: &TableName,
990        table_route_value: &TableRouteValue,
991        region_wal_options: &HashMap<RegionNumber, WalOptions>,
992    ) -> Result<()> {
993        let keys =
994            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
995        self.tombstone_manager.restore(keys).await.map(|_| ())
996    }
997
998    /// Deletes metadata for table **permanently**.
999    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1000    pub async fn destroy_table_metadata(
1001        &self,
1002        table_id: TableId,
1003        table_name: &TableName,
1004        table_route_value: &TableRouteValue,
1005        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1006    ) -> Result<()> {
1007        let keys =
1008            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1009        let _ = self
1010            .kv_backend
1011            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1012            .await?;
1013        Ok(())
1014    }
1015
1016    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1017        let mut keys = Vec::with_capacity(3);
1018        let view_name = TableNameKey::new(
1019            &view_name.catalog_name,
1020            &view_name.schema_name,
1021            &view_name.table_name,
1022        );
1023        let table_info_key = TableInfoKey::new(view_id);
1024        let view_info_key = ViewInfoKey::new(view_id);
1025        keys.push(view_name.to_bytes());
1026        keys.push(table_info_key.to_bytes());
1027        keys.push(view_info_key.to_bytes());
1028
1029        Ok(keys)
1030    }
1031
1032    /// Deletes metadata for view **permanently**.
1033    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1034    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1035        let keys = self.view_info_keys(view_id, view_name)?;
1036        let _ = self
1037            .kv_backend
1038            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1039            .await?;
1040        Ok(())
1041    }
1042
1043    /// Renames the table name and returns an error if different metadata exists.
1044    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1045    /// and the new `TableNameKey` MUST be empty.
1046    pub async fn rename_table(
1047        &self,
1048        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1049        new_table_name: String,
1050    ) -> Result<()> {
1051        let current_table_info = &current_table_info_value.table_info;
1052        let table_id = current_table_info.ident.table_id;
1053
1054        let table_name_key = TableNameKey::new(
1055            &current_table_info.catalog_name,
1056            &current_table_info.schema_name,
1057            &current_table_info.name,
1058        );
1059
1060        let new_table_name_key = TableNameKey::new(
1061            &current_table_info.catalog_name,
1062            &current_table_info.schema_name,
1063            &new_table_name,
1064        );
1065
1066        // Updates table name.
1067        let update_table_name_txn = self.table_name_manager().build_update_txn(
1068            &table_name_key,
1069            &new_table_name_key,
1070            table_id,
1071        )?;
1072
1073        let new_table_info_value = current_table_info_value
1074            .inner
1075            .with_update(move |table_info| {
1076                table_info.name = new_table_name;
1077            });
1078
1079        // Updates table info.
1080        let (update_table_info_txn, on_update_table_info_failure) = self
1081            .table_info_manager()
1082            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1083
1084        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1085
1086        let mut r = self.kv_backend.txn(txn).await?;
1087
1088        // Checks whether metadata was already updated.
1089        if !r.succeeded {
1090            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1091            let remote_table_info = on_update_table_info_failure(&mut set)?
1092                .context(error::UnexpectedSnafu {
1093                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1094                })?
1095                .into_inner();
1096
1097            let op_name = "the renaming table metadata";
1098            ensure_values!(remote_table_info, new_table_info_value, op_name);
1099        }
1100
1101        Ok(())
1102    }
1103
1104    /// Updates table info and returns an error if different metadata exists.
1105    /// And cascade-ly update all redundant table options for each region
1106    /// if region_distribution is present.
1107    pub async fn update_table_info(
1108        &self,
1109        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1110        region_distribution: Option<RegionDistribution>,
1111        new_table_info: RawTableInfo,
1112    ) -> Result<()> {
1113        let table_id = current_table_info_value.table_info.ident.table_id;
1114        let new_table_info_value = current_table_info_value.update(new_table_info);
1115
1116        // Updates table info.
1117        let (update_table_info_txn, on_update_table_info_failure) = self
1118            .table_info_manager()
1119            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1120
1121        let txn = if let Some(region_distribution) = region_distribution {
1122            // region options induced from table info.
1123            let new_region_options = new_table_info_value.table_info.to_region_options();
1124            let update_datanode_table_options_txn = self
1125                .datanode_table_manager
1126                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1127                .await?;
1128            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1129        } else {
1130            update_table_info_txn
1131        };
1132
1133        let mut r = self.kv_backend.txn(txn).await?;
1134        // Checks whether metadata was already updated.
1135        if !r.succeeded {
1136            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1137            let remote_table_info = on_update_table_info_failure(&mut set)?
1138                .context(error::UnexpectedSnafu {
1139                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1140                })?
1141                .into_inner();
1142
1143            let op_name = "the updating table info";
1144            ensure_values!(remote_table_info, new_table_info_value, op_name);
1145        }
1146        Ok(())
1147    }
1148
1149    /// Updates view info and returns an error if different metadata exists.
1150    /// Parameters include:
1151    /// - `view_id`: the view id
1152    /// - `current_view_info_value`: the current view info for CAS checking
1153    /// - `new_view_info`: the encoded logical plan
1154    /// - `table_names`: the resolved fully table names in logical plan
1155    /// - `columns`: the view columns
1156    /// - `plan_columns`: the original plan columns
1157    /// - `definition`: The SQL to create the view
1158    ///
1159    #[allow(clippy::too_many_arguments)]
1160    pub async fn update_view_info(
1161        &self,
1162        view_id: TableId,
1163        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1164        new_view_info: Vec<u8>,
1165        table_names: HashSet<TableName>,
1166        columns: Vec<String>,
1167        plan_columns: Vec<String>,
1168        definition: String,
1169    ) -> Result<()> {
1170        let new_view_info_value = current_view_info_value.update(
1171            new_view_info,
1172            table_names,
1173            columns,
1174            plan_columns,
1175            definition,
1176        );
1177
1178        // Updates view info.
1179        let (update_view_info_txn, on_update_view_info_failure) = self
1180            .view_info_manager()
1181            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1182
1183        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1184
1185        // Checks whether metadata was already updated.
1186        if !r.succeeded {
1187            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1188            let remote_view_info = on_update_view_info_failure(&mut set)?
1189                .context(error::UnexpectedSnafu {
1190                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1191                })?
1192                .into_inner();
1193
1194            let op_name = "the updating view info";
1195            ensure_values!(remote_view_info, new_view_info_value, op_name);
1196        }
1197        Ok(())
1198    }
1199
    /// Returns how many `(current value, new table info)` pairs should be passed to one
    /// call of [`Self::batch_update_table_info_values`].
    ///
    /// That method merges one table info update txn per pair, so the limit is the
    /// backend's `max_txn_ops()` unchanged.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1203
1204    pub async fn batch_update_table_info_values(
1205        &self,
1206        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1207    ) -> Result<()> {
1208        let len = table_info_value_pairs.len();
1209        let mut txns = Vec::with_capacity(len);
1210        struct OnFailure<F, R>
1211        where
1212            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1213        {
1214            table_info_value: TableInfoValue,
1215            on_update_table_info_failure: F,
1216        }
1217        let mut on_failures = Vec::with_capacity(len);
1218
1219        for (table_info_value, new_table_info) in table_info_value_pairs {
1220            let table_id = table_info_value.table_info.ident.table_id;
1221
1222            let new_table_info_value = table_info_value.update(new_table_info);
1223
1224            let (update_table_info_txn, on_update_table_info_failure) =
1225                self.table_info_manager().build_update_txn(
1226                    table_id,
1227                    &table_info_value,
1228                    &new_table_info_value,
1229                )?;
1230
1231            txns.push(update_table_info_txn);
1232
1233            on_failures.push(OnFailure {
1234                table_info_value: new_table_info_value,
1235                on_update_table_info_failure,
1236            });
1237        }
1238
1239        let txn = Txn::merge_all(txns);
1240        let mut r = self.kv_backend.txn(txn).await?;
1241
1242        if !r.succeeded {
1243            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1244            for on_failure in on_failures {
1245                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1246                    .context(error::UnexpectedSnafu {
1247                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1248                    })?
1249                    .into_inner();
1250
1251                let op_name = "the batch updating table info";
1252                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1253            }
1254        }
1255
1256        Ok(())
1257    }
1258
1259    pub async fn update_table_route(
1260        &self,
1261        table_id: TableId,
1262        region_info: RegionInfo,
1263        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1264        new_region_routes: Vec<RegionRoute>,
1265        new_region_options: &HashMap<String, String>,
1266        new_region_wal_options: &HashMap<RegionNumber, String>,
1267    ) -> Result<()> {
1268        // Updates the datanode table key value pairs.
1269        let current_region_distribution =
1270            region_distribution(current_table_route_value.region_routes()?);
1271        let new_region_distribution = region_distribution(&new_region_routes);
1272
1273        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1274            table_id,
1275            region_info,
1276            current_region_distribution,
1277            new_region_distribution,
1278            new_region_options,
1279            new_region_wal_options,
1280        )?;
1281
1282        // Updates the table_route.
1283        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1284
1285        let (update_table_route_txn, on_update_table_route_failure) = self
1286            .table_route_manager()
1287            .table_route_storage()
1288            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1289
1290        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1291
1292        let mut r = self.kv_backend.txn(txn).await?;
1293
1294        // Checks whether metadata was already updated.
1295        if !r.succeeded {
1296            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1297            let remote_table_route = on_update_table_route_failure(&mut set)?
1298                .context(error::UnexpectedSnafu {
1299                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1300                })?
1301                .into_inner();
1302
1303            let op_name = "the updating table route";
1304            ensure_values!(remote_table_route, new_table_route_value, op_name);
1305        }
1306
1307        Ok(())
1308    }
1309
1310    /// Updates the leader status of the [RegionRoute].
1311    pub async fn update_leader_region_status<F>(
1312        &self,
1313        table_id: TableId,
1314        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1315        next_region_route_status: F,
1316    ) -> Result<()>
1317    where
1318        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1319    {
1320        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1321
1322        let mut updated = 0;
1323        for route in &mut new_region_routes {
1324            if let Some(state) = next_region_route_status(route)
1325                && route.set_leader_state(state)
1326            {
1327                updated += 1;
1328            }
1329        }
1330
1331        if updated == 0 {
1332            warn!("No leader status updated");
1333            return Ok(());
1334        }
1335
1336        // Updates the table_route.
1337        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1338
1339        let (update_table_route_txn, on_update_table_route_failure) = self
1340            .table_route_manager()
1341            .table_route_storage()
1342            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1343
1344        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1345
1346        // Checks whether metadata was already updated.
1347        if !r.succeeded {
1348            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1349            let remote_table_route = on_update_table_route_failure(&mut set)?
1350                .context(error::UnexpectedSnafu {
1351                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1352                })?
1353                .into_inner();
1354
1355            let op_name = "the updating leader region status";
1356            ensure_values!(remote_table_route, new_table_route_value, op_name);
1357        }
1358
1359        Ok(())
1360    }
1361}
1362
/// Implements `$crate::key::MetadataValue` for each listed type by encoding and
/// decoding it as JSON with `serde_json`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context` extension
/// method unqualified. Those names are resolved at the invocation site, so they must
/// be in scope wherever the macro is used. `context` is presumably snafu's
/// `ResultExt`; confirm at the call site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1379
// Implements `MetadataKeyGetTxnOp` for each listed key type.
//
// The generated `build_get_op` serializes the key once with `to_bytes`. It returns a
// `TxnOp::Get` for that raw key, paired with `TxnOpGetResponseSet::filter(raw_key)`,
// which the caller can use to pick this key's value out of the txn's get responses.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}

// Key types whose values can be fetched inside a txn via `build_get_op`.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1412
/// Adds inherent JSON (de)serialization methods to each listed type.
///
/// The decoding is optional: `try_from_raw_value` deserializes into `Option<Self>`,
/// so a raw JSON `null` decodes to `Ok(None)`. `try_as_raw_value` serializes `self`
/// directly.
///
/// As with `impl_metadata_value!`, `Result`, `SerdeJsonSnafu` and `context` are
/// referenced unqualified and must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1429
// Value types stored in the metadata store, all encoded as JSON.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}

// Value types that also get inherent, `Option`-returning decoding.
// NOTE(review): `SchemaNameValue` is in both lists, so it has the trait methods and
// the inherent methods under the same names. Inherent items take priority, so
// `SchemaNameValue::try_from_raw_value(..)` resolves to the `Option`-returning one.
// Confirm this is intended.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1450
1451#[cfg(test)]
1452mod tests {
1453    use std::collections::{BTreeMap, HashMap, HashSet};
1454    use std::sync::Arc;
1455
1456    use bytes::Bytes;
1457    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1458    use common_time::util::current_time_millis;
1459    use common_wal::options::{KafkaWalOptions, WalOptions};
1460    use futures::TryStreamExt;
1461    use store_api::storage::{RegionId, RegionNumber};
1462    use table::metadata::{RawTableInfo, TableInfo};
1463    use table::table_name::TableName;
1464
1465    use super::datanode_table::DatanodeTableKey;
1466    use super::test_utils;
1467    use crate::ddl::test_util::create_table::test_create_table_task;
1468    use crate::ddl::utils::region_storage_path;
1469    use crate::error::Result;
1470    use crate::key::datanode_table::RegionInfo;
1471    use crate::key::table_info::TableInfoValue;
1472    use crate::key::table_name::TableNameKey;
1473    use crate::key::table_route::TableRouteValue;
1474    use crate::key::{
1475        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1476        TableMetadataManager, ViewInfoValue,
1477    };
1478    use crate::kv_backend::KvBackend;
1479    use crate::kv_backend::memory::MemoryKvBackend;
1480    use crate::peer::Peer;
1481    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1482    use crate::rpc::store::RangeRequest;
1483    use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
1484
1485    #[test]
1486    fn test_deserialized_value_with_bytes() {
1487        let region_route = new_test_region_route();
1488        let region_routes = vec![region_route.clone()];
1489
1490        let expected_region_routes =
1491            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1492        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1493
1494        // Serialize behaviors:
1495        // The inner field will be ignored.
1496        let value = DeserializedValueWithBytes {
1497            // ignored
1498            inner: TableRouteValue::physical(region_routes.clone()),
1499            bytes: Bytes::from(expected.clone()),
1500        };
1501
1502        let encoded = serde_json::to_vec(&value).unwrap();
1503
1504        // Deserialize behaviors:
1505        // The inner field will be deserialized from the bytes field.
1506        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1507            serde_json::from_slice(&encoded).unwrap();
1508
1509        assert_eq!(decoded.inner, expected_region_routes);
1510        assert_eq!(decoded.bytes, expected);
1511    }
1512
1513    fn new_test_region_route() -> RegionRoute {
1514        new_region_route(1, 2)
1515    }
1516
1517    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1518        RegionRoute {
1519            region: Region {
1520                id: region_id.into(),
1521                name: "r1".to_string(),
1522                partition: None,
1523                attrs: BTreeMap::new(),
1524                partition_expr: Default::default(),
1525            },
1526            leader_peer: Some(Peer::new(datanode, "a2")),
1527            follower_peers: vec![],
1528            leader_state: None,
1529            leader_down_since: None,
1530        }
1531    }
1532
1533    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1534        test_utils::new_test_table_info(10, region_numbers)
1535    }
1536
1537    fn new_test_table_names() -> HashSet<TableName> {
1538        let mut set = HashSet::new();
1539        set.insert(TableName {
1540            catalog_name: "greptime".to_string(),
1541            schema_name: "public".to_string(),
1542            table_name: "a_table".to_string(),
1543        });
1544        set.insert(TableName {
1545            catalog_name: "greptime".to_string(),
1546            schema_name: "public".to_string(),
1547            table_name: "b_table".to_string(),
1548        });
1549        set
1550    }
1551
1552    async fn create_physical_table_metadata(
1553        table_metadata_manager: &TableMetadataManager,
1554        table_info: RawTableInfo,
1555        region_routes: Vec<RegionRoute>,
1556        region_wal_options: HashMap<RegionNumber, String>,
1557    ) -> Result<()> {
1558        table_metadata_manager
1559            .create_table_metadata(
1560                table_info,
1561                TableRouteValue::physical(region_routes),
1562                region_wal_options,
1563            )
1564            .await
1565    }
1566
1567    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1568        let topics = (0..2)
1569            .map(|i| format!("greptimedb_topic{}", i))
1570            .collect::<Vec<_>>();
1571        let wal_options = topics
1572            .iter()
1573            .map(|topic| {
1574                WalOptions::Kafka(KafkaWalOptions {
1575                    topic: topic.clone(),
1576                })
1577            })
1578            .collect::<Vec<_>>();
1579
1580        (0..16)
1581            .enumerate()
1582            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1583            .collect()
1584    }
1585
1586    #[tokio::test]
1587    async fn test_raft_engine_topic_region_map() {
1588        let mem_kv = Arc::new(MemoryKvBackend::default());
1589        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1590        let region_route = new_test_region_route();
1591        let region_routes = &vec![region_route.clone()];
1592        let table_info: RawTableInfo =
1593            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1594        let wal_allocator = WalOptionsAllocator::RaftEngine;
1595        let regions = (0..16).collect();
1596        let region_wal_options =
1597            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1598        create_physical_table_metadata(
1599            &table_metadata_manager,
1600            table_info.clone(),
1601            region_routes.clone(),
1602            region_wal_options.clone(),
1603        )
1604        .await
1605        .unwrap();
1606
1607        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1608        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1609        let resp = mem_kv.range(range_req).await.unwrap();
1610        // Should be empty because the topic region map is empty for raft engine.
1611        assert!(resp.kvs.is_empty());
1612    }
1613
1614    #[tokio::test]
1615    async fn test_create_table_metadata() {
1616        let mem_kv = Arc::new(MemoryKvBackend::default());
1617        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1618        let region_route = new_test_region_route();
1619        let region_routes = &vec![region_route.clone()];
1620        let table_info: RawTableInfo =
1621            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1622        let region_wal_options = create_mock_region_wal_options()
1623            .into_iter()
1624            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1625            .collect::<HashMap<_, _>>();
1626
1627        // creates metadata.
1628        create_physical_table_metadata(
1629            &table_metadata_manager,
1630            table_info.clone(),
1631            region_routes.clone(),
1632            region_wal_options.clone(),
1633        )
1634        .await
1635        .unwrap();
1636
1637        // if metadata was already created, it should be ok.
1638        assert!(
1639            create_physical_table_metadata(
1640                &table_metadata_manager,
1641                table_info.clone(),
1642                region_routes.clone(),
1643                region_wal_options.clone(),
1644            )
1645            .await
1646            .is_ok()
1647        );
1648
1649        let mut modified_region_routes = region_routes.clone();
1650        modified_region_routes.push(region_route.clone());
1651        // if remote metadata was exists, it should return an error.
1652        assert!(
1653            create_physical_table_metadata(
1654                &table_metadata_manager,
1655                table_info.clone(),
1656                modified_region_routes,
1657                region_wal_options.clone(),
1658            )
1659            .await
1660            .is_err()
1661        );
1662
1663        let (remote_table_info, remote_table_route) = table_metadata_manager
1664            .get_full_table_info(10)
1665            .await
1666            .unwrap();
1667
1668        assert_eq!(
1669            remote_table_info.unwrap().into_inner().table_info,
1670            table_info
1671        );
1672        assert_eq!(
1673            remote_table_route
1674                .unwrap()
1675                .into_inner()
1676                .region_routes()
1677                .unwrap(),
1678            region_routes
1679        );
1680
1681        for i in 0..2 {
1682            let region_number = i as u32;
1683            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1684            let topic = format!("greptimedb_topic{}", i);
1685            let regions = table_metadata_manager
1686                .topic_region_manager
1687                .regions(&topic)
1688                .await
1689                .unwrap()
1690                .into_keys()
1691                .collect::<Vec<_>>();
1692            assert_eq!(regions.len(), 8);
1693            assert!(regions.contains(&region_id));
1694        }
1695    }
1696
1697    #[tokio::test]
1698    async fn test_create_logic_tables_metadata() {
1699        let mem_kv = Arc::new(MemoryKvBackend::default());
1700        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1701        let region_route = new_test_region_route();
1702        let region_routes = vec![region_route.clone()];
1703        let table_info: RawTableInfo =
1704            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1705        let table_id = table_info.ident.table_id;
1706        let table_route_value = TableRouteValue::physical(region_routes.clone());
1707
1708        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1709        // creates metadata.
1710        table_metadata_manager
1711            .create_logical_tables_metadata(tables_data.clone())
1712            .await
1713            .unwrap();
1714
1715        // if metadata was already created, it should be ok.
1716        assert!(
1717            table_metadata_manager
1718                .create_logical_tables_metadata(tables_data)
1719                .await
1720                .is_ok()
1721        );
1722
1723        let mut modified_region_routes = region_routes.clone();
1724        modified_region_routes.push(new_region_route(2, 3));
1725        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1726        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1727        // if remote metadata was exists, it should return an error.
1728        assert!(
1729            table_metadata_manager
1730                .create_logical_tables_metadata(modified_tables_data)
1731                .await
1732                .is_err()
1733        );
1734
1735        let (remote_table_info, remote_table_route) = table_metadata_manager
1736            .get_full_table_info(table_id)
1737            .await
1738            .unwrap();
1739
1740        assert_eq!(
1741            remote_table_info.unwrap().into_inner().table_info,
1742            table_info
1743        );
1744        assert_eq!(
1745            remote_table_route
1746                .unwrap()
1747                .into_inner()
1748                .region_routes()
1749                .unwrap(),
1750            &region_routes
1751        );
1752    }
1753
1754    #[tokio::test]
1755    async fn test_create_many_logical_tables_metadata() {
1756        let kv_backend = Arc::new(MemoryKvBackend::default());
1757        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1758
1759        let mut tables_data = vec![];
1760        for i in 0..128 {
1761            let table_id = i + 1;
1762            let regin_number = table_id * 3;
1763            let region_id = RegionId::new(table_id, regin_number);
1764            let region_route = new_region_route(region_id.as_u64(), 2);
1765            let region_routes = vec![region_route.clone()];
1766            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1767                table_id,
1768                &format!("my_table_{}", table_id),
1769                region_routes.iter().map(|r| r.region.id.region_number()),
1770            )
1771            .into();
1772            let table_route_value = TableRouteValue::physical(region_routes.clone());
1773
1774            tables_data.push((table_info, table_route_value));
1775        }
1776
1777        // creates metadata.
1778        table_metadata_manager
1779            .create_logical_tables_metadata(tables_data)
1780            .await
1781            .unwrap();
1782    }
1783
1784    #[tokio::test]
1785    async fn test_delete_table_metadata() {
1786        let mem_kv = Arc::new(MemoryKvBackend::default());
1787        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1788        let region_route = new_test_region_route();
1789        let region_routes = &vec![region_route.clone()];
1790        let table_info: RawTableInfo =
1791            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1792        let table_id = table_info.ident.table_id;
1793        let datanode_id = 2;
1794        let region_wal_options = create_mock_region_wal_options();
1795        let serialized_region_wal_options = region_wal_options
1796            .iter()
1797            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1798            .collect::<HashMap<_, _>>();
1799
1800        // creates metadata.
1801        create_physical_table_metadata(
1802            &table_metadata_manager,
1803            table_info.clone(),
1804            region_routes.clone(),
1805            serialized_region_wal_options,
1806        )
1807        .await
1808        .unwrap();
1809
1810        let table_name = TableName::new(
1811            table_info.catalog_name,
1812            table_info.schema_name,
1813            table_info.name,
1814        );
1815        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1816        // deletes metadata.
1817        table_metadata_manager
1818            .delete_table_metadata(
1819                table_id,
1820                &table_name,
1821                table_route_value,
1822                &region_wal_options,
1823            )
1824            .await
1825            .unwrap();
1826        // Should be ignored.
1827        table_metadata_manager
1828            .delete_table_metadata(
1829                table_id,
1830                &table_name,
1831                table_route_value,
1832                &region_wal_options,
1833            )
1834            .await
1835            .unwrap();
1836        assert!(
1837            table_metadata_manager
1838                .table_info_manager()
1839                .get(table_id)
1840                .await
1841                .unwrap()
1842                .is_none()
1843        );
1844        assert!(
1845            table_metadata_manager
1846                .table_route_manager()
1847                .table_route_storage()
1848                .get(table_id)
1849                .await
1850                .unwrap()
1851                .is_none()
1852        );
1853        assert!(
1854            table_metadata_manager
1855                .datanode_table_manager()
1856                .tables(datanode_id)
1857                .try_collect::<Vec<_>>()
1858                .await
1859                .unwrap()
1860                .is_empty()
1861        );
1862        // Checks removed values
1863        let table_info = table_metadata_manager
1864            .table_info_manager()
1865            .get(table_id)
1866            .await
1867            .unwrap();
1868        assert!(table_info.is_none());
1869        let table_route = table_metadata_manager
1870            .table_route_manager()
1871            .table_route_storage()
1872            .get(table_id)
1873            .await
1874            .unwrap();
1875        assert!(table_route.is_none());
1876        // Logical delete removes the topic region mapping as well.
1877        let regions = table_metadata_manager
1878            .topic_region_manager
1879            .regions("greptimedb_topic0")
1880            .await
1881            .unwrap();
1882        assert_eq!(regions.len(), 0);
1883        let regions = table_metadata_manager
1884            .topic_region_manager
1885            .regions("greptimedb_topic1")
1886            .await
1887            .unwrap();
1888        assert_eq!(regions.len(), 0);
1889    }
1890
1891    #[tokio::test]
1892    async fn test_rename_table() {
1893        let mem_kv = Arc::new(MemoryKvBackend::default());
1894        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1895        let region_route = new_test_region_route();
1896        let region_routes = vec![region_route.clone()];
1897        let table_info: RawTableInfo =
1898            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1899        let table_id = table_info.ident.table_id;
1900        // creates metadata.
1901        create_physical_table_metadata(
1902            &table_metadata_manager,
1903            table_info.clone(),
1904            region_routes.clone(),
1905            HashMap::new(),
1906        )
1907        .await
1908        .unwrap();
1909
1910        let new_table_name = "another_name".to_string();
1911        let table_info_value =
1912            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1913
1914        table_metadata_manager
1915            .rename_table(&table_info_value, new_table_name.clone())
1916            .await
1917            .unwrap();
1918        // if remote metadata was updated, it should be ok.
1919        table_metadata_manager
1920            .rename_table(&table_info_value, new_table_name.clone())
1921            .await
1922            .unwrap();
1923        let mut modified_table_info = table_info.clone();
1924        modified_table_info.name = "hi".to_string();
1925        let modified_table_info_value =
1926            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1927        // if the table_info_value is wrong, it should return an error.
1928        // The ABA problem.
1929        assert!(
1930            table_metadata_manager
1931                .rename_table(&modified_table_info_value, new_table_name.clone())
1932                .await
1933                .is_err()
1934        );
1935
1936        let old_table_name = TableNameKey::new(
1937            &table_info.catalog_name,
1938            &table_info.schema_name,
1939            &table_info.name,
1940        );
1941        let new_table_name = TableNameKey::new(
1942            &table_info.catalog_name,
1943            &table_info.schema_name,
1944            &new_table_name,
1945        );
1946
1947        assert!(
1948            table_metadata_manager
1949                .table_name_manager()
1950                .get(old_table_name)
1951                .await
1952                .unwrap()
1953                .is_none()
1954        );
1955
1956        assert_eq!(
1957            table_metadata_manager
1958                .table_name_manager()
1959                .get(new_table_name)
1960                .await
1961                .unwrap()
1962                .unwrap()
1963                .table_id(),
1964            table_id
1965        );
1966    }
1967
1968    #[tokio::test]
1969    async fn test_update_table_info() {
1970        let mem_kv = Arc::new(MemoryKvBackend::default());
1971        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1972        let region_route = new_test_region_route();
1973        let region_routes = vec![region_route.clone()];
1974        let table_info: RawTableInfo =
1975            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1976        let table_id = table_info.ident.table_id;
1977        // creates metadata.
1978        create_physical_table_metadata(
1979            &table_metadata_manager,
1980            table_info.clone(),
1981            region_routes.clone(),
1982            HashMap::new(),
1983        )
1984        .await
1985        .unwrap();
1986
1987        let mut new_table_info = table_info.clone();
1988        new_table_info.name = "hi".to_string();
1989        let current_table_info_value =
1990            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1991        // should be ok.
1992        table_metadata_manager
1993            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1994            .await
1995            .unwrap();
1996        // if table info was updated, it should be ok.
1997        table_metadata_manager
1998            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1999            .await
2000            .unwrap();
2001
2002        // updated table_info should equal the `new_table_info`
2003        let updated_table_info = table_metadata_manager
2004            .table_info_manager()
2005            .get(table_id)
2006            .await
2007            .unwrap()
2008            .unwrap()
2009            .into_inner();
2010        assert_eq!(updated_table_info.table_info, new_table_info);
2011
2012        let mut wrong_table_info = table_info.clone();
2013        wrong_table_info.name = "wrong".to_string();
2014        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2015            current_table_info_value.update(wrong_table_info),
2016        );
2017        // if the current_table_info_value is wrong, it should return an error.
2018        // The ABA problem.
2019        assert!(
2020            table_metadata_manager
2021                .update_table_info(&wrong_table_info_value, None, new_table_info)
2022                .await
2023                .is_err()
2024        )
2025    }
2026
2027    #[tokio::test]
2028    async fn test_update_table_leader_region_status() {
2029        let mem_kv = Arc::new(MemoryKvBackend::default());
2030        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2031        let datanode = 1;
2032        let region_routes = vec![
2033            RegionRoute {
2034                region: Region {
2035                    id: 1.into(),
2036                    name: "r1".to_string(),
2037                    partition: None,
2038                    attrs: BTreeMap::new(),
2039                    partition_expr: Default::default(),
2040                },
2041                leader_peer: Some(Peer::new(datanode, "a2")),
2042                leader_state: Some(LeaderState::Downgrading),
2043                follower_peers: vec![],
2044                leader_down_since: Some(current_time_millis()),
2045            },
2046            RegionRoute {
2047                region: Region {
2048                    id: 2.into(),
2049                    name: "r2".to_string(),
2050                    partition: None,
2051                    attrs: BTreeMap::new(),
2052                    partition_expr: Default::default(),
2053                },
2054                leader_peer: Some(Peer::new(datanode, "a1")),
2055                leader_state: None,
2056                follower_peers: vec![],
2057                leader_down_since: None,
2058            },
2059        ];
2060        let table_info: RawTableInfo =
2061            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2062        let table_id = table_info.ident.table_id;
2063        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2064            TableRouteValue::physical(region_routes.clone()),
2065        );
2066
2067        // creates metadata.
2068        create_physical_table_metadata(
2069            &table_metadata_manager,
2070            table_info.clone(),
2071            region_routes.clone(),
2072            HashMap::new(),
2073        )
2074        .await
2075        .unwrap();
2076
2077        table_metadata_manager
2078            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2079                if region_route.leader_state.is_some() {
2080                    None
2081                } else {
2082                    Some(Some(LeaderState::Downgrading))
2083                }
2084            })
2085            .await
2086            .unwrap();
2087
2088        let updated_route_value = table_metadata_manager
2089            .table_route_manager()
2090            .table_route_storage()
2091            .get(table_id)
2092            .await
2093            .unwrap()
2094            .unwrap();
2095
2096        assert_eq!(
2097            updated_route_value.region_routes().unwrap()[0].leader_state,
2098            Some(LeaderState::Downgrading)
2099        );
2100
2101        assert!(
2102            updated_route_value.region_routes().unwrap()[0]
2103                .leader_down_since
2104                .is_some()
2105        );
2106
2107        assert_eq!(
2108            updated_route_value.region_routes().unwrap()[1].leader_state,
2109            Some(LeaderState::Downgrading)
2110        );
2111        assert!(
2112            updated_route_value.region_routes().unwrap()[1]
2113                .leader_down_since
2114                .is_some()
2115        );
2116    }
2117
2118    async fn assert_datanode_table(
2119        table_metadata_manager: &TableMetadataManager,
2120        table_id: u32,
2121        region_routes: &[RegionRoute],
2122    ) {
2123        let region_distribution = region_distribution(region_routes);
2124        for (datanode, regions) in region_distribution {
2125            let got = table_metadata_manager
2126                .datanode_table_manager()
2127                .get(&DatanodeTableKey::new(datanode, table_id))
2128                .await
2129                .unwrap()
2130                .unwrap();
2131
2132            assert_eq!(got.regions, regions.leader_regions);
2133            assert_eq!(got.follower_regions, regions.follower_regions);
2134        }
2135    }
2136
2137    #[tokio::test]
2138    async fn test_update_table_route() {
2139        let mem_kv = Arc::new(MemoryKvBackend::default());
2140        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2141        let region_route = new_test_region_route();
2142        let region_routes = vec![region_route.clone()];
2143        let table_info: RawTableInfo =
2144            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2145        let table_id = table_info.ident.table_id;
2146        let engine = table_info.meta.engine.as_str();
2147        let region_storage_path =
2148            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2149        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2150            TableRouteValue::physical(region_routes.clone()),
2151        );
2152
2153        // creates metadata.
2154        create_physical_table_metadata(
2155            &table_metadata_manager,
2156            table_info.clone(),
2157            region_routes.clone(),
2158            HashMap::new(),
2159        )
2160        .await
2161        .unwrap();
2162
2163        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2164        let new_region_routes = vec![
2165            new_region_route(1, 1),
2166            new_region_route(2, 2),
2167            new_region_route(3, 3),
2168        ];
2169        // it should be ok.
2170        table_metadata_manager
2171            .update_table_route(
2172                table_id,
2173                RegionInfo {
2174                    engine: engine.to_string(),
2175                    region_storage_path: region_storage_path.to_string(),
2176                    region_options: HashMap::new(),
2177                    region_wal_options: HashMap::new(),
2178                },
2179                &current_table_route_value,
2180                new_region_routes.clone(),
2181                &HashMap::new(),
2182                &HashMap::new(),
2183            )
2184            .await
2185            .unwrap();
2186        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2187
2188        // if the table route was updated. it should be ok.
2189        table_metadata_manager
2190            .update_table_route(
2191                table_id,
2192                RegionInfo {
2193                    engine: engine.to_string(),
2194                    region_storage_path: region_storage_path.to_string(),
2195                    region_options: HashMap::new(),
2196                    region_wal_options: HashMap::new(),
2197                },
2198                &current_table_route_value,
2199                new_region_routes.clone(),
2200                &HashMap::new(),
2201                &HashMap::new(),
2202            )
2203            .await
2204            .unwrap();
2205
2206        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2207            current_table_route_value
2208                .inner
2209                .update(new_region_routes.clone())
2210                .unwrap(),
2211        );
2212        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2213        // it should be ok.
2214        table_metadata_manager
2215            .update_table_route(
2216                table_id,
2217                RegionInfo {
2218                    engine: engine.to_string(),
2219                    region_storage_path: region_storage_path.to_string(),
2220                    region_options: HashMap::new(),
2221                    region_wal_options: HashMap::new(),
2222                },
2223                &current_table_route_value,
2224                new_region_routes.clone(),
2225                &HashMap::new(),
2226                &HashMap::new(),
2227            )
2228            .await
2229            .unwrap();
2230        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2231
2232        // if the current_table_route_value is wrong, it should return an error.
2233        // The ABA problem.
2234        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2235            current_table_route_value
2236                .update(vec![
2237                    new_region_route(1, 1),
2238                    new_region_route(2, 2),
2239                    new_region_route(3, 3),
2240                    new_region_route(4, 4),
2241                ])
2242                .unwrap(),
2243        );
2244        assert!(
2245            table_metadata_manager
2246                .update_table_route(
2247                    table_id,
2248                    RegionInfo {
2249                        engine: engine.to_string(),
2250                        region_storage_path: region_storage_path.to_string(),
2251                        region_options: HashMap::new(),
2252                        region_wal_options: HashMap::new(),
2253                    },
2254                    &wrong_table_route_value,
2255                    new_region_routes,
2256                    &HashMap::new(),
2257                    &HashMap::new(),
2258                )
2259                .await
2260                .is_err()
2261        );
2262    }
2263
2264    #[tokio::test]
2265    async fn test_destroy_table_metadata() {
2266        let mem_kv = Arc::new(MemoryKvBackend::default());
2267        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2268        let table_id = 1025;
2269        let table_name = "foo";
2270        let task = test_create_table_task(table_name, table_id);
2271        let options = create_mock_region_wal_options();
2272        let serialized_options = options
2273            .iter()
2274            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2275            .collect::<HashMap<_, _>>();
2276        table_metadata_manager
2277            .create_table_metadata(
2278                task.table_info,
2279                TableRouteValue::physical(vec![
2280                    RegionRoute {
2281                        region: Region::new_test(RegionId::new(table_id, 1)),
2282                        leader_peer: Some(Peer::empty(1)),
2283                        follower_peers: vec![Peer::empty(5)],
2284                        leader_state: None,
2285                        leader_down_since: None,
2286                    },
2287                    RegionRoute {
2288                        region: Region::new_test(RegionId::new(table_id, 2)),
2289                        leader_peer: Some(Peer::empty(2)),
2290                        follower_peers: vec![Peer::empty(4)],
2291                        leader_state: None,
2292                        leader_down_since: None,
2293                    },
2294                    RegionRoute {
2295                        region: Region::new_test(RegionId::new(table_id, 3)),
2296                        leader_peer: Some(Peer::empty(3)),
2297                        follower_peers: vec![],
2298                        leader_state: None,
2299                        leader_down_since: None,
2300                    },
2301                ]),
2302                serialized_options,
2303            )
2304            .await
2305            .unwrap();
2306        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2307        let table_route_value = table_metadata_manager
2308            .table_route_manager
2309            .table_route_storage()
2310            .get_with_raw_bytes(table_id)
2311            .await
2312            .unwrap()
2313            .unwrap();
2314        table_metadata_manager
2315            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2316            .await
2317            .unwrap();
2318        assert!(mem_kv.is_empty());
2319    }
2320
2321    #[tokio::test]
2322    async fn test_restore_table_metadata() {
2323        let mem_kv = Arc::new(MemoryKvBackend::default());
2324        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2325        let table_id = 1025;
2326        let table_name = "foo";
2327        let task = test_create_table_task(table_name, table_id);
2328        let options = create_mock_region_wal_options();
2329        let serialized_options = options
2330            .iter()
2331            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2332            .collect::<HashMap<_, _>>();
2333        table_metadata_manager
2334            .create_table_metadata(
2335                task.table_info,
2336                TableRouteValue::physical(vec![
2337                    RegionRoute {
2338                        region: Region::new_test(RegionId::new(table_id, 1)),
2339                        leader_peer: Some(Peer::empty(1)),
2340                        follower_peers: vec![Peer::empty(5)],
2341                        leader_state: None,
2342                        leader_down_since: None,
2343                    },
2344                    RegionRoute {
2345                        region: Region::new_test(RegionId::new(table_id, 2)),
2346                        leader_peer: Some(Peer::empty(2)),
2347                        follower_peers: vec![Peer::empty(4)],
2348                        leader_state: None,
2349                        leader_down_since: None,
2350                    },
2351                    RegionRoute {
2352                        region: Region::new_test(RegionId::new(table_id, 3)),
2353                        leader_peer: Some(Peer::empty(3)),
2354                        follower_peers: vec![],
2355                        leader_state: None,
2356                        leader_down_since: None,
2357                    },
2358                ]),
2359                serialized_options,
2360            )
2361            .await
2362            .unwrap();
2363        let expected_result = mem_kv.dump();
2364        let table_route_value = table_metadata_manager
2365            .table_route_manager
2366            .table_route_storage()
2367            .get_with_raw_bytes(table_id)
2368            .await
2369            .unwrap()
2370            .unwrap();
2371        let region_routes = table_route_value.region_routes().unwrap();
2372        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2373        let table_route_value = TableRouteValue::physical(region_routes.clone());
2374        table_metadata_manager
2375            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2376            .await
2377            .unwrap();
2378        table_metadata_manager
2379            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2380            .await
2381            .unwrap();
2382        let kvs = mem_kv.dump();
2383        assert_eq!(kvs, expected_result);
2384        // Should be ignored.
2385        table_metadata_manager
2386            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2387            .await
2388            .unwrap();
2389        let kvs = mem_kv.dump();
2390        assert_eq!(kvs, expected_result);
2391    }
2392
2393    #[tokio::test]
2394    async fn test_create_update_view_info() {
2395        let mem_kv = Arc::new(MemoryKvBackend::default());
2396        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2397
2398        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2399
2400        let view_id = view_info.ident.table_id;
2401
2402        let logical_plan: Vec<u8> = vec![1, 2, 3];
2403        let columns = vec!["a".to_string()];
2404        let plan_columns = vec!["number".to_string()];
2405        let table_names = new_test_table_names();
2406        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2407
2408        // Create metadata
2409        table_metadata_manager
2410            .create_view_metadata(
2411                view_info.clone(),
2412                logical_plan.clone(),
2413                table_names.clone(),
2414                columns.clone(),
2415                plan_columns.clone(),
2416                definition.to_string(),
2417            )
2418            .await
2419            .unwrap();
2420
2421        {
2422            // assert view info
2423            let current_view_info = table_metadata_manager
2424                .view_info_manager()
2425                .get(view_id)
2426                .await
2427                .unwrap()
2428                .unwrap()
2429                .into_inner();
2430            assert_eq!(current_view_info.view_info, logical_plan);
2431            assert_eq!(current_view_info.table_names, table_names);
2432            assert_eq!(current_view_info.definition, definition);
2433            assert_eq!(current_view_info.columns, columns);
2434            assert_eq!(current_view_info.plan_columns, plan_columns);
2435            // assert table info
2436            let current_table_info = table_metadata_manager
2437                .table_info_manager()
2438                .get(view_id)
2439                .await
2440                .unwrap()
2441                .unwrap()
2442                .into_inner();
2443            assert_eq!(current_table_info.table_info, view_info);
2444        }
2445
2446        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2447        let new_table_names = {
2448            let mut set = HashSet::new();
2449            set.insert(TableName {
2450                catalog_name: "greptime".to_string(),
2451                schema_name: "public".to_string(),
2452                table_name: "b_table".to_string(),
2453            });
2454            set.insert(TableName {
2455                catalog_name: "greptime".to_string(),
2456                schema_name: "public".to_string(),
2457                table_name: "c_table".to_string(),
2458            });
2459            set
2460        };
2461        let new_columns = vec!["b".to_string()];
2462        let new_plan_columns = vec!["number2".to_string()];
2463        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2464
2465        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2466            logical_plan.clone(),
2467            table_names,
2468            columns,
2469            plan_columns,
2470            definition.to_string(),
2471        ));
2472        // should be ok.
2473        table_metadata_manager
2474            .update_view_info(
2475                view_id,
2476                &current_view_info_value,
2477                new_logical_plan.clone(),
2478                new_table_names.clone(),
2479                new_columns.clone(),
2480                new_plan_columns.clone(),
2481                new_definition.to_string(),
2482            )
2483            .await
2484            .unwrap();
2485        // if table info was updated, it should be ok.
2486        table_metadata_manager
2487            .update_view_info(
2488                view_id,
2489                &current_view_info_value,
2490                new_logical_plan.clone(),
2491                new_table_names.clone(),
2492                new_columns.clone(),
2493                new_plan_columns.clone(),
2494                new_definition.to_string(),
2495            )
2496            .await
2497            .unwrap();
2498
2499        // updated view_info should equal the `new_logical_plan`
2500        let updated_view_info = table_metadata_manager
2501            .view_info_manager()
2502            .get(view_id)
2503            .await
2504            .unwrap()
2505            .unwrap()
2506            .into_inner();
2507        assert_eq!(updated_view_info.view_info, new_logical_plan);
2508        assert_eq!(updated_view_info.table_names, new_table_names);
2509        assert_eq!(updated_view_info.definition, new_definition);
2510        assert_eq!(updated_view_info.columns, new_columns);
2511        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2512
2513        let wrong_view_info = logical_plan.clone();
2514        let wrong_definition = "wrong_definition";
2515        let wrong_view_info_value =
2516            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2517                wrong_view_info,
2518                new_table_names.clone(),
2519                new_columns.clone(),
2520                new_plan_columns.clone(),
2521                wrong_definition.to_string(),
2522            ));
2523        // if the current_view_info_value is wrong, it should return an error.
2524        // The ABA problem.
2525        assert!(
2526            table_metadata_manager
2527                .update_view_info(
2528                    view_id,
2529                    &wrong_view_info_value,
2530                    new_logical_plan.clone(),
2531                    new_table_names.clone(),
2532                    vec!["c".to_string()],
2533                    vec!["number3".to_string()],
2534                    wrong_definition.to_string(),
2535                )
2536                .await
2537                .is_err()
2538        );
2539
2540        // The view_info is not changed.
2541        let current_view_info = table_metadata_manager
2542            .view_info_manager()
2543            .get(view_id)
2544            .await
2545            .unwrap()
2546            .unwrap()
2547            .into_inner();
2548        assert_eq!(current_view_info.view_info, new_logical_plan);
2549        assert_eq!(current_view_info.table_names, new_table_names);
2550        assert_eq!(current_view_info.definition, new_definition);
2551        assert_eq!(current_view_info.columns, new_columns);
2552        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2553    }
2554
2555    #[test]
2556    fn test_region_role_set_deserialize() {
2557        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2558        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2559        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2560        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2561
2562        let s = r#"[1, 2, 3]"#;
2563        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2564        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2565        assert!(region_role_set.follower_regions.is_empty());
2566    }
2567
2568    #[test]
2569    fn test_region_distribution_deserialize() {
2570        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2571        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2572        assert_eq!(region_distribution.len(), 2);
2573        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2574        assert!(region_distribution[&1].follower_regions.is_empty());
2575        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2576        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2577    }
2578}