common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during the startup of a Datanode, to let the Datanode know
//!       which tables and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names that belong to the {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
//! 13. Topic name to region map key: `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two
//! "unified" managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! They bundle the managers described above. It's recommended to use these unified managers
//! rather than the individual ones.
75//!
76//! The whole picture of flow keys will be like this:
77//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{RawTableInfo, TableId};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex fragment for a topic name. It carries no `^`/`$` anchors of its own; it is compiled
/// as-is into [`TOPIC_NAME_PATTERN_REGEX`] and embedded into [`TOPIC_REGION_PATTERN`].
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
// NOTE(review): judging by the name, this is the older location of the maintenance key
// (before the `__switches/` namespace) — confirm how it is still read.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance entry under the `__switches/` namespace.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure entry under the `__switches/` namespace.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery entry under the `__switches/` namespace.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys; the key pattern in this file expects one numeric segment after it.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repart keys; the key pattern in this file expects one numeric segment after it.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys; the key pattern in this file expects two numeric segments after it.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// Region numbers grouped by role: one list for leaders, one for followers.
///
/// `Serialize` is derived, so the value is written with both named fields. `Deserialize` is
/// implemented by hand in this file and additionally accepts a bare list of region numbers,
/// which is read as the leader list with no followers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215    where
216        D: serde::Deserializer<'de>,
217    {
218        #[derive(Deserialize)]
219        #[serde(untagged)]
220        enum RegionRoleSetOrLeaderOnly {
221            Full {
222                leader_regions: Vec<RegionNumber>,
223                follower_regions: Vec<RegionNumber>,
224            },
225            LeaderOnly(Vec<RegionNumber>),
226        }
227        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228            RegionRoleSetOrLeaderOnly::Full {
229                leader_regions,
230                follower_regions,
231            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233                Ok(RegionRoleSet::new(leader_regions, vec![]))
234            }
235        }
236    }
237}
238
239impl RegionRoleSet {
240    /// Create a new region role set.
241    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242        Self {
243            leader_regions,
244            follower_regions,
245        }
246    }
247
248    /// Add a leader region to the set.
249    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250        self.leader_regions.push(region_number);
251    }
252
253    /// Add a follower region to the set.
254    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255        self.follower_regions.push(region_number);
256    }
257
258    /// Sort the regions.
259    pub fn sort(&mut self) {
260        self.follower_regions.sort();
261        self.leader_regions.sort();
262    }
263}
264
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set. Because this is a
/// [`BTreeMap`], iteration visits the datanodes in ascending id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of a flow.
pub type FlowId = u32;
/// The id of a flow partition.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280    static ref TABLE_INFO_KEY_PATTERN: Regex =
281        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285    static ref VIEW_INFO_KEY_PATTERN: Regex =
286        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307    ))
308    .unwrap();
309}
310
311lazy_static! {
312    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
313    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315    ))
316        .unwrap();
317}
318
319lazy_static! {
320    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
321    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323    ))
324        .unwrap();
325}
326
327lazy_static! {
328    static ref NODE_ADDRESS_PATTERN: Regex =
329        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340    ))
341    .unwrap();
342}
343
/// The key of metadata.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` is the lifetime of the byte
/// slice a key is parsed from, which allows implementations to borrow from that slice.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into the bytes used as the key in the KV store.
    fn to_bytes(&self) -> Vec<u8>;

    /// Parses a key from its encoded bytes, returning an error if they cannot be parsed.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// A thin wrapper around an owned byte buffer.
///
/// The wrapped bytes are stored exactly as given; no validation or parsing is applied.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `raw` without copying it.
    fn from(raw: Vec<u8>) -> Self {
        BytesAdapter(raw)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361    fn to_bytes(&self) -> Vec<u8> {
362        self.0.clone()
363    }
364
365    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366        Ok(BytesAdapter(bytes.to_vec()))
367    }
368}
369
/// Implemented by metadata keys that can be fetched as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Builds the transaction operation that reads this key.
    ///
    /// Returns the [`TxnOp`] to add to a transaction, paired with a closure that takes the
    /// transaction's [`TxnOpGetResponseSet`] and yields the raw value bytes for this key.
    /// NOTE(review): `None` presumably means the key was not present in the responses — confirm
    /// against the implementations.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// A metadata value that can be converted to and from the raw bytes kept in the KV store.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes, returning an error if decoding fails.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes, returning an error if encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Bundles the individual key managers for table-related metadata.
///
/// Every sub-manager is constructed over a clone of the same `kv_backend` handle (see
/// [`TableMetadataManager::new`]); the handle itself is kept as well and is used directly for
/// the transactions that span several kinds of keys.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // The backend all of the managers above were built over.
    kv_backend: KvBackendRef,
}
403
/// Fails with an `Unexpected` error unless `$got == $expected_value`.
///
/// - `$got`: the value that was actually read.
/// - `$expected_value`: the value the caller expected to read.
/// - `$name`: a description of the running operation; it is interpolated into the error
///   message with `{}`, while the two values are rendered with `{:?}`.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` by exactly those paths, so
/// both must be resolvable where the macro is invoked. Each argument expression appears twice
/// in the expansion (in the comparison and in the message), so pass plain values rather than
/// expressions with side effects.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A deserialized value (`inner`) together with the original bytes it was decoded from.
///
/// - Serialize behaviors:
///
/// Only `bytes` is written (as a string); the `inner` field is ignored.
///
/// - Deserialize behaviors:
///
/// A string is read into `bytes`, and `inner` is then decoded from those bytes.
///
/// Note that mutating `inner` through `DerefMut` does not update `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value was deserialized from the original bytes.
    inner: T,
}
434
435impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
436    type Target = T;
437
438    fn deref(&self) -> &Self::Target {
439        &self.inner
440    }
441}
442
443impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
444    fn deref_mut(&mut self) -> &mut Self::Target {
445        &mut self.inner
446    }
447}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        write!(
452            f,
453            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454            self.inner, self.bytes
455        )
456    }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460    /// - Serialize behaviors:
461    ///
462    /// The `inner` field will be ignored.
463    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464    where
465        S: serde::Serializer,
466    {
467        // Safety: The original bytes are always JSON encoded.
468        // It's more efficiently than `serialize_bytes`.
469        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470    }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474    for DeserializedValueWithBytes<T>
475{
476    /// - Deserialize behaviors:
477    ///
478    /// The `inner` field will be deserialized from the `bytes` field.
479    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480    where
481        D: serde::Deserializer<'de>,
482    {
483        let buf = String::deserialize(deserializer)?;
484        let bytes = Bytes::from(buf);
485
486        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489        Ok(value)
490    }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494    fn clone(&self) -> Self {
495        Self {
496            bytes: self.bytes.clone(),
497            inner: self.inner.clone(),
498        }
499    }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503    /// Returns a struct containing a deserialized value and an original `bytes`.
504    /// It accepts original bytes of inner.
505    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506        let inner = T::try_from_raw_value(&bytes)?;
507        Ok(Self { bytes, inner })
508    }
509
510    /// Returns a struct containing a deserialized value and an original `bytes`.
511    /// It accepts original bytes of inner.
512    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514    }
515
516    pub fn into_inner(self) -> T {
517        self.inner
518    }
519
520    pub fn get_inner_ref(&self) -> &T {
521        &self.inner
522    }
523
524    /// Returns original `bytes`
525    pub fn get_raw_bytes(&self) -> Vec<u8> {
526        self.bytes.to_vec()
527    }
528
529    #[cfg(any(test, feature = "testing"))]
530    pub fn from_inner(inner: T) -> Self {
531        let bytes = serde_json::to_vec(&inner).unwrap();
532
533        Self {
534            bytes: Bytes::from(bytes),
535            inner,
536        }
537    }
538}
539
540impl TableMetadataManager {
541    pub fn new(kv_backend: KvBackendRef) -> Self {
542        TableMetadataManager {
543            table_name_manager: TableNameManager::new(kv_backend.clone()),
544            table_info_manager: TableInfoManager::new(kv_backend.clone()),
545            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547            catalog_manager: CatalogManager::new(kv_backend.clone()),
548            schema_manager: SchemaManager::new(kv_backend.clone()),
549            table_route_manager: TableRouteManager::new(kv_backend.clone()),
550            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554            kv_backend,
555        }
556    }
557
558    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
559    pub fn new_with_custom_tombstone_prefix(
560        kv_backend: KvBackendRef,
561        tombstone_prefix: &str,
562    ) -> Self {
563        Self {
564            table_name_manager: TableNameManager::new(kv_backend.clone()),
565            table_info_manager: TableInfoManager::new(kv_backend.clone()),
566            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568            catalog_manager: CatalogManager::new(kv_backend.clone()),
569            schema_manager: SchemaManager::new(kv_backend.clone()),
570            table_route_manager: TableRouteManager::new(kv_backend.clone()),
571            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572            tombstone_manager: TombstoneManager::new_with_prefix(
573                kv_backend.clone(),
574                tombstone_prefix,
575            ),
576            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578            kv_backend,
579        }
580    }
581
582    pub async fn init(&self) -> Result<()> {
583        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585        self.catalog_manager().create(catalog_name, true).await?;
586
587        let internal_schemas = [
588            DEFAULT_SCHEMA_NAME,
589            INFORMATION_SCHEMA_NAME,
590            DEFAULT_PRIVATE_SCHEMA_NAME,
591        ];
592
593        for schema_name in internal_schemas {
594            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596            self.schema_manager().create(schema_key, None, true).await?;
597        }
598
599        Ok(())
600    }
601
    /// Returns the [`TableNameManager`].
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the [`TableInfoManager`].
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the [`ViewInfoManager`].
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the [`DatanodeTableManager`].
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the [`CatalogManager`].
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the [`SchemaManager`].
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the [`TableRouteManager`].
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the [`TableRepartManager`].
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the [`TopicNameManager`].
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the [`TopicRegionManager`].
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the KV backend handle that all managers were built over.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646    pub async fn get_full_table_info(
647        &self,
648        table_id: TableId,
649    ) -> Result<(
650        Option<DeserializedValueWithBytes<TableInfoValue>>,
651        Option<DeserializedValueWithBytes<TableRouteValue>>,
652    )> {
653        let table_info_key = TableInfoKey::new(table_id);
654        let table_route_key = TableRouteKey::new(table_id);
655        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659        let mut res = self.kv_backend.txn(txn).await?;
660        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663        Ok((table_info_value, table_route_value))
664    }
665
666    /// Creates metadata for view and returns an error if different metadata exists.
667    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
668    /// Parameters include:
669    /// - `view_info`: the encoded logical plan
670    /// - `table_names`: the resolved fully table names in logical plan
671    /// - `columns`: the view columns
672    /// - `plan_columns`: the original plan columns
673    /// - `definition`: The SQL to create the view
674    ///
675    pub async fn create_view_metadata(
676        &self,
677        view_info: RawTableInfo,
678        raw_logical_plan: Vec<u8>,
679        table_names: HashSet<TableName>,
680        columns: Vec<String>,
681        plan_columns: Vec<String>,
682        definition: String,
683    ) -> Result<()> {
684        let view_id = view_info.ident.table_id;
685
686        // Creates view name.
687        let view_name = TableNameKey::new(
688            &view_info.catalog_name,
689            &view_info.schema_name,
690            &view_info.name,
691        );
692        let create_table_name_txn = self
693            .table_name_manager()
694            .build_create_txn(&view_name, view_id)?;
695
696        // Creates table info.
697        let table_info_value = TableInfoValue::new(view_info);
698
699        let (create_table_info_txn, on_create_table_info_failure) = self
700            .table_info_manager()
701            .build_create_txn(view_id, &table_info_value)?;
702
703        // Creates view info
704        let view_info_value = ViewInfoValue::new(
705            raw_logical_plan,
706            table_names,
707            columns,
708            plan_columns,
709            definition,
710        );
711        let (create_view_info_txn, on_create_view_info_failure) = self
712            .view_info_manager()
713            .build_create_txn(view_id, &view_info_value)?;
714
715        let txn = Txn::merge_all(vec![
716            create_table_name_txn,
717            create_table_info_txn,
718            create_view_info_txn,
719        ]);
720
721        let mut r = self.kv_backend.txn(txn).await?;
722
723        // Checks whether metadata was already created.
724        if !r.succeeded {
725            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726            let remote_table_info = on_create_table_info_failure(&mut set)?
727                .context(error::UnexpectedSnafu {
728                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729                })?
730                .into_inner();
731
732            let remote_view_info = on_create_view_info_failure(&mut set)?
733                .context(error::UnexpectedSnafu {
734                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735                })?
736                .into_inner();
737
738            let op_name = "the creating view metadata";
739            ensure_values!(remote_table_info, table_info_value, op_name);
740            ensure_values!(remote_view_info, view_info_value, op_name);
741        }
742
743        Ok(())
744    }
745
746    /// Creates metadata for table and returns an error if different metadata exists.
747    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
748    pub async fn create_table_metadata(
749        &self,
750        table_info: RawTableInfo,
751        table_route_value: TableRouteValue,
752        region_wal_options: HashMap<RegionNumber, String>,
753    ) -> Result<()> {
754        let table_id = table_info.ident.table_id;
755        let engine = table_info.meta.engine.clone();
756
757        // Creates table name.
758        let table_name = TableNameKey::new(
759            &table_info.catalog_name,
760            &table_info.schema_name,
761            &table_info.name,
762        );
763        let create_table_name_txn = self
764            .table_name_manager()
765            .build_create_txn(&table_name, table_id)?;
766
767        let region_options = table_info.to_region_options();
768        // Creates table info.
769        let table_info_value = TableInfoValue::new(table_info);
770        let (create_table_info_txn, on_create_table_info_failure) = self
771            .table_info_manager()
772            .build_create_txn(table_id, &table_info_value)?;
773
774        let (create_table_route_txn, on_create_table_route_failure) = self
775            .table_route_manager()
776            .table_route_storage()
777            .build_create_txn(table_id, &table_route_value)?;
778
779        let create_topic_region_txn = self
780            .topic_region_manager
781            .build_create_txn(table_id, &region_wal_options)?;
782
783        let mut txn = Txn::merge_all(vec![
784            create_table_name_txn,
785            create_table_info_txn,
786            create_table_route_txn,
787            create_topic_region_txn,
788        ]);
789
790        if let TableRouteValue::Physical(x) = &table_route_value {
791            let region_storage_path = table_info_value.region_storage_path();
792            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793                table_id,
794                &engine,
795                &region_storage_path,
796                region_options,
797                region_wal_options,
798                region_distribution(&x.region_routes),
799            )?;
800            txn = txn.merge(create_datanode_table_txn);
801        }
802
803        let mut r = self.kv_backend.txn(txn).await?;
804
805        // Checks whether metadata was already created.
806        if !r.succeeded {
807            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808            let remote_table_info = on_create_table_info_failure(&mut set)?
809                .context(error::UnexpectedSnafu {
810                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811                })?
812                .into_inner();
813
814            let remote_table_route = on_create_table_route_failure(&mut set)?
815                .context(error::UnexpectedSnafu {
816                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817                })?
818                .into_inner();
819
820            let op_name = "the creating table metadata";
821            ensure_values!(remote_table_info, table_info_value, op_name);
822            ensure_values!(remote_table_route, table_route_value, op_name);
823        }
824
825        Ok(())
826    }
827
828    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
829        // The batch size is max_txn_size / 3 because the size of the `tables_data`
830        // is 3 times the size of the `tables_data`.
831        self.kv_backend.max_txn_ops() / 3
832    }
833
834    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
835    pub async fn create_logical_tables_metadata(
836        &self,
837        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
838    ) -> Result<()> {
839        let len = tables_data.len();
840        let mut txns = Vec::with_capacity(3 * len);
841        struct OnFailure<F1, R1, F2, R2>
842        where
843            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
844            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
845        {
846            table_info_value: TableInfoValue,
847            on_create_table_info_failure: F1,
848            table_route_value: TableRouteValue,
849            on_create_table_route_failure: F2,
850        }
851        let mut on_failures = Vec::with_capacity(len);
852        for (table_info, table_route_value) in tables_data {
853            let table_id = table_info.ident.table_id;
854
855            // Creates table name.
856            let table_name = TableNameKey::new(
857                &table_info.catalog_name,
858                &table_info.schema_name,
859                &table_info.name,
860            );
861            let create_table_name_txn = self
862                .table_name_manager()
863                .build_create_txn(&table_name, table_id)?;
864            txns.push(create_table_name_txn);
865
866            // Creates table info.
867            let table_info_value = TableInfoValue::new(table_info);
868            let (create_table_info_txn, on_create_table_info_failure) =
869                self.table_info_manager()
870                    .build_create_txn(table_id, &table_info_value)?;
871            txns.push(create_table_info_txn);
872
873            let (create_table_route_txn, on_create_table_route_failure) = self
874                .table_route_manager()
875                .table_route_storage()
876                .build_create_txn(table_id, &table_route_value)?;
877            txns.push(create_table_route_txn);
878
879            on_failures.push(OnFailure {
880                table_info_value,
881                on_create_table_info_failure,
882                table_route_value,
883                on_create_table_route_failure,
884            });
885        }
886
887        let txn = Txn::merge_all(txns);
888        let mut r = self.kv_backend.txn(txn).await?;
889
890        // Checks whether metadata was already created.
891        if !r.succeeded {
892            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
893            for on_failure in on_failures {
894                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
895                    .context(error::UnexpectedSnafu {
896                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
897                    })?
898                    .into_inner();
899
900                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
901                    .context(error::UnexpectedSnafu {
902                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
903                    })?
904                    .into_inner();
905
906                let op_name = "the creating logical tables metadata";
907                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
908                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
909            }
910        }
911
912        Ok(())
913    }
914
915    fn table_metadata_keys(
916        &self,
917        table_id: TableId,
918        table_name: &TableName,
919        table_route_value: &TableRouteValue,
920        region_wal_options: &HashMap<RegionNumber, WalOptions>,
921    ) -> Result<Vec<Vec<u8>>> {
922        // Builds keys
923        let datanode_ids = if table_route_value.is_physical() {
924            region_distribution(table_route_value.region_routes()?)
925                .into_keys()
926                .collect()
927        } else {
928            vec![]
929        };
930        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931        let table_name = TableNameKey::new(
932            &table_name.catalog_name,
933            &table_name.schema_name,
934            &table_name.table_name,
935        );
936        let table_info_key = TableInfoKey::new(table_id);
937        let table_route_key = TableRouteKey::new(table_id);
938        let table_repart_key = TableRepartKey::new(table_id);
939        let datanode_table_keys = datanode_ids
940            .into_iter()
941            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942            .collect::<HashSet<_>>();
943        let topic_region_map = self
944            .topic_region_manager
945            .get_topic_region_mapping(table_id, region_wal_options);
946        let topic_region_keys = topic_region_map
947            .iter()
948            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949            .collect::<Vec<_>>();
950        keys.push(table_name.to_bytes());
951        keys.push(table_info_key.to_bytes());
952        keys.push(table_route_key.to_bytes());
953        keys.push(table_repart_key.to_bytes());
954        for key in &datanode_table_keys {
955            keys.push(key.to_bytes());
956        }
957        for key in topic_region_keys {
958            keys.push(key.to_bytes());
959        }
960        Ok(keys)
961    }
962
963    /// Deletes metadata for table **logically**.
964    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
965    pub async fn delete_table_metadata(
966        &self,
967        table_id: TableId,
968        table_name: &TableName,
969        table_route_value: &TableRouteValue,
970        region_wal_options: &HashMap<RegionNumber, WalOptions>,
971    ) -> Result<()> {
972        let keys =
973            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974        self.tombstone_manager.create(keys).await.map(|_| ())
975    }
976
977    /// Deletes metadata tombstone for table **permanently**.
978    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
979    pub async fn delete_table_metadata_tombstone(
980        &self,
981        table_id: TableId,
982        table_name: &TableName,
983        table_route_value: &TableRouteValue,
984        region_wal_options: &HashMap<RegionNumber, WalOptions>,
985    ) -> Result<()> {
986        let table_metadata_keys =
987            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988        self.tombstone_manager
989            .delete(table_metadata_keys)
990            .await
991            .map(|_| ())
992    }
993
994    /// Restores metadata for table.
995    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
996    pub async fn restore_table_metadata(
997        &self,
998        table_id: TableId,
999        table_name: &TableName,
1000        table_route_value: &TableRouteValue,
1001        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002    ) -> Result<()> {
1003        let keys =
1004            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005        self.tombstone_manager.restore(keys).await.map(|_| ())
1006    }
1007
1008    /// Deletes metadata for table **permanently**.
1009    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1010    pub async fn destroy_table_metadata(
1011        &self,
1012        table_id: TableId,
1013        table_name: &TableName,
1014        table_route_value: &TableRouteValue,
1015        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016    ) -> Result<()> {
1017        let keys =
1018            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019        let _ = self
1020            .kv_backend
1021            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022            .await?;
1023        Ok(())
1024    }
1025
1026    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027        let mut keys = Vec::with_capacity(3);
1028        let view_name = TableNameKey::new(
1029            &view_name.catalog_name,
1030            &view_name.schema_name,
1031            &view_name.table_name,
1032        );
1033        let table_info_key = TableInfoKey::new(view_id);
1034        let view_info_key = ViewInfoKey::new(view_id);
1035        keys.push(view_name.to_bytes());
1036        keys.push(table_info_key.to_bytes());
1037        keys.push(view_info_key.to_bytes());
1038
1039        Ok(keys)
1040    }
1041
1042    /// Deletes metadata for view **permanently**.
1043    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1044    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045        let keys = self.view_info_keys(view_id, view_name)?;
1046        let _ = self
1047            .kv_backend
1048            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049            .await?;
1050        Ok(())
1051    }
1052
1053    /// Renames the table name and returns an error if different metadata exists.
1054    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1055    /// and the new `TableNameKey` MUST be empty.
1056    pub async fn rename_table(
1057        &self,
1058        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059        new_table_name: String,
1060    ) -> Result<()> {
1061        let current_table_info = &current_table_info_value.table_info;
1062        let table_id = current_table_info.ident.table_id;
1063
1064        let table_name_key = TableNameKey::new(
1065            &current_table_info.catalog_name,
1066            &current_table_info.schema_name,
1067            &current_table_info.name,
1068        );
1069
1070        let new_table_name_key = TableNameKey::new(
1071            &current_table_info.catalog_name,
1072            &current_table_info.schema_name,
1073            &new_table_name,
1074        );
1075
1076        // Updates table name.
1077        let update_table_name_txn = self.table_name_manager().build_update_txn(
1078            &table_name_key,
1079            &new_table_name_key,
1080            table_id,
1081        )?;
1082
1083        let new_table_info_value = current_table_info_value
1084            .inner
1085            .with_update(move |table_info| {
1086                table_info.name = new_table_name;
1087            });
1088
1089        // Updates table info.
1090        let (update_table_info_txn, on_update_table_info_failure) = self
1091            .table_info_manager()
1092            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096        let mut r = self.kv_backend.txn(txn).await?;
1097
1098        // Checks whether metadata was already updated.
1099        if !r.succeeded {
1100            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101            let remote_table_info = on_update_table_info_failure(&mut set)?
1102                .context(error::UnexpectedSnafu {
1103                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104                })?
1105                .into_inner();
1106
1107            let op_name = "the renaming table metadata";
1108            ensure_values!(remote_table_info, new_table_info_value, op_name);
1109        }
1110
1111        Ok(())
1112    }
1113
1114    /// Updates table info and returns an error if different metadata exists.
1115    /// And cascade-ly update all redundant table options for each region
1116    /// if region_distribution is present.
1117    pub async fn update_table_info(
1118        &self,
1119        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120        region_distribution: Option<RegionDistribution>,
1121        new_table_info: RawTableInfo,
1122    ) -> Result<()> {
1123        let table_id = current_table_info_value.table_info.ident.table_id;
1124        let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126        // Updates table info.
1127        let (update_table_info_txn, on_update_table_info_failure) = self
1128            .table_info_manager()
1129            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131        let txn = if let Some(region_distribution) = region_distribution {
1132            // region options induced from table info.
1133            let new_region_options = new_table_info_value.table_info.to_region_options();
1134            let update_datanode_table_options_txn = self
1135                .datanode_table_manager
1136                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137                .await?;
1138            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139        } else {
1140            update_table_info_txn
1141        };
1142
1143        let mut r = self.kv_backend.txn(txn).await?;
1144        // Checks whether metadata was already updated.
1145        if !r.succeeded {
1146            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147            let remote_table_info = on_update_table_info_failure(&mut set)?
1148                .context(error::UnexpectedSnafu {
1149                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150                })?
1151                .into_inner();
1152
1153            let op_name = "the updating table info";
1154            ensure_values!(remote_table_info, new_table_info_value, op_name);
1155        }
1156        Ok(())
1157    }
1158
1159    /// Updates view info and returns an error if different metadata exists.
1160    /// Parameters include:
1161    /// - `view_id`: the view id
1162    /// - `current_view_info_value`: the current view info for CAS checking
1163    /// - `new_view_info`: the encoded logical plan
1164    /// - `table_names`: the resolved fully table names in logical plan
1165    /// - `columns`: the view columns
1166    /// - `plan_columns`: the original plan columns
1167    /// - `definition`: The SQL to create the view
1168    ///
1169    #[allow(clippy::too_many_arguments)]
1170    pub async fn update_view_info(
1171        &self,
1172        view_id: TableId,
1173        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174        new_view_info: Vec<u8>,
1175        table_names: HashSet<TableName>,
1176        columns: Vec<String>,
1177        plan_columns: Vec<String>,
1178        definition: String,
1179    ) -> Result<()> {
1180        let new_view_info_value = current_view_info_value.update(
1181            new_view_info,
1182            table_names,
1183            columns,
1184            plan_columns,
1185            definition,
1186        );
1187
1188        // Updates view info.
1189        let (update_view_info_txn, on_update_view_info_failure) = self
1190            .view_info_manager()
1191            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195        // Checks whether metadata was already updated.
1196        if !r.succeeded {
1197            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198            let remote_view_info = on_update_view_info_failure(&mut set)?
1199                .context(error::UnexpectedSnafu {
1200                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201                })?
1202                .into_inner();
1203
1204            let op_name = "the updating view info";
1205            ensure_values!(remote_view_info, new_view_info_value, op_name);
1206        }
1207        Ok(())
1208    }
1209
    /// Returns the backend's `max_txn_ops()`, intended as the chunk size for
    /// `batch_update_table_info_values`, which merges one update transaction per pair.
    // NOTE(review): assumes each table info update costs a single txn op — confirm.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1213
1214    pub async fn batch_update_table_info_values(
1215        &self,
1216        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1217    ) -> Result<()> {
1218        let len = table_info_value_pairs.len();
1219        let mut txns = Vec::with_capacity(len);
1220        struct OnFailure<F, R>
1221        where
1222            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223        {
1224            table_info_value: TableInfoValue,
1225            on_update_table_info_failure: F,
1226        }
1227        let mut on_failures = Vec::with_capacity(len);
1228
1229        for (table_info_value, new_table_info) in table_info_value_pairs {
1230            let table_id = table_info_value.table_info.ident.table_id;
1231
1232            let new_table_info_value = table_info_value.update(new_table_info);
1233
1234            let (update_table_info_txn, on_update_table_info_failure) =
1235                self.table_info_manager().build_update_txn(
1236                    table_id,
1237                    &table_info_value,
1238                    &new_table_info_value,
1239                )?;
1240
1241            txns.push(update_table_info_txn);
1242
1243            on_failures.push(OnFailure {
1244                table_info_value: new_table_info_value,
1245                on_update_table_info_failure,
1246            });
1247        }
1248
1249        let txn = Txn::merge_all(txns);
1250        let mut r = self.kv_backend.txn(txn).await?;
1251
1252        if !r.succeeded {
1253            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254            for on_failure in on_failures {
1255                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256                    .context(error::UnexpectedSnafu {
1257                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258                    })?
1259                    .into_inner();
1260
1261                let op_name = "the batch updating table info";
1262                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268
1269    pub async fn update_table_route(
1270        &self,
1271        table_id: TableId,
1272        region_info: RegionInfo,
1273        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274        new_region_routes: Vec<RegionRoute>,
1275        new_region_options: &HashMap<String, String>,
1276        new_region_wal_options: &HashMap<RegionNumber, String>,
1277    ) -> Result<()> {
1278        // Updates the datanode table key value pairs.
1279        let current_region_distribution =
1280            region_distribution(current_table_route_value.region_routes()?);
1281        let new_region_distribution = region_distribution(&new_region_routes);
1282
1283        let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1284            table_id,
1285            &region_info.region_wal_options,
1286            new_region_wal_options,
1287        )?;
1288        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1289            table_id,
1290            region_info,
1291            current_region_distribution,
1292            new_region_distribution,
1293            new_region_options,
1294            new_region_wal_options,
1295        )?;
1296
1297        // Updates the table_route.
1298        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299        let (update_table_route_txn, on_update_table_route_failure) = self
1300            .table_route_manager()
1301            .table_route_storage()
1302            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1303
1304        let txn = Txn::merge_all(vec![
1305            update_datanode_table_txn,
1306            update_table_route_txn,
1307            update_topic_region_txn,
1308        ]);
1309
1310        let mut r = self.kv_backend.txn(txn).await?;
1311
1312        // Checks whether metadata was already updated.
1313        if !r.succeeded {
1314            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1315            let remote_table_route = on_update_table_route_failure(&mut set)?
1316                .context(error::UnexpectedSnafu {
1317                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1318                })?
1319                .into_inner();
1320
1321            let op_name = "the updating table route";
1322            ensure_values!(remote_table_route, new_table_route_value, op_name);
1323        }
1324
1325        Ok(())
1326    }
1327
1328    /// Updates the leader status of the [RegionRoute].
1329    pub async fn update_leader_region_status<F>(
1330        &self,
1331        table_id: TableId,
1332        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1333        next_region_route_status: F,
1334    ) -> Result<()>
1335    where
1336        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1337    {
1338        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1339
1340        let mut updated = 0;
1341        for route in &mut new_region_routes {
1342            if let Some(state) = next_region_route_status(route)
1343                && route.set_leader_state(state)
1344            {
1345                updated += 1;
1346            }
1347        }
1348
1349        if updated == 0 {
1350            warn!("No leader status updated");
1351            return Ok(());
1352        }
1353
1354        // Updates the table_route.
1355        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1356
1357        let (update_table_route_txn, on_update_table_route_failure) = self
1358            .table_route_manager()
1359            .table_route_storage()
1360            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1361
1362        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1363
1364        // Checks whether metadata was already updated.
1365        if !r.succeeded {
1366            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1367            let remote_table_route = on_update_table_route_failure(&mut set)?
1368                .context(error::UnexpectedSnafu {
1369                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1370                })?
1371                .into_inner();
1372
1373            let op_name = "the updating leader region status";
1374            ensure_values!(remote_table_route, new_table_route_value, op_name);
1375        }
1376
1377        Ok(())
1378    }
1379}
1380
/// Implements `$crate::key::MetadataValue` for every listed type, using JSON (through
/// `serde_json`) as the raw value encoding.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `.context(..)` method by their
/// unqualified names, so these have to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    // Accepts a comma separated list of types.
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1397
// Implements `$crate::key::MetadataKeyGetTxnOp` for every listed key type.
//
// The generated `build_get_op` serializes the key with `to_bytes()` and returns a
// `TxnOp::Get` for those bytes together with the closure produced by
// `TxnOpGetResponseSet::filter` for the same bytes.
macro_rules! impl_metadata_key_get_txn_op {
    // Accepts a comma separated list of key types.
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    // The raw key is needed twice: once for the get op, once for the filter.
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}

// Key types that can be read back through a `TxnOp::Get` inside a transaction.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1430
/// Adds inherent JSON (de)serialization helpers to every listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` is an inherent
/// associated function that decodes into `Option<Self>` instead of `Self`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `.context(..)` method by their
/// unqualified names, so these have to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    // Accepts a comma separated list of types.
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1447
// Value types that implement `MetadataValue` with JSON encoding.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}

// Value types that get inherent helpers decoding into `Option<Self>`.
// NOTE(review): `SchemaNameValue` is listed for both macros, so it has trait methods and
// inherent methods with the same names — confirm this is intentional.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1468
1469#[cfg(test)]
1470mod tests {
1471    use std::collections::{BTreeMap, HashMap, HashSet};
1472    use std::sync::Arc;
1473
1474    use bytes::Bytes;
1475    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1476    use common_time::util::current_time_millis;
1477    use common_wal::options::{KafkaWalOptions, WalOptions};
1478    use futures::TryStreamExt;
1479    use store_api::storage::{RegionId, RegionNumber};
1480    use table::metadata::{RawTableInfo, TableInfo};
1481    use table::table_name::TableName;
1482
1483    use super::datanode_table::DatanodeTableKey;
1484    use super::test_utils;
1485    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1486    use crate::ddl::test_util::create_table::test_create_table_task;
1487    use crate::ddl::utils::region_storage_path;
1488    use crate::error::Result;
1489    use crate::key::datanode_table::RegionInfo;
1490    use crate::key::table_info::TableInfoValue;
1491    use crate::key::table_name::TableNameKey;
1492    use crate::key::table_route::TableRouteValue;
1493    use crate::key::topic_region::TopicRegionKey;
1494    use crate::key::{
1495        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1496        TableMetadataManager, ViewInfoValue,
1497    };
1498    use crate::kv_backend::KvBackend;
1499    use crate::kv_backend::memory::MemoryKvBackend;
1500    use crate::peer::Peer;
1501    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1502    use crate::rpc::store::RangeRequest;
1503    use crate::wal_provider::WalProvider;
1504
1505    #[test]
1506    fn test_deserialized_value_with_bytes() {
1507        let region_route = new_test_region_route();
1508        let region_routes = vec![region_route.clone()];
1509
1510        let expected_region_routes =
1511            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1512        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1513
1514        // Serialize behaviors:
1515        // The inner field will be ignored.
1516        let value = DeserializedValueWithBytes {
1517            // ignored
1518            inner: TableRouteValue::physical(region_routes.clone()),
1519            bytes: Bytes::from(expected.clone()),
1520        };
1521
1522        let encoded = serde_json::to_vec(&value).unwrap();
1523
1524        // Deserialize behaviors:
1525        // The inner field will be deserialized from the bytes field.
1526        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1527            serde_json::from_slice(&encoded).unwrap();
1528
1529        assert_eq!(decoded.inner, expected_region_routes);
1530        assert_eq!(decoded.bytes, expected);
1531    }
1532
    /// Builds the default test region route: region id `1` led by datanode `2`.
    fn new_test_region_route() -> RegionRoute {
        new_region_route(1, 2)
    }
1536
1537    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1538        RegionRoute {
1539            region: Region {
1540                id: region_id.into(),
1541                name: "r1".to_string(),
1542                partition: None,
1543                attrs: BTreeMap::new(),
1544                partition_expr: Default::default(),
1545            },
1546            leader_peer: Some(Peer::new(datanode, "a2")),
1547            follower_peers: vec![],
1548            leader_state: None,
1549            leader_down_since: None,
1550        }
1551    }
1552
    /// Builds the default test table info by calling `test_utils::new_test_table_info(10)`.
    // NOTE(review): the argument is presumably the table id (the tests look up table 10) —
    // confirm against `test_utils`.
    fn new_test_table_info() -> TableInfo {
        test_utils::new_test_table_info(10)
    }
1556
1557    fn new_test_table_names() -> HashSet<TableName> {
1558        let mut set = HashSet::new();
1559        set.insert(TableName {
1560            catalog_name: "greptime".to_string(),
1561            schema_name: "public".to_string(),
1562            table_name: "a_table".to_string(),
1563        });
1564        set.insert(TableName {
1565            catalog_name: "greptime".to_string(),
1566            schema_name: "public".to_string(),
1567            table_name: "b_table".to_string(),
1568        });
1569        set
1570    }
1571
1572    async fn create_physical_table_metadata(
1573        table_metadata_manager: &TableMetadataManager,
1574        table_info: RawTableInfo,
1575        region_routes: Vec<RegionRoute>,
1576        region_wal_options: HashMap<RegionNumber, String>,
1577    ) -> Result<()> {
1578        table_metadata_manager
1579            .create_table_metadata(
1580                table_info,
1581                TableRouteValue::physical(region_routes),
1582                region_wal_options,
1583            )
1584            .await
1585    }
1586
1587    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1588        let topics = (0..2)
1589            .map(|i| format!("greptimedb_topic{}", i))
1590            .collect::<Vec<_>>();
1591        let wal_options = topics
1592            .iter()
1593            .map(|topic| {
1594                WalOptions::Kafka(KafkaWalOptions {
1595                    topic: topic.clone(),
1596                })
1597            })
1598            .collect::<Vec<_>>();
1599
1600        (0..16)
1601            .enumerate()
1602            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1603            .collect()
1604    }
1605
1606    #[tokio::test]
1607    async fn test_raft_engine_topic_region_map() {
1608        let mem_kv = Arc::new(MemoryKvBackend::default());
1609        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1610        let region_route = new_test_region_route();
1611        let region_routes = &vec![region_route.clone()];
1612        let table_info: RawTableInfo = new_test_table_info().into();
1613        let wal_provider = WalProvider::RaftEngine;
1614        let regions: Vec<_> = (0..16).collect();
1615        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1616        create_physical_table_metadata(
1617            &table_metadata_manager,
1618            table_info.clone(),
1619            region_routes.clone(),
1620            region_wal_options.clone(),
1621        )
1622        .await
1623        .unwrap();
1624
1625        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1626        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1627        let resp = mem_kv.range(range_req).await.unwrap();
1628        // Should be empty because the topic region map is empty for raft engine.
1629        assert!(resp.kvs.is_empty());
1630    }
1631
1632    #[tokio::test]
1633    async fn test_create_table_metadata() {
1634        let mem_kv = Arc::new(MemoryKvBackend::default());
1635        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1636        let region_route = new_test_region_route();
1637        let region_routes = &vec![region_route.clone()];
1638        let table_info: RawTableInfo = new_test_table_info().into();
1639        let region_wal_options = create_mock_region_wal_options()
1640            .into_iter()
1641            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1642            .collect::<HashMap<_, _>>();
1643
1644        // creates metadata.
1645        create_physical_table_metadata(
1646            &table_metadata_manager,
1647            table_info.clone(),
1648            region_routes.clone(),
1649            region_wal_options.clone(),
1650        )
1651        .await
1652        .unwrap();
1653
1654        // if metadata was already created, it should be ok.
1655        assert!(
1656            create_physical_table_metadata(
1657                &table_metadata_manager,
1658                table_info.clone(),
1659                region_routes.clone(),
1660                region_wal_options.clone(),
1661            )
1662            .await
1663            .is_ok()
1664        );
1665
1666        let mut modified_region_routes = region_routes.clone();
1667        modified_region_routes.push(region_route.clone());
1668        // if remote metadata was exists, it should return an error.
1669        assert!(
1670            create_physical_table_metadata(
1671                &table_metadata_manager,
1672                table_info.clone(),
1673                modified_region_routes,
1674                region_wal_options.clone(),
1675            )
1676            .await
1677            .is_err()
1678        );
1679
1680        let (remote_table_info, remote_table_route) = table_metadata_manager
1681            .get_full_table_info(10)
1682            .await
1683            .unwrap();
1684
1685        assert_eq!(
1686            remote_table_info.unwrap().into_inner().table_info,
1687            table_info
1688        );
1689        assert_eq!(
1690            remote_table_route
1691                .unwrap()
1692                .into_inner()
1693                .region_routes()
1694                .unwrap(),
1695            region_routes
1696        );
1697
1698        for i in 0..2 {
1699            let region_number = i as u32;
1700            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1701            let topic = format!("greptimedb_topic{}", i);
1702            let regions = table_metadata_manager
1703                .topic_region_manager
1704                .regions(&topic)
1705                .await
1706                .unwrap()
1707                .into_keys()
1708                .collect::<Vec<_>>();
1709            assert_eq!(regions.len(), 8);
1710            assert!(regions.contains(&region_id));
1711        }
1712    }
1713
1714    #[tokio::test]
1715    async fn test_create_logic_tables_metadata() {
1716        let mem_kv = Arc::new(MemoryKvBackend::default());
1717        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1718        let region_route = new_test_region_route();
1719        let region_routes = vec![region_route.clone()];
1720        let table_info: RawTableInfo = new_test_table_info().into();
1721        let table_id = table_info.ident.table_id;
1722        let table_route_value = TableRouteValue::physical(region_routes.clone());
1723
1724        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1725        // creates metadata.
1726        table_metadata_manager
1727            .create_logical_tables_metadata(tables_data.clone())
1728            .await
1729            .unwrap();
1730
1731        // if metadata was already created, it should be ok.
1732        assert!(
1733            table_metadata_manager
1734                .create_logical_tables_metadata(tables_data)
1735                .await
1736                .is_ok()
1737        );
1738
1739        let mut modified_region_routes = region_routes.clone();
1740        modified_region_routes.push(new_region_route(2, 3));
1741        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1742        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1743        // if remote metadata was exists, it should return an error.
1744        assert!(
1745            table_metadata_manager
1746                .create_logical_tables_metadata(modified_tables_data)
1747                .await
1748                .is_err()
1749        );
1750
1751        let (remote_table_info, remote_table_route) = table_metadata_manager
1752            .get_full_table_info(table_id)
1753            .await
1754            .unwrap();
1755
1756        assert_eq!(
1757            remote_table_info.unwrap().into_inner().table_info,
1758            table_info
1759        );
1760        assert_eq!(
1761            remote_table_route
1762                .unwrap()
1763                .into_inner()
1764                .region_routes()
1765                .unwrap(),
1766            &region_routes
1767        );
1768    }
1769
1770    #[tokio::test]
1771    async fn test_create_many_logical_tables_metadata() {
1772        let kv_backend = Arc::new(MemoryKvBackend::default());
1773        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1774
1775        let mut tables_data = vec![];
1776        for i in 0..128 {
1777            let table_id = i + 1;
1778            let regin_number = table_id * 3;
1779            let region_id = RegionId::new(table_id, regin_number);
1780            let region_route = new_region_route(region_id.as_u64(), 2);
1781            let region_routes = vec![region_route.clone()];
1782            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1783                table_id,
1784                &format!("my_table_{}", table_id),
1785            )
1786            .into();
1787            let table_route_value = TableRouteValue::physical(region_routes.clone());
1788
1789            tables_data.push((table_info, table_route_value));
1790        }
1791
1792        // creates metadata.
1793        table_metadata_manager
1794            .create_logical_tables_metadata(tables_data)
1795            .await
1796            .unwrap();
1797    }
1798
1799    #[tokio::test]
1800    async fn test_delete_table_metadata() {
1801        let mem_kv = Arc::new(MemoryKvBackend::default());
1802        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1803        let region_route = new_test_region_route();
1804        let region_routes = &vec![region_route.clone()];
1805        let table_info: RawTableInfo = new_test_table_info().into();
1806        let table_id = table_info.ident.table_id;
1807        let datanode_id = 2;
1808        let region_wal_options = create_mock_region_wal_options();
1809        let serialized_region_wal_options = region_wal_options
1810            .iter()
1811            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1812            .collect::<HashMap<_, _>>();
1813
1814        // creates metadata.
1815        create_physical_table_metadata(
1816            &table_metadata_manager,
1817            table_info.clone(),
1818            region_routes.clone(),
1819            serialized_region_wal_options,
1820        )
1821        .await
1822        .unwrap();
1823
1824        let table_name = TableName::new(
1825            table_info.catalog_name,
1826            table_info.schema_name,
1827            table_info.name,
1828        );
1829        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1830        // deletes metadata.
1831        table_metadata_manager
1832            .delete_table_metadata(
1833                table_id,
1834                &table_name,
1835                table_route_value,
1836                &region_wal_options,
1837            )
1838            .await
1839            .unwrap();
1840        // Should be ignored.
1841        table_metadata_manager
1842            .delete_table_metadata(
1843                table_id,
1844                &table_name,
1845                table_route_value,
1846                &region_wal_options,
1847            )
1848            .await
1849            .unwrap();
1850        assert!(
1851            table_metadata_manager
1852                .table_info_manager()
1853                .get(table_id)
1854                .await
1855                .unwrap()
1856                .is_none()
1857        );
1858        assert!(
1859            table_metadata_manager
1860                .table_route_manager()
1861                .table_route_storage()
1862                .get(table_id)
1863                .await
1864                .unwrap()
1865                .is_none()
1866        );
1867        assert!(
1868            table_metadata_manager
1869                .datanode_table_manager()
1870                .tables(datanode_id)
1871                .try_collect::<Vec<_>>()
1872                .await
1873                .unwrap()
1874                .is_empty()
1875        );
1876        // Checks removed values
1877        let table_info = table_metadata_manager
1878            .table_info_manager()
1879            .get(table_id)
1880            .await
1881            .unwrap();
1882        assert!(table_info.is_none());
1883        let table_route = table_metadata_manager
1884            .table_route_manager()
1885            .table_route_storage()
1886            .get(table_id)
1887            .await
1888            .unwrap();
1889        assert!(table_route.is_none());
1890        // Logical delete removes the topic region mapping as well.
1891        let regions = table_metadata_manager
1892            .topic_region_manager
1893            .regions("greptimedb_topic0")
1894            .await
1895            .unwrap();
1896        assert_eq!(regions.len(), 0);
1897        let regions = table_metadata_manager
1898            .topic_region_manager
1899            .regions("greptimedb_topic1")
1900            .await
1901            .unwrap();
1902        assert_eq!(regions.len(), 0);
1903    }
1904
1905    #[tokio::test]
1906    async fn test_rename_table() {
1907        let mem_kv = Arc::new(MemoryKvBackend::default());
1908        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1909        let region_route = new_test_region_route();
1910        let region_routes = vec![region_route.clone()];
1911        let table_info: RawTableInfo = new_test_table_info().into();
1912        let table_id = table_info.ident.table_id;
1913        // creates metadata.
1914        create_physical_table_metadata(
1915            &table_metadata_manager,
1916            table_info.clone(),
1917            region_routes.clone(),
1918            HashMap::new(),
1919        )
1920        .await
1921        .unwrap();
1922
1923        let new_table_name = "another_name".to_string();
1924        let table_info_value =
1925            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1926
1927        table_metadata_manager
1928            .rename_table(&table_info_value, new_table_name.clone())
1929            .await
1930            .unwrap();
1931        // if remote metadata was updated, it should be ok.
1932        table_metadata_manager
1933            .rename_table(&table_info_value, new_table_name.clone())
1934            .await
1935            .unwrap();
1936        let mut modified_table_info = table_info.clone();
1937        modified_table_info.name = "hi".to_string();
1938        let modified_table_info_value =
1939            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1940        // if the table_info_value is wrong, it should return an error.
1941        // The ABA problem.
1942        assert!(
1943            table_metadata_manager
1944                .rename_table(&modified_table_info_value, new_table_name.clone())
1945                .await
1946                .is_err()
1947        );
1948
1949        let old_table_name = TableNameKey::new(
1950            &table_info.catalog_name,
1951            &table_info.schema_name,
1952            &table_info.name,
1953        );
1954        let new_table_name = TableNameKey::new(
1955            &table_info.catalog_name,
1956            &table_info.schema_name,
1957            &new_table_name,
1958        );
1959
1960        assert!(
1961            table_metadata_manager
1962                .table_name_manager()
1963                .get(old_table_name)
1964                .await
1965                .unwrap()
1966                .is_none()
1967        );
1968
1969        assert_eq!(
1970            table_metadata_manager
1971                .table_name_manager()
1972                .get(new_table_name)
1973                .await
1974                .unwrap()
1975                .unwrap()
1976                .table_id(),
1977            table_id
1978        );
1979    }
1980
1981    #[tokio::test]
1982    async fn test_update_table_info() {
1983        let mem_kv = Arc::new(MemoryKvBackend::default());
1984        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1985        let region_route = new_test_region_route();
1986        let region_routes = vec![region_route.clone()];
1987        let table_info: RawTableInfo = new_test_table_info().into();
1988        let table_id = table_info.ident.table_id;
1989        // creates metadata.
1990        create_physical_table_metadata(
1991            &table_metadata_manager,
1992            table_info.clone(),
1993            region_routes.clone(),
1994            HashMap::new(),
1995        )
1996        .await
1997        .unwrap();
1998
1999        let mut new_table_info = table_info.clone();
2000        new_table_info.name = "hi".to_string();
2001        let current_table_info_value =
2002            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2003        // should be ok.
2004        table_metadata_manager
2005            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2006            .await
2007            .unwrap();
2008        // if table info was updated, it should be ok.
2009        table_metadata_manager
2010            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2011            .await
2012            .unwrap();
2013
2014        // updated table_info should equal the `new_table_info`
2015        let updated_table_info = table_metadata_manager
2016            .table_info_manager()
2017            .get(table_id)
2018            .await
2019            .unwrap()
2020            .unwrap()
2021            .into_inner();
2022        assert_eq!(updated_table_info.table_info, new_table_info);
2023
2024        let mut wrong_table_info = table_info.clone();
2025        wrong_table_info.name = "wrong".to_string();
2026        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2027            current_table_info_value.update(wrong_table_info),
2028        );
2029        // if the current_table_info_value is wrong, it should return an error.
2030        // The ABA problem.
2031        assert!(
2032            table_metadata_manager
2033                .update_table_info(&wrong_table_info_value, None, new_table_info)
2034                .await
2035                .is_err()
2036        )
2037    }
2038
2039    #[tokio::test]
2040    async fn test_update_table_leader_region_status() {
2041        let mem_kv = Arc::new(MemoryKvBackend::default());
2042        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2043        let datanode = 1;
2044        let region_routes = vec![
2045            RegionRoute {
2046                region: Region {
2047                    id: 1.into(),
2048                    name: "r1".to_string(),
2049                    partition: None,
2050                    attrs: BTreeMap::new(),
2051                    partition_expr: Default::default(),
2052                },
2053                leader_peer: Some(Peer::new(datanode, "a2")),
2054                leader_state: Some(LeaderState::Downgrading),
2055                follower_peers: vec![],
2056                leader_down_since: Some(current_time_millis()),
2057            },
2058            RegionRoute {
2059                region: Region {
2060                    id: 2.into(),
2061                    name: "r2".to_string(),
2062                    partition: None,
2063                    attrs: BTreeMap::new(),
2064                    partition_expr: Default::default(),
2065                },
2066                leader_peer: Some(Peer::new(datanode, "a1")),
2067                leader_state: None,
2068                follower_peers: vec![],
2069                leader_down_since: None,
2070            },
2071        ];
2072        let table_info: RawTableInfo = new_test_table_info().into();
2073        let table_id = table_info.ident.table_id;
2074        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2075            TableRouteValue::physical(region_routes.clone()),
2076        );
2077
2078        // creates metadata.
2079        create_physical_table_metadata(
2080            &table_metadata_manager,
2081            table_info.clone(),
2082            region_routes.clone(),
2083            HashMap::new(),
2084        )
2085        .await
2086        .unwrap();
2087
2088        table_metadata_manager
2089            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2090                if region_route.leader_state.is_some() {
2091                    None
2092                } else {
2093                    Some(Some(LeaderState::Downgrading))
2094                }
2095            })
2096            .await
2097            .unwrap();
2098
2099        let updated_route_value = table_metadata_manager
2100            .table_route_manager()
2101            .table_route_storage()
2102            .get(table_id)
2103            .await
2104            .unwrap()
2105            .unwrap();
2106
2107        assert_eq!(
2108            updated_route_value.region_routes().unwrap()[0].leader_state,
2109            Some(LeaderState::Downgrading)
2110        );
2111
2112        assert!(
2113            updated_route_value.region_routes().unwrap()[0]
2114                .leader_down_since
2115                .is_some()
2116        );
2117
2118        assert_eq!(
2119            updated_route_value.region_routes().unwrap()[1].leader_state,
2120            Some(LeaderState::Downgrading)
2121        );
2122        assert!(
2123            updated_route_value.region_routes().unwrap()[1]
2124                .leader_down_since
2125                .is_some()
2126        );
2127    }
2128
2129    async fn assert_datanode_table(
2130        table_metadata_manager: &TableMetadataManager,
2131        table_id: u32,
2132        region_routes: &[RegionRoute],
2133    ) {
2134        let region_distribution = region_distribution(region_routes);
2135        for (datanode, regions) in region_distribution {
2136            let got = table_metadata_manager
2137                .datanode_table_manager()
2138                .get(&DatanodeTableKey::new(datanode, table_id))
2139                .await
2140                .unwrap()
2141                .unwrap();
2142
2143            assert_eq!(got.regions, regions.leader_regions);
2144            assert_eq!(got.follower_regions, regions.follower_regions);
2145        }
2146    }
2147
2148    #[tokio::test]
2149    async fn test_update_table_route() {
2150        let mem_kv = Arc::new(MemoryKvBackend::default());
2151        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2152        let region_route = new_test_region_route();
2153        let region_routes = vec![region_route.clone()];
2154        let table_info: RawTableInfo = new_test_table_info().into();
2155        let table_id = table_info.ident.table_id;
2156        let engine = table_info.meta.engine.as_str();
2157        let region_storage_path =
2158            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2159        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2160            TableRouteValue::physical(region_routes.clone()),
2161        );
2162
2163        // creates metadata.
2164        create_physical_table_metadata(
2165            &table_metadata_manager,
2166            table_info.clone(),
2167            region_routes.clone(),
2168            HashMap::new(),
2169        )
2170        .await
2171        .unwrap();
2172
2173        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2174        let new_region_routes = vec![
2175            new_region_route(1, 1),
2176            new_region_route(2, 2),
2177            new_region_route(3, 3),
2178        ];
2179        // it should be ok.
2180        table_metadata_manager
2181            .update_table_route(
2182                table_id,
2183                RegionInfo {
2184                    engine: engine.to_string(),
2185                    region_storage_path: region_storage_path.clone(),
2186                    region_options: HashMap::new(),
2187                    region_wal_options: HashMap::new(),
2188                },
2189                &current_table_route_value,
2190                new_region_routes.clone(),
2191                &HashMap::new(),
2192                &HashMap::new(),
2193            )
2194            .await
2195            .unwrap();
2196        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2197
2198        // if the table route was updated. it should be ok.
2199        table_metadata_manager
2200            .update_table_route(
2201                table_id,
2202                RegionInfo {
2203                    engine: engine.to_string(),
2204                    region_storage_path: region_storage_path.clone(),
2205                    region_options: HashMap::new(),
2206                    region_wal_options: HashMap::new(),
2207                },
2208                &current_table_route_value,
2209                new_region_routes.clone(),
2210                &HashMap::new(),
2211                &HashMap::new(),
2212            )
2213            .await
2214            .unwrap();
2215
2216        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2217            current_table_route_value
2218                .inner
2219                .update(new_region_routes.clone())
2220                .unwrap(),
2221        );
2222        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2223        // it should be ok.
2224        table_metadata_manager
2225            .update_table_route(
2226                table_id,
2227                RegionInfo {
2228                    engine: engine.to_string(),
2229                    region_storage_path: region_storage_path.clone(),
2230                    region_options: HashMap::new(),
2231                    region_wal_options: HashMap::new(),
2232                },
2233                &current_table_route_value,
2234                new_region_routes.clone(),
2235                &HashMap::new(),
2236                &HashMap::new(),
2237            )
2238            .await
2239            .unwrap();
2240        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2241
2242        // if the current_table_route_value is wrong, it should return an error.
2243        // The ABA problem.
2244        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2245            current_table_route_value
2246                .update(vec![
2247                    new_region_route(1, 1),
2248                    new_region_route(2, 2),
2249                    new_region_route(3, 3),
2250                    new_region_route(4, 4),
2251                ])
2252                .unwrap(),
2253        );
2254        assert!(
2255            table_metadata_manager
2256                .update_table_route(
2257                    table_id,
2258                    RegionInfo {
2259                        engine: engine.to_string(),
2260                        region_storage_path: region_storage_path.clone(),
2261                        region_options: HashMap::new(),
2262                        region_wal_options: HashMap::new(),
2263                    },
2264                    &wrong_table_route_value,
2265                    new_region_routes,
2266                    &HashMap::new(),
2267                    &HashMap::new(),
2268                )
2269                .await
2270                .is_err()
2271        );
2272    }
2273
2274    #[tokio::test]
2275    async fn test_update_table_route_with_topic_region_mapping() {
2276        let mem_kv = Arc::new(MemoryKvBackend::default());
2277        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2278        let region_route = new_test_region_route();
2279        let region_routes = vec![region_route.clone()];
2280        let table_info: RawTableInfo = new_test_table_info().into();
2281        let table_id = table_info.ident.table_id;
2282        let engine = table_info.meta.engine.as_str();
2283        let region_storage_path =
2284            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2285
2286        // Create initial metadata with Kafka WAL options
2287        let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2288            (
2289                1,
2290                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2291                    topic: "topic_1".to_string(),
2292                }))
2293                .unwrap(),
2294            ),
2295            (
2296                2,
2297                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2298                    topic: "topic_2".to_string(),
2299                }))
2300                .unwrap(),
2301            ),
2302        ]
2303        .into_iter()
2304        .collect();
2305
2306        create_physical_table_metadata(
2307            &table_metadata_manager,
2308            table_info.clone(),
2309            region_routes.clone(),
2310            old_region_wal_options.clone(),
2311        )
2312        .await
2313        .unwrap();
2314
2315        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2316            TableRouteValue::physical(region_routes.clone()),
2317        );
2318
2319        // Verify initial topic region mappings exist
2320        let region_id_1 = RegionId::new(table_id, 1);
2321        let region_id_2 = RegionId::new(table_id, 2);
2322        let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2323        let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2324        assert!(
2325            table_metadata_manager
2326                .topic_region_manager
2327                .get(topic_1_key.clone())
2328                .await
2329                .unwrap()
2330                .is_some()
2331        );
2332        assert!(
2333            table_metadata_manager
2334                .topic_region_manager
2335                .get(topic_2_key.clone())
2336                .await
2337                .unwrap()
2338                .is_some()
2339        );
2340
2341        // Test 1: Add new region with new topic
2342        let new_region_routes = vec![
2343            new_region_route(1, 1),
2344            new_region_route(2, 2),
2345            new_region_route(3, 3), // New region
2346        ];
2347        let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2348            (
2349                1,
2350                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2351                    topic: "topic_1".to_string(), // Unchanged
2352                }))
2353                .unwrap(),
2354            ),
2355            (
2356                2,
2357                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2358                    topic: "topic_2".to_string(), // Unchanged
2359                }))
2360                .unwrap(),
2361            ),
2362            (
2363                3,
2364                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2365                    topic: "topic_3".to_string(), // New topic
2366                }))
2367                .unwrap(),
2368            ),
2369        ]
2370        .into_iter()
2371        .collect();
2372        let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2373            current_table_route_value
2374                .inner
2375                .update(new_region_routes.clone())
2376                .unwrap(),
2377        );
2378        table_metadata_manager
2379            .update_table_route(
2380                table_id,
2381                RegionInfo {
2382                    engine: engine.to_string(),
2383                    region_storage_path: region_storage_path.clone(),
2384                    region_options: HashMap::new(),
2385                    region_wal_options: old_region_wal_options.clone(),
2386                },
2387                &current_table_route_value,
2388                new_region_routes.clone(),
2389                &HashMap::new(),
2390                &new_region_wal_options,
2391            )
2392            .await
2393            .unwrap();
2394        // Verify new topic region mapping was created
2395        let region_id_3 = RegionId::new(table_id, 3);
2396        let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2397        assert!(
2398            table_metadata_manager
2399                .topic_region_manager
2400                .get(topic_3_key)
2401                .await
2402                .unwrap()
2403                .is_some()
2404        );
2405        // Test 2: Remove a region and change topic for another
2406        let newer_region_routes = vec![
2407            new_region_route(1, 1),
2408            // Region 2 removed
2409            // Region 3 now has different topic
2410        ];
2411        let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2412            (
2413                1,
2414                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2415                    topic: "topic_1".to_string(), // Unchanged
2416                }))
2417                .unwrap(),
2418            ),
2419            (
2420                3,
2421                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2422                    topic: "topic_3_new".to_string(), // Changed topic
2423                }))
2424                .unwrap(),
2425            ),
2426        ]
2427        .into_iter()
2428        .collect();
2429        table_metadata_manager
2430            .update_table_route(
2431                table_id,
2432                RegionInfo {
2433                    engine: engine.to_string(),
2434                    region_storage_path: region_storage_path.clone(),
2435                    region_options: HashMap::new(),
2436                    region_wal_options: new_region_wal_options.clone(),
2437                },
2438                &current_table_route_value_updated,
2439                newer_region_routes.clone(),
2440                &HashMap::new(),
2441                &newer_region_wal_options,
2442            )
2443            .await
2444            .unwrap();
2445        // Verify region 2 mapping was deleted
2446        let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2447        assert!(
2448            table_metadata_manager
2449                .topic_region_manager
2450                .get(topic_2_key_new)
2451                .await
2452                .unwrap()
2453                .is_none()
2454        );
2455        // Verify region 3 old topic mapping was deleted
2456        let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2457        assert!(
2458            table_metadata_manager
2459                .topic_region_manager
2460                .get(topic_3_key_old)
2461                .await
2462                .unwrap()
2463                .is_none()
2464        );
2465        // Verify region 3 new topic mapping was created
2466        let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2467        assert!(
2468            table_metadata_manager
2469                .topic_region_manager
2470                .get(topic_3_key_new)
2471                .await
2472                .unwrap()
2473                .is_some()
2474        );
2475        // Verify region 1 mapping still exists (unchanged)
2476        assert!(
2477            table_metadata_manager
2478                .topic_region_manager
2479                .get(topic_1_key)
2480                .await
2481                .unwrap()
2482                .is_some()
2483        );
2484    }
2485
2486    #[tokio::test]
2487    async fn test_destroy_table_metadata() {
2488        let mem_kv = Arc::new(MemoryKvBackend::default());
2489        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2490        let table_id = 1025;
2491        let table_name = "foo";
2492        let task = test_create_table_task(table_name, table_id);
2493        let options = create_mock_region_wal_options();
2494        let serialized_options = options
2495            .iter()
2496            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2497            .collect::<HashMap<_, _>>();
2498        table_metadata_manager
2499            .create_table_metadata(
2500                task.table_info,
2501                TableRouteValue::physical(vec![
2502                    RegionRoute {
2503                        region: Region::new_test(RegionId::new(table_id, 1)),
2504                        leader_peer: Some(Peer::empty(1)),
2505                        follower_peers: vec![Peer::empty(5)],
2506                        leader_state: None,
2507                        leader_down_since: None,
2508                    },
2509                    RegionRoute {
2510                        region: Region::new_test(RegionId::new(table_id, 2)),
2511                        leader_peer: Some(Peer::empty(2)),
2512                        follower_peers: vec![Peer::empty(4)],
2513                        leader_state: None,
2514                        leader_down_since: None,
2515                    },
2516                    RegionRoute {
2517                        region: Region::new_test(RegionId::new(table_id, 3)),
2518                        leader_peer: Some(Peer::empty(3)),
2519                        follower_peers: vec![],
2520                        leader_state: None,
2521                        leader_down_since: None,
2522                    },
2523                ]),
2524                serialized_options,
2525            )
2526            .await
2527            .unwrap();
2528        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2529        let table_route_value = table_metadata_manager
2530            .table_route_manager
2531            .table_route_storage()
2532            .get_with_raw_bytes(table_id)
2533            .await
2534            .unwrap()
2535            .unwrap();
2536        table_metadata_manager
2537            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2538            .await
2539            .unwrap();
2540        assert!(mem_kv.is_empty());
2541    }
2542
2543    #[tokio::test]
2544    async fn test_restore_table_metadata() {
2545        let mem_kv = Arc::new(MemoryKvBackend::default());
2546        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2547        let table_id = 1025;
2548        let table_name = "foo";
2549        let task = test_create_table_task(table_name, table_id);
2550        let options = create_mock_region_wal_options();
2551        let serialized_options = options
2552            .iter()
2553            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2554            .collect::<HashMap<_, _>>();
2555        table_metadata_manager
2556            .create_table_metadata(
2557                task.table_info,
2558                TableRouteValue::physical(vec![
2559                    RegionRoute {
2560                        region: Region::new_test(RegionId::new(table_id, 1)),
2561                        leader_peer: Some(Peer::empty(1)),
2562                        follower_peers: vec![Peer::empty(5)],
2563                        leader_state: None,
2564                        leader_down_since: None,
2565                    },
2566                    RegionRoute {
2567                        region: Region::new_test(RegionId::new(table_id, 2)),
2568                        leader_peer: Some(Peer::empty(2)),
2569                        follower_peers: vec![Peer::empty(4)],
2570                        leader_state: None,
2571                        leader_down_since: None,
2572                    },
2573                    RegionRoute {
2574                        region: Region::new_test(RegionId::new(table_id, 3)),
2575                        leader_peer: Some(Peer::empty(3)),
2576                        follower_peers: vec![],
2577                        leader_state: None,
2578                        leader_down_since: None,
2579                    },
2580                ]),
2581                serialized_options,
2582            )
2583            .await
2584            .unwrap();
2585        let expected_result = mem_kv.dump();
2586        let table_route_value = table_metadata_manager
2587            .table_route_manager
2588            .table_route_storage()
2589            .get_with_raw_bytes(table_id)
2590            .await
2591            .unwrap()
2592            .unwrap();
2593        let region_routes = table_route_value.region_routes().unwrap();
2594        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2595        let table_route_value = TableRouteValue::physical(region_routes.clone());
2596        table_metadata_manager
2597            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2598            .await
2599            .unwrap();
2600        table_metadata_manager
2601            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2602            .await
2603            .unwrap();
2604        let kvs = mem_kv.dump();
2605        assert_eq!(kvs, expected_result);
2606        // Should be ignored.
2607        table_metadata_manager
2608            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2609            .await
2610            .unwrap();
2611        let kvs = mem_kv.dump();
2612        assert_eq!(kvs, expected_result);
2613    }
2614
2615    #[tokio::test]
2616    async fn test_create_update_view_info() {
2617        let mem_kv = Arc::new(MemoryKvBackend::default());
2618        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2619
2620        let view_info: RawTableInfo = new_test_table_info().into();
2621
2622        let view_id = view_info.ident.table_id;
2623
2624        let logical_plan: Vec<u8> = vec![1, 2, 3];
2625        let columns = vec!["a".to_string()];
2626        let plan_columns = vec!["number".to_string()];
2627        let table_names = new_test_table_names();
2628        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2629
2630        // Create metadata
2631        table_metadata_manager
2632            .create_view_metadata(
2633                view_info.clone(),
2634                logical_plan.clone(),
2635                table_names.clone(),
2636                columns.clone(),
2637                plan_columns.clone(),
2638                definition.to_string(),
2639            )
2640            .await
2641            .unwrap();
2642
2643        {
2644            // assert view info
2645            let current_view_info = table_metadata_manager
2646                .view_info_manager()
2647                .get(view_id)
2648                .await
2649                .unwrap()
2650                .unwrap()
2651                .into_inner();
2652            assert_eq!(current_view_info.view_info, logical_plan);
2653            assert_eq!(current_view_info.table_names, table_names);
2654            assert_eq!(current_view_info.definition, definition);
2655            assert_eq!(current_view_info.columns, columns);
2656            assert_eq!(current_view_info.plan_columns, plan_columns);
2657            // assert table info
2658            let current_table_info = table_metadata_manager
2659                .table_info_manager()
2660                .get(view_id)
2661                .await
2662                .unwrap()
2663                .unwrap()
2664                .into_inner();
2665            assert_eq!(current_table_info.table_info, view_info);
2666        }
2667
2668        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2669        let new_table_names = {
2670            let mut set = HashSet::new();
2671            set.insert(TableName {
2672                catalog_name: "greptime".to_string(),
2673                schema_name: "public".to_string(),
2674                table_name: "b_table".to_string(),
2675            });
2676            set.insert(TableName {
2677                catalog_name: "greptime".to_string(),
2678                schema_name: "public".to_string(),
2679                table_name: "c_table".to_string(),
2680            });
2681            set
2682        };
2683        let new_columns = vec!["b".to_string()];
2684        let new_plan_columns = vec!["number2".to_string()];
2685        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2686
2687        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2688            logical_plan.clone(),
2689            table_names,
2690            columns,
2691            plan_columns,
2692            definition.to_string(),
2693        ));
2694        // should be ok.
2695        table_metadata_manager
2696            .update_view_info(
2697                view_id,
2698                &current_view_info_value,
2699                new_logical_plan.clone(),
2700                new_table_names.clone(),
2701                new_columns.clone(),
2702                new_plan_columns.clone(),
2703                new_definition.to_string(),
2704            )
2705            .await
2706            .unwrap();
2707        // if table info was updated, it should be ok.
2708        table_metadata_manager
2709            .update_view_info(
2710                view_id,
2711                &current_view_info_value,
2712                new_logical_plan.clone(),
2713                new_table_names.clone(),
2714                new_columns.clone(),
2715                new_plan_columns.clone(),
2716                new_definition.to_string(),
2717            )
2718            .await
2719            .unwrap();
2720
2721        // updated view_info should equal the `new_logical_plan`
2722        let updated_view_info = table_metadata_manager
2723            .view_info_manager()
2724            .get(view_id)
2725            .await
2726            .unwrap()
2727            .unwrap()
2728            .into_inner();
2729        assert_eq!(updated_view_info.view_info, new_logical_plan);
2730        assert_eq!(updated_view_info.table_names, new_table_names);
2731        assert_eq!(updated_view_info.definition, new_definition);
2732        assert_eq!(updated_view_info.columns, new_columns);
2733        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2734
2735        let wrong_view_info = logical_plan.clone();
2736        let wrong_definition = "wrong_definition";
2737        let wrong_view_info_value =
2738            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2739                wrong_view_info,
2740                new_table_names.clone(),
2741                new_columns.clone(),
2742                new_plan_columns.clone(),
2743                wrong_definition.to_string(),
2744            ));
2745        // if the current_view_info_value is wrong, it should return an error.
2746        // The ABA problem.
2747        assert!(
2748            table_metadata_manager
2749                .update_view_info(
2750                    view_id,
2751                    &wrong_view_info_value,
2752                    new_logical_plan.clone(),
2753                    new_table_names.clone(),
2754                    vec!["c".to_string()],
2755                    vec!["number3".to_string()],
2756                    wrong_definition.to_string(),
2757                )
2758                .await
2759                .is_err()
2760        );
2761
2762        // The view_info is not changed.
2763        let current_view_info = table_metadata_manager
2764            .view_info_manager()
2765            .get(view_id)
2766            .await
2767            .unwrap()
2768            .unwrap()
2769            .into_inner();
2770        assert_eq!(current_view_info.view_info, new_logical_plan);
2771        assert_eq!(current_view_info.table_names, new_table_names);
2772        assert_eq!(current_view_info.definition, new_definition);
2773        assert_eq!(current_view_info.columns, new_columns);
2774        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2775    }
2776
2777    #[test]
2778    fn test_region_role_set_deserialize() {
2779        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2780        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2781        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2782        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2783
2784        let s = r#"[1, 2, 3]"#;
2785        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2786        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2787        assert!(region_role_set.follower_regions.is_empty());
2788    }
2789
2790    #[test]
2791    fn test_region_distribution_deserialize() {
2792        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2793        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2794        assert_eq!(region_distribution.len(), 2);
2795        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2796        assert!(region_distribution[&1].follower_regions.is_empty());
2797        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2798        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2799    }
2800}