common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used in the startup of Datanode, to let Datanode know which tables
22//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two "unified"
//! managers: the table metadata manager [TableMetadataManager] and the flow metadata manager
//! [FlowMetadataManager](crate::key::flow::FlowMetadataManager). Together they contain all the
//! managers defined above; it's recommended to use these unified managers rather than the
//! individual ones.
75//!
//! The whole picture of flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{TableId, TableInfo};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex fragment accepted as a topic name. It carries no `^`/`$` anchors of its own.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";

// Runtime switch keys.
/// Legacy location of the maintenance switch key.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the `pause_procedure` switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

// Metadata key prefixes.
/// Prefix of `__dn_table/{datanode_id}/{table_id}` keys.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of `__table_info/{table_id}` keys.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of `__view_info/{view_id}` keys.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of `__table_name/{catalog_name}/{schema_name}/{table_name}` keys.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of `__catalog_name/{catalog_name}` keys.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of `__schema_name/{catalog_name}/{schema_name}` keys.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repartition keys.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of `__topic_name/kafka/{topic_name}` keys.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix, used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of `__topic_region/{topic_name}/{region_id}` keys.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
204/// A set of regions with the same role.
205#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
206pub struct RegionRoleSet {
207    /// Leader regions.
208    pub leader_regions: Vec<RegionNumber>,
209    /// Follower regions.
210    pub follower_regions: Vec<RegionNumber>,
211}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215    where
216        D: serde::Deserializer<'de>,
217    {
218        #[derive(Deserialize)]
219        #[serde(untagged)]
220        enum RegionRoleSetOrLeaderOnly {
221            Full {
222                leader_regions: Vec<RegionNumber>,
223                follower_regions: Vec<RegionNumber>,
224            },
225            LeaderOnly(Vec<RegionNumber>),
226        }
227        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228            RegionRoleSetOrLeaderOnly::Full {
229                leader_regions,
230                follower_regions,
231            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233                Ok(RegionRoleSet::new(leader_regions, vec![]))
234            }
235        }
236    }
237}
238
239impl RegionRoleSet {
240    /// Create a new region role set.
241    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242        Self {
243            leader_regions,
244            follower_regions,
245        }
246    }
247
248    /// Add a leader region to the set.
249    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250        self.leader_regions.push(region_number);
251    }
252
253    /// Add a follower region to the set.
254    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255        self.follower_regions.push(region_number);
256    }
257
258    /// Sort the regions.
259    pub fn sort(&mut self) {
260        self.follower_regions.sort();
261        self.leader_regions.sort();
262    }
263}
264
265/// The distribution of regions.
266///
267/// The key is the datanode id, the value is the region role set.
268pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
269
/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a partition within a flow.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280    static ref TABLE_INFO_KEY_PATTERN: Regex =
281        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285    static ref VIEW_INFO_KEY_PATTERN: Regex =
286        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307    ))
308    .unwrap();
309}
310
311lazy_static! {
312    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
313    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315    ))
316        .unwrap();
317}
318
319lazy_static! {
320    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
321    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323    ))
324        .unwrap();
325}
326
327lazy_static! {
328    static ref NODE_ADDRESS_PATTERN: Regex =
329        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340    ))
341    .unwrap();
342}
343
344/// The key of metadata.
345pub trait MetadataKey<'a, T> {
346    fn to_bytes(&self) -> Vec<u8>;
347
348    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
349}
350
/// A key made of an arbitrary owned byte buffer, with no further structure.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `value` as-is, without copying.
    fn from(value: Vec<u8>) -> Self {
        BytesAdapter(value)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361    fn to_bytes(&self) -> Vec<u8> {
362        self.0.clone()
363    }
364
365    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366        Ok(BytesAdapter(bytes.to_vec()))
367    }
368}
369
370pub(crate) trait MetadataKeyGetTxnOp {
371    fn build_get_op(
372        &self,
373    ) -> (
374        TxnOp,
375        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
376    );
377}
378
379pub trait MetadataValue {
380    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
381    where
382        Self: Sized;
383
384    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
385}
386
387pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
388
389pub struct TableMetadataManager {
390    table_name_manager: TableNameManager,
391    table_info_manager: TableInfoManager,
392    view_info_manager: ViewInfoManager,
393    datanode_table_manager: DatanodeTableManager,
394    catalog_manager: CatalogManager,
395    schema_manager: SchemaManager,
396    table_route_manager: TableRouteManager,
397    table_repart_manager: TableRepartManager,
398    tombstone_manager: TombstoneManager,
399    topic_name_manager: TopicNameManager,
400    topic_region_manager: TopicRegionManager,
401    kv_backend: KvBackendRef,
402}
403
/// Returns an `Unexpected` error from the enclosing function unless `$got == $expected_value`.
///
/// `$name` describes the operation and is embedded in the error message. Each operand is
/// evaluated exactly once, like `assert_eq!`, even though it appears in both the comparison
/// and the message.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` unqualified, so both must be
/// in scope at the call site.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Bind by reference once so side-effecting operands are not evaluated twice.
        let got = &$got;
        let expected = &$expected_value;
        ensure!(
            *got == *expected,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                )
            }
        );
    }};
}
418
419/// A struct containing a deserialized value(`inner`) and an original bytes.
420///
421/// - Serialize behaviors:
422///
423/// The `inner` field will be ignored.
424///
425/// - Deserialize behaviors:
426///
427/// The `inner` field will be deserialized from the `bytes` field.
428pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
429    // The original bytes of the inner.
430    bytes: Bytes,
431    // The value was deserialized from the original bytes.
432    inner: T,
433}
434
435impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
436    type Target = T;
437
438    fn deref(&self) -> &Self::Target {
439        &self.inner
440    }
441}
442
443impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
444    fn deref_mut(&mut self) -> &mut Self::Target {
445        &mut self.inner
446    }
447}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        write!(
452            f,
453            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454            self.inner, self.bytes
455        )
456    }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460    /// - Serialize behaviors:
461    ///
462    /// The `inner` field will be ignored.
463    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464    where
465        S: serde::Serializer,
466    {
467        // Safety: The original bytes are always JSON encoded.
468        // It's more efficiently than `serialize_bytes`.
469        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470    }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474    for DeserializedValueWithBytes<T>
475{
476    /// - Deserialize behaviors:
477    ///
478    /// The `inner` field will be deserialized from the `bytes` field.
479    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480    where
481        D: serde::Deserializer<'de>,
482    {
483        let buf = String::deserialize(deserializer)?;
484        let bytes = Bytes::from(buf);
485
486        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489        Ok(value)
490    }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494    fn clone(&self) -> Self {
495        Self {
496            bytes: self.bytes.clone(),
497            inner: self.inner.clone(),
498        }
499    }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503    /// Returns a struct containing a deserialized value and an original `bytes`.
504    /// It accepts original bytes of inner.
505    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506        let inner = T::try_from_raw_value(&bytes)?;
507        Ok(Self { bytes, inner })
508    }
509
510    /// Returns a struct containing a deserialized value and an original `bytes`.
511    /// It accepts original bytes of inner.
512    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514    }
515
516    pub fn into_inner(self) -> T {
517        self.inner
518    }
519
520    pub fn get_inner_ref(&self) -> &T {
521        &self.inner
522    }
523
524    /// Returns original `bytes`
525    pub fn get_raw_bytes(&self) -> Vec<u8> {
526        self.bytes.to_vec()
527    }
528
529    #[cfg(any(test, feature = "testing"))]
530    pub fn from_inner(inner: T) -> Self {
531        let bytes = serde_json::to_vec(&inner).unwrap();
532
533        Self {
534            bytes: Bytes::from(bytes),
535            inner,
536        }
537    }
538}
539
540impl TableMetadataManager {
541    pub fn new(kv_backend: KvBackendRef) -> Self {
542        TableMetadataManager {
543            table_name_manager: TableNameManager::new(kv_backend.clone()),
544            table_info_manager: TableInfoManager::new(kv_backend.clone()),
545            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547            catalog_manager: CatalogManager::new(kv_backend.clone()),
548            schema_manager: SchemaManager::new(kv_backend.clone()),
549            table_route_manager: TableRouteManager::new(kv_backend.clone()),
550            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554            kv_backend,
555        }
556    }
557
558    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
559    pub fn new_with_custom_tombstone_prefix(
560        kv_backend: KvBackendRef,
561        tombstone_prefix: &str,
562    ) -> Self {
563        Self {
564            table_name_manager: TableNameManager::new(kv_backend.clone()),
565            table_info_manager: TableInfoManager::new(kv_backend.clone()),
566            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568            catalog_manager: CatalogManager::new(kv_backend.clone()),
569            schema_manager: SchemaManager::new(kv_backend.clone()),
570            table_route_manager: TableRouteManager::new(kv_backend.clone()),
571            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572            tombstone_manager: TombstoneManager::new_with_prefix(
573                kv_backend.clone(),
574                tombstone_prefix,
575            ),
576            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578            kv_backend,
579        }
580    }
581
582    pub async fn init(&self) -> Result<()> {
583        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585        self.catalog_manager().create(catalog_name, true).await?;
586
587        let internal_schemas = [
588            DEFAULT_SCHEMA_NAME,
589            INFORMATION_SCHEMA_NAME,
590            DEFAULT_PRIVATE_SCHEMA_NAME,
591        ];
592
593        for schema_name in internal_schemas {
594            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596            self.schema_manager().create(schema_key, None, true).await?;
597        }
598
599        Ok(())
600    }
601
602    pub fn table_name_manager(&self) -> &TableNameManager {
603        &self.table_name_manager
604    }
605
606    pub fn table_info_manager(&self) -> &TableInfoManager {
607        &self.table_info_manager
608    }
609
610    pub fn view_info_manager(&self) -> &ViewInfoManager {
611        &self.view_info_manager
612    }
613
614    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
615        &self.datanode_table_manager
616    }
617
618    pub fn catalog_manager(&self) -> &CatalogManager {
619        &self.catalog_manager
620    }
621
622    pub fn schema_manager(&self) -> &SchemaManager {
623        &self.schema_manager
624    }
625
626    pub fn table_route_manager(&self) -> &TableRouteManager {
627        &self.table_route_manager
628    }
629
630    pub fn table_repart_manager(&self) -> &TableRepartManager {
631        &self.table_repart_manager
632    }
633
634    pub fn topic_name_manager(&self) -> &TopicNameManager {
635        &self.topic_name_manager
636    }
637
638    pub fn topic_region_manager(&self) -> &TopicRegionManager {
639        &self.topic_region_manager
640    }
641
642    pub fn kv_backend(&self) -> &KvBackendRef {
643        &self.kv_backend
644    }
645
646    pub async fn get_full_table_info(
647        &self,
648        table_id: TableId,
649    ) -> Result<(
650        Option<DeserializedValueWithBytes<TableInfoValue>>,
651        Option<DeserializedValueWithBytes<TableRouteValue>>,
652    )> {
653        let table_info_key = TableInfoKey::new(table_id);
654        let table_route_key = TableRouteKey::new(table_id);
655        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659        let mut res = self.kv_backend.txn(txn).await?;
660        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663        Ok((table_info_value, table_route_value))
664    }
665
666    /// Creates metadata for view and returns an error if different metadata exists.
667    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
668    /// Parameters include:
669    /// - `view_info`: the encoded logical plan
670    /// - `table_names`: the resolved fully table names in logical plan
671    /// - `columns`: the view columns
672    /// - `plan_columns`: the original plan columns
673    /// - `definition`: The SQL to create the view
674    ///
675    pub async fn create_view_metadata(
676        &self,
677        view_info: TableInfo,
678        raw_logical_plan: Vec<u8>,
679        table_names: HashSet<TableName>,
680        columns: Vec<String>,
681        plan_columns: Vec<String>,
682        definition: String,
683    ) -> Result<()> {
684        let view_id = view_info.ident.table_id;
685
686        // Creates view name.
687        let view_name = TableNameKey::new(
688            &view_info.catalog_name,
689            &view_info.schema_name,
690            &view_info.name,
691        );
692        let create_table_name_txn = self
693            .table_name_manager()
694            .build_create_txn(&view_name, view_id)?;
695
696        // Creates table info.
697        let table_info_value = TableInfoValue::new(view_info);
698
699        let (create_table_info_txn, on_create_table_info_failure) = self
700            .table_info_manager()
701            .build_create_txn(view_id, &table_info_value)?;
702
703        // Creates view info
704        let view_info_value = ViewInfoValue::new(
705            raw_logical_plan,
706            table_names,
707            columns,
708            plan_columns,
709            definition,
710        );
711        let (create_view_info_txn, on_create_view_info_failure) = self
712            .view_info_manager()
713            .build_create_txn(view_id, &view_info_value)?;
714
715        let txn = Txn::merge_all(vec![
716            create_table_name_txn,
717            create_table_info_txn,
718            create_view_info_txn,
719        ]);
720
721        let mut r = self.kv_backend.txn(txn).await?;
722
723        // Checks whether metadata was already created.
724        if !r.succeeded {
725            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726            let remote_table_info = on_create_table_info_failure(&mut set)?
727                .context(error::UnexpectedSnafu {
728                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729                })?
730                .into_inner();
731
732            let remote_view_info = on_create_view_info_failure(&mut set)?
733                .context(error::UnexpectedSnafu {
734                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735                })?
736                .into_inner();
737
738            let op_name = "the creating view metadata";
739            ensure_values!(remote_table_info, table_info_value, op_name);
740            ensure_values!(remote_view_info, view_info_value, op_name);
741        }
742
743        Ok(())
744    }
745
746    /// Creates metadata for table and returns an error if different metadata exists.
747    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
748    pub async fn create_table_metadata(
749        &self,
750        table_info: TableInfo,
751        table_route_value: TableRouteValue,
752        region_wal_options: HashMap<RegionNumber, String>,
753    ) -> Result<()> {
754        let table_id = table_info.ident.table_id;
755        let engine = table_info.meta.engine.clone();
756
757        // Creates table name.
758        let table_name = TableNameKey::new(
759            &table_info.catalog_name,
760            &table_info.schema_name,
761            &table_info.name,
762        );
763        let create_table_name_txn = self
764            .table_name_manager()
765            .build_create_txn(&table_name, table_id)?;
766
767        let region_options = table_info.to_region_options();
768        // Creates table info.
769        let table_info_value = TableInfoValue::new(table_info);
770        let (create_table_info_txn, on_create_table_info_failure) = self
771            .table_info_manager()
772            .build_create_txn(table_id, &table_info_value)?;
773
774        let (create_table_route_txn, on_create_table_route_failure) = self
775            .table_route_manager()
776            .table_route_storage()
777            .build_create_txn(table_id, &table_route_value)?;
778
779        let create_topic_region_txn = self
780            .topic_region_manager
781            .build_create_txn(table_id, &region_wal_options)?;
782
783        let mut txn = Txn::merge_all(vec![
784            create_table_name_txn,
785            create_table_info_txn,
786            create_table_route_txn,
787            create_topic_region_txn,
788        ]);
789
790        if let TableRouteValue::Physical(x) = &table_route_value {
791            let region_storage_path = table_info_value.region_storage_path();
792            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793                table_id,
794                &engine,
795                &region_storage_path,
796                region_options,
797                region_wal_options,
798                region_distribution(&x.region_routes),
799            )?;
800            txn = txn.merge(create_datanode_table_txn);
801        }
802
803        let mut r = self.kv_backend.txn(txn).await?;
804
805        // Checks whether metadata was already created.
806        if !r.succeeded {
807            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808            let remote_table_info = on_create_table_info_failure(&mut set)?
809                .context(error::UnexpectedSnafu {
810                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811                })?
812                .into_inner();
813
814            let remote_table_route = on_create_table_route_failure(&mut set)?
815                .context(error::UnexpectedSnafu {
816                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817                })?
818                .into_inner();
819
820            let op_name = "the creating table metadata";
821            ensure_values!(remote_table_info, table_info_value, op_name);
822            ensure_values!(remote_table_route, table_route_value, op_name);
823        }
824
825        Ok(())
826    }
827
828    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
829        // The batch size is max_txn_size / 3 because the size of the `tables_data`
830        // is 3 times the size of the `tables_data`.
831        self.kv_backend.max_txn_ops() / 3
832    }
833
834    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
835    pub async fn create_logical_tables_metadata(
836        &self,
837        tables_data: Vec<(TableInfo, TableRouteValue)>,
838    ) -> Result<()> {
839        let len = tables_data.len();
840        let mut txns = Vec::with_capacity(3 * len);
841        struct OnFailure<F1, R1, F2, R2>
842        where
843            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
844            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
845        {
846            table_info_value: TableInfoValue,
847            on_create_table_info_failure: F1,
848            table_route_value: TableRouteValue,
849            on_create_table_route_failure: F2,
850        }
851        let mut on_failures = Vec::with_capacity(len);
852        for (table_info, table_route_value) in tables_data {
853            let table_id = table_info.ident.table_id;
854
855            // Creates table name.
856            let table_name = TableNameKey::new(
857                &table_info.catalog_name,
858                &table_info.schema_name,
859                &table_info.name,
860            );
861            let create_table_name_txn = self
862                .table_name_manager()
863                .build_create_txn(&table_name, table_id)?;
864            txns.push(create_table_name_txn);
865
866            // Creates table info.
867            let table_info_value = TableInfoValue::new(table_info);
868            let (create_table_info_txn, on_create_table_info_failure) =
869                self.table_info_manager()
870                    .build_create_txn(table_id, &table_info_value)?;
871            txns.push(create_table_info_txn);
872
873            let (create_table_route_txn, on_create_table_route_failure) = self
874                .table_route_manager()
875                .table_route_storage()
876                .build_create_txn(table_id, &table_route_value)?;
877            txns.push(create_table_route_txn);
878
879            on_failures.push(OnFailure {
880                table_info_value,
881                on_create_table_info_failure,
882                table_route_value,
883                on_create_table_route_failure,
884            });
885        }
886
887        let txn = Txn::merge_all(txns);
888        let mut r = self.kv_backend.txn(txn).await?;
889
890        // Checks whether metadata was already created.
891        if !r.succeeded {
892            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
893            for on_failure in on_failures {
894                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
895                    .context(error::UnexpectedSnafu {
896                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
897                    })?
898                    .into_inner();
899
900                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
901                    .context(error::UnexpectedSnafu {
902                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
903                    })?
904                    .into_inner();
905
906                let op_name = "the creating logical tables metadata";
907                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
908                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
909            }
910        }
911
912        Ok(())
913    }
914
915    fn table_metadata_keys(
916        &self,
917        table_id: TableId,
918        table_name: &TableName,
919        table_route_value: &TableRouteValue,
920        region_wal_options: &HashMap<RegionNumber, WalOptions>,
921    ) -> Result<Vec<Vec<u8>>> {
922        // Builds keys
923        let datanode_ids = if table_route_value.is_physical() {
924            region_distribution(table_route_value.region_routes()?)
925                .into_keys()
926                .collect()
927        } else {
928            vec![]
929        };
930        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931        let table_name = TableNameKey::new(
932            &table_name.catalog_name,
933            &table_name.schema_name,
934            &table_name.table_name,
935        );
936        let table_info_key = TableInfoKey::new(table_id);
937        let table_route_key = TableRouteKey::new(table_id);
938        let table_repart_key = TableRepartKey::new(table_id);
939        let datanode_table_keys = datanode_ids
940            .into_iter()
941            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942            .collect::<HashSet<_>>();
943        let topic_region_map = self
944            .topic_region_manager
945            .get_topic_region_mapping(table_id, region_wal_options);
946        let topic_region_keys = topic_region_map
947            .iter()
948            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949            .collect::<Vec<_>>();
950        keys.push(table_name.to_bytes());
951        keys.push(table_info_key.to_bytes());
952        keys.push(table_route_key.to_bytes());
953        keys.push(table_repart_key.to_bytes());
954        for key in &datanode_table_keys {
955            keys.push(key.to_bytes());
956        }
957        for key in topic_region_keys {
958            keys.push(key.to_bytes());
959        }
960        Ok(keys)
961    }
962
963    /// Deletes metadata for table **logically**.
964    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
965    pub async fn delete_table_metadata(
966        &self,
967        table_id: TableId,
968        table_name: &TableName,
969        table_route_value: &TableRouteValue,
970        region_wal_options: &HashMap<RegionNumber, WalOptions>,
971    ) -> Result<()> {
972        let keys =
973            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974        self.tombstone_manager.create(keys).await.map(|_| ())
975    }
976
977    /// Deletes metadata tombstone for table **permanently**.
978    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
979    pub async fn delete_table_metadata_tombstone(
980        &self,
981        table_id: TableId,
982        table_name: &TableName,
983        table_route_value: &TableRouteValue,
984        region_wal_options: &HashMap<RegionNumber, WalOptions>,
985    ) -> Result<()> {
986        let table_metadata_keys =
987            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988        self.tombstone_manager
989            .delete(table_metadata_keys)
990            .await
991            .map(|_| ())
992    }
993
994    /// Restores metadata for table.
995    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
996    pub async fn restore_table_metadata(
997        &self,
998        table_id: TableId,
999        table_name: &TableName,
1000        table_route_value: &TableRouteValue,
1001        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002    ) -> Result<()> {
1003        let keys =
1004            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005        self.tombstone_manager.restore(keys).await.map(|_| ())
1006    }
1007
1008    /// Deletes metadata for table **permanently**.
1009    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1010    pub async fn destroy_table_metadata(
1011        &self,
1012        table_id: TableId,
1013        table_name: &TableName,
1014        table_route_value: &TableRouteValue,
1015        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016    ) -> Result<()> {
1017        let keys =
1018            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019        let _ = self
1020            .kv_backend
1021            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022            .await?;
1023        Ok(())
1024    }
1025
1026    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027        let mut keys = Vec::with_capacity(3);
1028        let view_name = TableNameKey::new(
1029            &view_name.catalog_name,
1030            &view_name.schema_name,
1031            &view_name.table_name,
1032        );
1033        let table_info_key = TableInfoKey::new(view_id);
1034        let view_info_key = ViewInfoKey::new(view_id);
1035        keys.push(view_name.to_bytes());
1036        keys.push(table_info_key.to_bytes());
1037        keys.push(view_info_key.to_bytes());
1038
1039        Ok(keys)
1040    }
1041
1042    /// Deletes metadata for view **permanently**.
1043    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1044    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045        let keys = self.view_info_keys(view_id, view_name)?;
1046        let _ = self
1047            .kv_backend
1048            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049            .await?;
1050        Ok(())
1051    }
1052
1053    /// Renames the table name and returns an error if different metadata exists.
1054    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1055    /// and the new `TableNameKey` MUST be empty.
1056    pub async fn rename_table(
1057        &self,
1058        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059        new_table_name: String,
1060    ) -> Result<()> {
1061        let current_table_info = &current_table_info_value.table_info;
1062        let table_id = current_table_info.ident.table_id;
1063
1064        let table_name_key = TableNameKey::new(
1065            &current_table_info.catalog_name,
1066            &current_table_info.schema_name,
1067            &current_table_info.name,
1068        );
1069
1070        let new_table_name_key = TableNameKey::new(
1071            &current_table_info.catalog_name,
1072            &current_table_info.schema_name,
1073            &new_table_name,
1074        );
1075
1076        // Updates table name.
1077        let update_table_name_txn = self.table_name_manager().build_update_txn(
1078            &table_name_key,
1079            &new_table_name_key,
1080            table_id,
1081        )?;
1082
1083        let new_table_info_value = current_table_info_value
1084            .inner
1085            .with_update(move |table_info| {
1086                table_info.name = new_table_name;
1087            });
1088
1089        // Updates table info.
1090        let (update_table_info_txn, on_update_table_info_failure) = self
1091            .table_info_manager()
1092            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096        let mut r = self.kv_backend.txn(txn).await?;
1097
1098        // Checks whether metadata was already updated.
1099        if !r.succeeded {
1100            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101            let remote_table_info = on_update_table_info_failure(&mut set)?
1102                .context(error::UnexpectedSnafu {
1103                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104                })?
1105                .into_inner();
1106
1107            let op_name = "the renaming table metadata";
1108            ensure_values!(remote_table_info, new_table_info_value, op_name);
1109        }
1110
1111        Ok(())
1112    }
1113
1114    /// Updates table info and returns an error if different metadata exists.
1115    /// And cascade-ly update all redundant table options for each region
1116    /// if region_distribution is present.
1117    pub async fn update_table_info(
1118        &self,
1119        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120        region_distribution: Option<RegionDistribution>,
1121        new_table_info: TableInfo,
1122    ) -> Result<()> {
1123        let table_id = current_table_info_value.table_info.ident.table_id;
1124        let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126        // Updates table info.
1127        let (update_table_info_txn, on_update_table_info_failure) = self
1128            .table_info_manager()
1129            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131        let txn = if let Some(region_distribution) = region_distribution {
1132            // region options induced from table info.
1133            let new_region_options = new_table_info_value.table_info.to_region_options();
1134            let update_datanode_table_options_txn = self
1135                .datanode_table_manager
1136                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137                .await?;
1138            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139        } else {
1140            update_table_info_txn
1141        };
1142
1143        let mut r = self.kv_backend.txn(txn).await?;
1144        // Checks whether metadata was already updated.
1145        if !r.succeeded {
1146            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147            let remote_table_info = on_update_table_info_failure(&mut set)?
1148                .context(error::UnexpectedSnafu {
1149                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150                })?
1151                .into_inner();
1152
1153            let op_name = "the updating table info";
1154            ensure_values!(remote_table_info, new_table_info_value, op_name);
1155        }
1156        Ok(())
1157    }
1158
1159    /// Updates view info and returns an error if different metadata exists.
1160    /// Parameters include:
1161    /// - `view_id`: the view id
1162    /// - `current_view_info_value`: the current view info for CAS checking
1163    /// - `new_view_info`: the encoded logical plan
1164    /// - `table_names`: the resolved fully table names in logical plan
1165    /// - `columns`: the view columns
1166    /// - `plan_columns`: the original plan columns
1167    /// - `definition`: The SQL to create the view
1168    ///
1169    #[allow(clippy::too_many_arguments)]
1170    pub async fn update_view_info(
1171        &self,
1172        view_id: TableId,
1173        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174        new_view_info: Vec<u8>,
1175        table_names: HashSet<TableName>,
1176        columns: Vec<String>,
1177        plan_columns: Vec<String>,
1178        definition: String,
1179    ) -> Result<()> {
1180        let new_view_info_value = current_view_info_value.update(
1181            new_view_info,
1182            table_names,
1183            columns,
1184            plan_columns,
1185            definition,
1186        );
1187
1188        // Updates view info.
1189        let (update_view_info_txn, on_update_view_info_failure) = self
1190            .view_info_manager()
1191            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195        // Checks whether metadata was already updated.
1196        if !r.succeeded {
1197            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198            let remote_view_info = on_update_view_info_failure(&mut set)?
1199                .context(error::UnexpectedSnafu {
1200                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201                })?
1202                .into_inner();
1203
1204            let op_name = "the updating view info";
1205            ensure_values!(remote_view_info, new_view_info_value, op_name);
1206        }
1207        Ok(())
1208    }
1209
1210    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1211        self.kv_backend.max_txn_ops()
1212    }
1213
1214    pub async fn batch_update_table_info_values(
1215        &self,
1216        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1217    ) -> Result<()> {
1218        let len = table_info_value_pairs.len();
1219        let mut txns = Vec::with_capacity(len);
1220        struct OnFailure<F, R>
1221        where
1222            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223        {
1224            table_info_value: TableInfoValue,
1225            on_update_table_info_failure: F,
1226        }
1227        let mut on_failures = Vec::with_capacity(len);
1228
1229        for (table_info_value, new_table_info) in table_info_value_pairs {
1230            let table_id = table_info_value.table_info.ident.table_id;
1231
1232            let new_table_info_value = table_info_value.update(new_table_info);
1233
1234            let (update_table_info_txn, on_update_table_info_failure) =
1235                self.table_info_manager().build_update_txn(
1236                    table_id,
1237                    &table_info_value,
1238                    &new_table_info_value,
1239                )?;
1240
1241            txns.push(update_table_info_txn);
1242
1243            on_failures.push(OnFailure {
1244                table_info_value: new_table_info_value,
1245                on_update_table_info_failure,
1246            });
1247        }
1248
1249        let txn = Txn::merge_all(txns);
1250        let mut r = self.kv_backend.txn(txn).await?;
1251
1252        if !r.succeeded {
1253            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254            for on_failure in on_failures {
1255                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256                    .context(error::UnexpectedSnafu {
1257                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258                    })?
1259                    .into_inner();
1260
1261                let op_name = "the batch updating table info";
1262                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268
1269    pub async fn update_table_route(
1270        &self,
1271        table_id: TableId,
1272        region_info: RegionInfo,
1273        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274        new_region_routes: Vec<RegionRoute>,
1275        new_region_options: &HashMap<String, String>,
1276        new_region_wal_options: &HashMap<RegionNumber, String>,
1277    ) -> Result<()> {
1278        // Updates the datanode table key value pairs.
1279        let current_region_distribution =
1280            region_distribution(current_table_route_value.region_routes()?);
1281        let new_region_distribution = region_distribution(&new_region_routes);
1282
1283        let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1284            table_id,
1285            &region_info.region_wal_options,
1286            new_region_wal_options,
1287        )?;
1288        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1289            table_id,
1290            region_info,
1291            current_region_distribution,
1292            new_region_distribution,
1293            new_region_options,
1294            new_region_wal_options,
1295        )?;
1296
1297        // Updates the table_route.
1298        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299        let (update_table_route_txn, on_update_table_route_failure) = self
1300            .table_route_manager()
1301            .table_route_storage()
1302            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1303
1304        let txn = Txn::merge_all(vec![
1305            update_datanode_table_txn,
1306            update_table_route_txn,
1307            update_topic_region_txn,
1308        ]);
1309
1310        let mut r = self.kv_backend.txn(txn).await?;
1311
1312        // Checks whether metadata was already updated.
1313        if !r.succeeded {
1314            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1315            let remote_table_route = on_update_table_route_failure(&mut set)?
1316                .context(error::UnexpectedSnafu {
1317                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1318                })?
1319                .into_inner();
1320
1321            let op_name = "the updating table route";
1322            ensure_values!(remote_table_route, new_table_route_value, op_name);
1323        }
1324
1325        Ok(())
1326    }
1327
1328    /// Updates the leader status of the [RegionRoute].
1329    pub async fn update_leader_region_status<F>(
1330        &self,
1331        table_id: TableId,
1332        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1333        next_region_route_status: F,
1334    ) -> Result<()>
1335    where
1336        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1337    {
1338        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1339
1340        let mut updated = 0;
1341        for route in &mut new_region_routes {
1342            if let Some(state) = next_region_route_status(route)
1343                && route.set_leader_state(state)
1344            {
1345                updated += 1;
1346            }
1347        }
1348
1349        if updated == 0 {
1350            warn!("No leader status updated");
1351            return Ok(());
1352        }
1353
1354        // Updates the table_route.
1355        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1356
1357        let (update_table_route_txn, on_update_table_route_failure) = self
1358            .table_route_manager()
1359            .table_route_storage()
1360            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1361
1362        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1363
1364        // Checks whether metadata was already updated.
1365        if !r.succeeded {
1366            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1367            let remote_table_route = on_update_table_route_failure(&mut set)?
1368                .context(error::UnexpectedSnafu {
1369                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1370                })?
1371                .into_inner();
1372
1373            let op_name = "the updating leader region status";
1374            ensure_values!(remote_table_route, new_table_route_value, op_name);
1375        }
1376
1377        Ok(())
1378    }
1379}
1380
/// Implements the `MetadataValue` trait (from `crate::key`) for every listed type by
/// serializing/deserializing the value as JSON with `serde_json`.
///
/// The expansion names `Result`, `SerdeJsonSnafu` and `serde_json` unqualified and calls
/// the `.context(..)` extension method, so these must be in scope where the macro is
/// invoked. A trailing comma after the last type is accepted.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1397
1398macro_rules! impl_metadata_key_get_txn_op {
1399    ($($key: ty), *) => {
1400        $(
1401            impl $crate::key::MetadataKeyGetTxnOp for $key {
1402                /// Returns a [TxnOp] to retrieve the corresponding value
1403                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
1404                fn build_get_op(
1405                    &self,
1406                ) -> (
1407                    TxnOp,
1408                    impl for<'a> FnMut(
1409                        &'a mut TxnOpGetResponseSet,
1410                    ) -> Option<Vec<u8>>,
1411                ) {
1412                    let raw_key = self.to_bytes();
1413                    (
1414                        TxnOp::Get(raw_key.clone()),
1415                        TxnOpGetResponseSet::filter(raw_key),
1416                    )
1417                }
1418            }
1419        )*
1420    }
1421}
1422
1423impl_metadata_key_get_txn_op! {
1424    TableNameKey<'_>,
1425    TableInfoKey,
1426    ViewInfoKey,
1427    TableRouteKey,
1428    DatanodeTableKey
1429}
1430
/// Adds inherent JSON (de)serialization helpers to every listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` decodes into
/// `Option<Self>`, so a raw value holding JSON `null` yields `Ok(None)`.
/// The expansion names `Result`, `SerdeJsonSnafu` and `serde_json` unqualified and calls
/// the `.context(..)` extension method, so these must be in scope where the macro is
/// invoked. A trailing comma after the last type is accepted.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1447
1448impl_metadata_value! {
1449    TableNameValue,
1450    TableInfoValue,
1451    ViewInfoValue,
1452    DatanodeTableValue,
1453    FlowInfoValue,
1454    FlowNameValue,
1455    FlowRouteValue,
1456    TableFlowValue,
1457    NodeAddressValue,
1458    SchemaNameValue,
1459    FlowStateValue,
1460    PoisonValue,
1461    TopicRegionValue
1462}
1463
1464impl_optional_metadata_value! {
1465    CatalogNameValue,
1466    SchemaNameValue
1467}
1468
1469#[cfg(test)]
1470mod tests {
1471    use std::collections::{BTreeMap, HashMap, HashSet};
1472    use std::sync::Arc;
1473
1474    use bytes::Bytes;
1475    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1476    use common_time::util::current_time_millis;
1477    use common_wal::options::{KafkaWalOptions, WalOptions};
1478    use futures::TryStreamExt;
1479    use store_api::storage::{RegionId, RegionNumber};
1480    use table::metadata::TableInfo;
1481    use table::table_name::TableName;
1482
1483    use super::datanode_table::DatanodeTableKey;
1484    use super::test_utils;
1485    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1486    use crate::ddl::test_util::create_table::test_create_table_task;
1487    use crate::ddl::utils::region_storage_path;
1488    use crate::error::Result;
1489    use crate::key::datanode_table::RegionInfo;
1490    use crate::key::table_info::TableInfoValue;
1491    use crate::key::table_name::TableNameKey;
1492    use crate::key::table_route::TableRouteValue;
1493    use crate::key::topic_region::TopicRegionKey;
1494    use crate::key::{
1495        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1496        TableMetadataManager, ViewInfoValue,
1497    };
1498    use crate::kv_backend::KvBackend;
1499    use crate::kv_backend::memory::MemoryKvBackend;
1500    use crate::peer::Peer;
1501    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1502    use crate::rpc::store::RangeRequest;
1503    use crate::wal_provider::WalProvider;
1504
1505    #[test]
1506    fn test_deserialized_value_with_bytes() {
1507        let region_route = new_test_region_route();
1508        let region_routes = vec![region_route.clone()];
1509
1510        let expected_region_routes =
1511            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1512        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1513
1514        // Serialize behaviors:
1515        // The inner field will be ignored.
1516        let value = DeserializedValueWithBytes {
1517            // ignored
1518            inner: TableRouteValue::physical(region_routes.clone()),
1519            bytes: Bytes::from(expected.clone()),
1520        };
1521
1522        let encoded = serde_json::to_vec(&value).unwrap();
1523
1524        // Deserialize behaviors:
1525        // The inner field will be deserialized from the bytes field.
1526        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1527            serde_json::from_slice(&encoded).unwrap();
1528
1529        assert_eq!(decoded.inner, expected_region_routes);
1530        assert_eq!(decoded.bytes, expected);
1531    }
1532
1533    fn new_test_region_route() -> RegionRoute {
1534        new_region_route(1, 2)
1535    }
1536
1537    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1538        RegionRoute {
1539            region: Region {
1540                id: region_id.into(),
1541                name: "r1".to_string(),
1542                partition: None,
1543                attrs: BTreeMap::new(),
1544                partition_expr: Default::default(),
1545            },
1546            leader_peer: Some(Peer::new(datanode, "a2")),
1547            follower_peers: vec![],
1548            leader_state: None,
1549            leader_down_since: None,
1550        }
1551    }
1552
1553    fn new_test_table_info() -> TableInfo {
1554        test_utils::new_test_table_info(10)
1555    }
1556
1557    fn new_test_table_names() -> HashSet<TableName> {
1558        let mut set = HashSet::new();
1559        set.insert(TableName {
1560            catalog_name: "greptime".to_string(),
1561            schema_name: "public".to_string(),
1562            table_name: "a_table".to_string(),
1563        });
1564        set.insert(TableName {
1565            catalog_name: "greptime".to_string(),
1566            schema_name: "public".to_string(),
1567            table_name: "b_table".to_string(),
1568        });
1569        set
1570    }
1571
1572    async fn create_physical_table_metadata(
1573        table_metadata_manager: &TableMetadataManager,
1574        table_info: TableInfo,
1575        region_routes: Vec<RegionRoute>,
1576        region_wal_options: HashMap<RegionNumber, String>,
1577    ) -> Result<()> {
1578        table_metadata_manager
1579            .create_table_metadata(
1580                table_info,
1581                TableRouteValue::physical(region_routes),
1582                region_wal_options,
1583            )
1584            .await
1585    }
1586
1587    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1588        let topics = (0..2)
1589            .map(|i| format!("greptimedb_topic{}", i))
1590            .collect::<Vec<_>>();
1591        let wal_options = topics
1592            .iter()
1593            .map(|topic| {
1594                WalOptions::Kafka(KafkaWalOptions {
1595                    topic: topic.clone(),
1596                })
1597            })
1598            .collect::<Vec<_>>();
1599
1600        (0..16)
1601            .enumerate()
1602            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1603            .collect()
1604    }
1605
1606    #[tokio::test]
1607    async fn test_raft_engine_topic_region_map() {
1608        let mem_kv = Arc::new(MemoryKvBackend::default());
1609        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1610        let region_route = new_test_region_route();
1611        let region_routes = &vec![region_route.clone()];
1612        let table_info = new_test_table_info();
1613        let wal_provider = WalProvider::RaftEngine;
1614        let regions: Vec<_> = (0..16).collect();
1615        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1616        create_physical_table_metadata(
1617            &table_metadata_manager,
1618            table_info.clone(),
1619            region_routes.clone(),
1620            region_wal_options.clone(),
1621        )
1622        .await
1623        .unwrap();
1624
1625        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1626        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1627        let resp = mem_kv.range(range_req).await.unwrap();
1628        // Should be empty because the topic region map is empty for raft engine.
1629        assert!(resp.kvs.is_empty());
1630    }
1631
1632    #[tokio::test]
1633    async fn test_create_table_metadata() {
1634        let mem_kv = Arc::new(MemoryKvBackend::default());
1635        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1636        let region_route = new_test_region_route();
1637        let region_routes = &vec![region_route.clone()];
1638        let table_info = new_test_table_info();
1639        let region_wal_options = create_mock_region_wal_options()
1640            .into_iter()
1641            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1642            .collect::<HashMap<_, _>>();
1643
1644        // creates metadata.
1645        create_physical_table_metadata(
1646            &table_metadata_manager,
1647            table_info.clone(),
1648            region_routes.clone(),
1649            region_wal_options.clone(),
1650        )
1651        .await
1652        .unwrap();
1653
1654        // if metadata was already created, it should be ok.
1655        assert!(
1656            create_physical_table_metadata(
1657                &table_metadata_manager,
1658                table_info.clone(),
1659                region_routes.clone(),
1660                region_wal_options.clone(),
1661            )
1662            .await
1663            .is_ok()
1664        );
1665
1666        let mut modified_region_routes = region_routes.clone();
1667        modified_region_routes.push(region_route.clone());
1668        // if remote metadata was exists, it should return an error.
1669        assert!(
1670            create_physical_table_metadata(
1671                &table_metadata_manager,
1672                table_info.clone(),
1673                modified_region_routes,
1674                region_wal_options.clone(),
1675            )
1676            .await
1677            .is_err()
1678        );
1679
1680        let (remote_table_info, remote_table_route) = table_metadata_manager
1681            .get_full_table_info(10)
1682            .await
1683            .unwrap();
1684
1685        assert_eq!(
1686            remote_table_info.unwrap().into_inner().table_info,
1687            table_info
1688        );
1689        assert_eq!(
1690            remote_table_route
1691                .unwrap()
1692                .into_inner()
1693                .region_routes()
1694                .unwrap(),
1695            region_routes
1696        );
1697
1698        for i in 0..2 {
1699            let region_number = i as u32;
1700            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1701            let topic = format!("greptimedb_topic{}", i);
1702            let regions = table_metadata_manager
1703                .topic_region_manager
1704                .regions(&topic)
1705                .await
1706                .unwrap()
1707                .into_keys()
1708                .collect::<Vec<_>>();
1709            assert_eq!(regions.len(), 8);
1710            assert!(regions.contains(&region_id));
1711        }
1712    }
1713
1714    #[tokio::test]
1715    async fn test_create_logic_tables_metadata() {
1716        let mem_kv = Arc::new(MemoryKvBackend::default());
1717        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1718        let region_route = new_test_region_route();
1719        let region_routes = vec![region_route.clone()];
1720        let table_info = new_test_table_info();
1721        let table_id = table_info.ident.table_id;
1722        let table_route_value = TableRouteValue::physical(region_routes.clone());
1723
1724        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1725        // creates metadata.
1726        table_metadata_manager
1727            .create_logical_tables_metadata(tables_data.clone())
1728            .await
1729            .unwrap();
1730
1731        // if metadata was already created, it should be ok.
1732        assert!(
1733            table_metadata_manager
1734                .create_logical_tables_metadata(tables_data)
1735                .await
1736                .is_ok()
1737        );
1738
1739        let mut modified_region_routes = region_routes.clone();
1740        modified_region_routes.push(new_region_route(2, 3));
1741        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1742        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1743        // if remote metadata was exists, it should return an error.
1744        assert!(
1745            table_metadata_manager
1746                .create_logical_tables_metadata(modified_tables_data)
1747                .await
1748                .is_err()
1749        );
1750
1751        let (remote_table_info, remote_table_route) = table_metadata_manager
1752            .get_full_table_info(table_id)
1753            .await
1754            .unwrap();
1755
1756        assert_eq!(
1757            remote_table_info.unwrap().into_inner().table_info,
1758            table_info
1759        );
1760        assert_eq!(
1761            remote_table_route
1762                .unwrap()
1763                .into_inner()
1764                .region_routes()
1765                .unwrap(),
1766            &region_routes
1767        );
1768    }
1769
1770    #[tokio::test]
1771    async fn test_create_many_logical_tables_metadata() {
1772        let kv_backend = Arc::new(MemoryKvBackend::default());
1773        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1774
1775        let mut tables_data = vec![];
1776        for i in 0..128 {
1777            let table_id = i + 1;
1778            let regin_number = table_id * 3;
1779            let region_id = RegionId::new(table_id, regin_number);
1780            let region_route = new_region_route(region_id.as_u64(), 2);
1781            let region_routes = vec![region_route.clone()];
1782            let table_info = test_utils::new_test_table_info_with_name(
1783                table_id,
1784                &format!("my_table_{}", table_id),
1785            );
1786            let table_route_value = TableRouteValue::physical(region_routes.clone());
1787
1788            tables_data.push((table_info, table_route_value));
1789        }
1790
1791        // creates metadata.
1792        table_metadata_manager
1793            .create_logical_tables_metadata(tables_data)
1794            .await
1795            .unwrap();
1796    }
1797
1798    #[tokio::test]
1799    async fn test_delete_table_metadata() {
1800        let mem_kv = Arc::new(MemoryKvBackend::default());
1801        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1802        let region_route = new_test_region_route();
1803        let region_routes = &vec![region_route.clone()];
1804        let table_info = new_test_table_info();
1805        let table_id = table_info.ident.table_id;
1806        let datanode_id = 2;
1807        let region_wal_options = create_mock_region_wal_options();
1808        let serialized_region_wal_options = region_wal_options
1809            .iter()
1810            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1811            .collect::<HashMap<_, _>>();
1812
1813        // creates metadata.
1814        create_physical_table_metadata(
1815            &table_metadata_manager,
1816            table_info.clone(),
1817            region_routes.clone(),
1818            serialized_region_wal_options,
1819        )
1820        .await
1821        .unwrap();
1822
1823        let table_name = TableName::new(
1824            table_info.catalog_name,
1825            table_info.schema_name,
1826            table_info.name,
1827        );
1828        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1829        // deletes metadata.
1830        table_metadata_manager
1831            .delete_table_metadata(
1832                table_id,
1833                &table_name,
1834                table_route_value,
1835                &region_wal_options,
1836            )
1837            .await
1838            .unwrap();
1839        // Should be ignored.
1840        table_metadata_manager
1841            .delete_table_metadata(
1842                table_id,
1843                &table_name,
1844                table_route_value,
1845                &region_wal_options,
1846            )
1847            .await
1848            .unwrap();
1849        assert!(
1850            table_metadata_manager
1851                .table_info_manager()
1852                .get(table_id)
1853                .await
1854                .unwrap()
1855                .is_none()
1856        );
1857        assert!(
1858            table_metadata_manager
1859                .table_route_manager()
1860                .table_route_storage()
1861                .get(table_id)
1862                .await
1863                .unwrap()
1864                .is_none()
1865        );
1866        assert!(
1867            table_metadata_manager
1868                .datanode_table_manager()
1869                .tables(datanode_id)
1870                .try_collect::<Vec<_>>()
1871                .await
1872                .unwrap()
1873                .is_empty()
1874        );
1875        // Checks removed values
1876        let table_info = table_metadata_manager
1877            .table_info_manager()
1878            .get(table_id)
1879            .await
1880            .unwrap();
1881        assert!(table_info.is_none());
1882        let table_route = table_metadata_manager
1883            .table_route_manager()
1884            .table_route_storage()
1885            .get(table_id)
1886            .await
1887            .unwrap();
1888        assert!(table_route.is_none());
1889        // Logical delete removes the topic region mapping as well.
1890        let regions = table_metadata_manager
1891            .topic_region_manager
1892            .regions("greptimedb_topic0")
1893            .await
1894            .unwrap();
1895        assert_eq!(regions.len(), 0);
1896        let regions = table_metadata_manager
1897            .topic_region_manager
1898            .regions("greptimedb_topic1")
1899            .await
1900            .unwrap();
1901        assert_eq!(regions.len(), 0);
1902    }
1903
1904    #[tokio::test]
1905    async fn test_rename_table() {
1906        let mem_kv = Arc::new(MemoryKvBackend::default());
1907        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1908        let region_route = new_test_region_route();
1909        let region_routes = vec![region_route.clone()];
1910        let table_info = new_test_table_info();
1911        let table_id = table_info.ident.table_id;
1912        // creates metadata.
1913        create_physical_table_metadata(
1914            &table_metadata_manager,
1915            table_info.clone(),
1916            region_routes.clone(),
1917            HashMap::new(),
1918        )
1919        .await
1920        .unwrap();
1921
1922        let new_table_name = "another_name".to_string();
1923        let table_info_value =
1924            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1925
1926        table_metadata_manager
1927            .rename_table(&table_info_value, new_table_name.clone())
1928            .await
1929            .unwrap();
1930        // if remote metadata was updated, it should be ok.
1931        table_metadata_manager
1932            .rename_table(&table_info_value, new_table_name.clone())
1933            .await
1934            .unwrap();
1935        let mut modified_table_info = table_info.clone();
1936        modified_table_info.name = "hi".to_string();
1937        let modified_table_info_value =
1938            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1939        // if the table_info_value is wrong, it should return an error.
1940        // The ABA problem.
1941        assert!(
1942            table_metadata_manager
1943                .rename_table(&modified_table_info_value, new_table_name.clone())
1944                .await
1945                .is_err()
1946        );
1947
1948        let old_table_name = TableNameKey::new(
1949            &table_info.catalog_name,
1950            &table_info.schema_name,
1951            &table_info.name,
1952        );
1953        let new_table_name = TableNameKey::new(
1954            &table_info.catalog_name,
1955            &table_info.schema_name,
1956            &new_table_name,
1957        );
1958
1959        assert!(
1960            table_metadata_manager
1961                .table_name_manager()
1962                .get(old_table_name)
1963                .await
1964                .unwrap()
1965                .is_none()
1966        );
1967
1968        assert_eq!(
1969            table_metadata_manager
1970                .table_name_manager()
1971                .get(new_table_name)
1972                .await
1973                .unwrap()
1974                .unwrap()
1975                .table_id(),
1976            table_id
1977        );
1978    }
1979
1980    #[tokio::test]
1981    async fn test_update_table_info() {
1982        let mem_kv = Arc::new(MemoryKvBackend::default());
1983        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1984        let region_route = new_test_region_route();
1985        let region_routes = vec![region_route.clone()];
1986        let table_info = new_test_table_info();
1987        let table_id = table_info.ident.table_id;
1988        // creates metadata.
1989        create_physical_table_metadata(
1990            &table_metadata_manager,
1991            table_info.clone(),
1992            region_routes.clone(),
1993            HashMap::new(),
1994        )
1995        .await
1996        .unwrap();
1997
1998        let mut new_table_info = table_info.clone();
1999        new_table_info.name = "hi".to_string();
2000        let current_table_info_value =
2001            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2002        // should be ok.
2003        table_metadata_manager
2004            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2005            .await
2006            .unwrap();
2007        // if table info was updated, it should be ok.
2008        table_metadata_manager
2009            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2010            .await
2011            .unwrap();
2012
2013        // updated table_info should equal the `new_table_info`
2014        let updated_table_info = table_metadata_manager
2015            .table_info_manager()
2016            .get(table_id)
2017            .await
2018            .unwrap()
2019            .unwrap()
2020            .into_inner();
2021        assert_eq!(updated_table_info.table_info, new_table_info);
2022
2023        let mut wrong_table_info = table_info.clone();
2024        wrong_table_info.name = "wrong".to_string();
2025        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2026            current_table_info_value.update(wrong_table_info),
2027        );
2028        // if the current_table_info_value is wrong, it should return an error.
2029        // The ABA problem.
2030        assert!(
2031            table_metadata_manager
2032                .update_table_info(&wrong_table_info_value, None, new_table_info)
2033                .await
2034                .is_err()
2035        )
2036    }
2037
2038    #[tokio::test]
2039    async fn test_update_table_leader_region_status() {
2040        let mem_kv = Arc::new(MemoryKvBackend::default());
2041        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2042        let datanode = 1;
2043        let region_routes = vec![
2044            RegionRoute {
2045                region: Region {
2046                    id: 1.into(),
2047                    name: "r1".to_string(),
2048                    partition: None,
2049                    attrs: BTreeMap::new(),
2050                    partition_expr: Default::default(),
2051                },
2052                leader_peer: Some(Peer::new(datanode, "a2")),
2053                leader_state: Some(LeaderState::Downgrading),
2054                follower_peers: vec![],
2055                leader_down_since: Some(current_time_millis()),
2056            },
2057            RegionRoute {
2058                region: Region {
2059                    id: 2.into(),
2060                    name: "r2".to_string(),
2061                    partition: None,
2062                    attrs: BTreeMap::new(),
2063                    partition_expr: Default::default(),
2064                },
2065                leader_peer: Some(Peer::new(datanode, "a1")),
2066                leader_state: None,
2067                follower_peers: vec![],
2068                leader_down_since: None,
2069            },
2070        ];
2071        let table_info = new_test_table_info();
2072        let table_id = table_info.ident.table_id;
2073        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2074            TableRouteValue::physical(region_routes.clone()),
2075        );
2076
2077        // creates metadata.
2078        create_physical_table_metadata(
2079            &table_metadata_manager,
2080            table_info.clone(),
2081            region_routes.clone(),
2082            HashMap::new(),
2083        )
2084        .await
2085        .unwrap();
2086
2087        table_metadata_manager
2088            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2089                if region_route.leader_state.is_some() {
2090                    None
2091                } else {
2092                    Some(Some(LeaderState::Downgrading))
2093                }
2094            })
2095            .await
2096            .unwrap();
2097
2098        let updated_route_value = table_metadata_manager
2099            .table_route_manager()
2100            .table_route_storage()
2101            .get(table_id)
2102            .await
2103            .unwrap()
2104            .unwrap();
2105
2106        assert_eq!(
2107            updated_route_value.region_routes().unwrap()[0].leader_state,
2108            Some(LeaderState::Downgrading)
2109        );
2110
2111        assert!(
2112            updated_route_value.region_routes().unwrap()[0]
2113                .leader_down_since
2114                .is_some()
2115        );
2116
2117        assert_eq!(
2118            updated_route_value.region_routes().unwrap()[1].leader_state,
2119            Some(LeaderState::Downgrading)
2120        );
2121        assert!(
2122            updated_route_value.region_routes().unwrap()[1]
2123                .leader_down_since
2124                .is_some()
2125        );
2126    }
2127
2128    async fn assert_datanode_table(
2129        table_metadata_manager: &TableMetadataManager,
2130        table_id: u32,
2131        region_routes: &[RegionRoute],
2132    ) {
2133        let region_distribution = region_distribution(region_routes);
2134        for (datanode, regions) in region_distribution {
2135            let got = table_metadata_manager
2136                .datanode_table_manager()
2137                .get(&DatanodeTableKey::new(datanode, table_id))
2138                .await
2139                .unwrap()
2140                .unwrap();
2141
2142            assert_eq!(got.regions, regions.leader_regions);
2143            assert_eq!(got.follower_regions, regions.follower_regions);
2144        }
2145    }
2146
2147    #[tokio::test]
2148    async fn test_update_table_route() {
2149        let mem_kv = Arc::new(MemoryKvBackend::default());
2150        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2151        let region_route = new_test_region_route();
2152        let region_routes = vec![region_route.clone()];
2153        let table_info = new_test_table_info();
2154        let table_id = table_info.ident.table_id;
2155        let engine = table_info.meta.engine.as_str();
2156        let region_storage_path =
2157            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2158        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2159            TableRouteValue::physical(region_routes.clone()),
2160        );
2161
2162        // creates metadata.
2163        create_physical_table_metadata(
2164            &table_metadata_manager,
2165            table_info.clone(),
2166            region_routes.clone(),
2167            HashMap::new(),
2168        )
2169        .await
2170        .unwrap();
2171
2172        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2173        let new_region_routes = vec![
2174            new_region_route(1, 1),
2175            new_region_route(2, 2),
2176            new_region_route(3, 3),
2177        ];
2178        // it should be ok.
2179        table_metadata_manager
2180            .update_table_route(
2181                table_id,
2182                RegionInfo {
2183                    engine: engine.to_string(),
2184                    region_storage_path: region_storage_path.clone(),
2185                    region_options: HashMap::new(),
2186                    region_wal_options: HashMap::new(),
2187                },
2188                &current_table_route_value,
2189                new_region_routes.clone(),
2190                &HashMap::new(),
2191                &HashMap::new(),
2192            )
2193            .await
2194            .unwrap();
2195        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2196
2197        // if the table route was updated. it should be ok.
2198        table_metadata_manager
2199            .update_table_route(
2200                table_id,
2201                RegionInfo {
2202                    engine: engine.to_string(),
2203                    region_storage_path: region_storage_path.clone(),
2204                    region_options: HashMap::new(),
2205                    region_wal_options: HashMap::new(),
2206                },
2207                &current_table_route_value,
2208                new_region_routes.clone(),
2209                &HashMap::new(),
2210                &HashMap::new(),
2211            )
2212            .await
2213            .unwrap();
2214
2215        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2216            current_table_route_value
2217                .inner
2218                .update(new_region_routes.clone())
2219                .unwrap(),
2220        );
2221        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2222        // it should be ok.
2223        table_metadata_manager
2224            .update_table_route(
2225                table_id,
2226                RegionInfo {
2227                    engine: engine.to_string(),
2228                    region_storage_path: region_storage_path.clone(),
2229                    region_options: HashMap::new(),
2230                    region_wal_options: HashMap::new(),
2231                },
2232                &current_table_route_value,
2233                new_region_routes.clone(),
2234                &HashMap::new(),
2235                &HashMap::new(),
2236            )
2237            .await
2238            .unwrap();
2239        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2240
2241        // if the current_table_route_value is wrong, it should return an error.
2242        // The ABA problem.
2243        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2244            current_table_route_value
2245                .update(vec![
2246                    new_region_route(1, 1),
2247                    new_region_route(2, 2),
2248                    new_region_route(3, 3),
2249                    new_region_route(4, 4),
2250                ])
2251                .unwrap(),
2252        );
2253        assert!(
2254            table_metadata_manager
2255                .update_table_route(
2256                    table_id,
2257                    RegionInfo {
2258                        engine: engine.to_string(),
2259                        region_storage_path: region_storage_path.clone(),
2260                        region_options: HashMap::new(),
2261                        region_wal_options: HashMap::new(),
2262                    },
2263                    &wrong_table_route_value,
2264                    new_region_routes,
2265                    &HashMap::new(),
2266                    &HashMap::new(),
2267                )
2268                .await
2269                .is_err()
2270        );
2271    }
2272
2273    #[tokio::test]
2274    async fn test_update_table_route_with_topic_region_mapping() {
2275        let mem_kv = Arc::new(MemoryKvBackend::default());
2276        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2277        let region_route = new_test_region_route();
2278        let region_routes = vec![region_route.clone()];
2279        let table_info = new_test_table_info();
2280        let table_id = table_info.ident.table_id;
2281        let engine = table_info.meta.engine.as_str();
2282        let region_storage_path =
2283            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2284
2285        // Create initial metadata with Kafka WAL options
2286        let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2287            (
2288                1,
2289                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2290                    topic: "topic_1".to_string(),
2291                }))
2292                .unwrap(),
2293            ),
2294            (
2295                2,
2296                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2297                    topic: "topic_2".to_string(),
2298                }))
2299                .unwrap(),
2300            ),
2301        ]
2302        .into_iter()
2303        .collect();
2304
2305        create_physical_table_metadata(
2306            &table_metadata_manager,
2307            table_info.clone(),
2308            region_routes.clone(),
2309            old_region_wal_options.clone(),
2310        )
2311        .await
2312        .unwrap();
2313
2314        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2315            TableRouteValue::physical(region_routes.clone()),
2316        );
2317
2318        // Verify initial topic region mappings exist
2319        let region_id_1 = RegionId::new(table_id, 1);
2320        let region_id_2 = RegionId::new(table_id, 2);
2321        let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2322        let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2323        assert!(
2324            table_metadata_manager
2325                .topic_region_manager
2326                .get(topic_1_key.clone())
2327                .await
2328                .unwrap()
2329                .is_some()
2330        );
2331        assert!(
2332            table_metadata_manager
2333                .topic_region_manager
2334                .get(topic_2_key.clone())
2335                .await
2336                .unwrap()
2337                .is_some()
2338        );
2339
2340        // Test 1: Add new region with new topic
2341        let new_region_routes = vec![
2342            new_region_route(1, 1),
2343            new_region_route(2, 2),
2344            new_region_route(3, 3), // New region
2345        ];
2346        let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2347            (
2348                1,
2349                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2350                    topic: "topic_1".to_string(), // Unchanged
2351                }))
2352                .unwrap(),
2353            ),
2354            (
2355                2,
2356                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2357                    topic: "topic_2".to_string(), // Unchanged
2358                }))
2359                .unwrap(),
2360            ),
2361            (
2362                3,
2363                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2364                    topic: "topic_3".to_string(), // New topic
2365                }))
2366                .unwrap(),
2367            ),
2368        ]
2369        .into_iter()
2370        .collect();
2371        let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2372            current_table_route_value
2373                .inner
2374                .update(new_region_routes.clone())
2375                .unwrap(),
2376        );
2377        table_metadata_manager
2378            .update_table_route(
2379                table_id,
2380                RegionInfo {
2381                    engine: engine.to_string(),
2382                    region_storage_path: region_storage_path.clone(),
2383                    region_options: HashMap::new(),
2384                    region_wal_options: old_region_wal_options.clone(),
2385                },
2386                &current_table_route_value,
2387                new_region_routes.clone(),
2388                &HashMap::new(),
2389                &new_region_wal_options,
2390            )
2391            .await
2392            .unwrap();
2393        // Verify new topic region mapping was created
2394        let region_id_3 = RegionId::new(table_id, 3);
2395        let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2396        assert!(
2397            table_metadata_manager
2398                .topic_region_manager
2399                .get(topic_3_key)
2400                .await
2401                .unwrap()
2402                .is_some()
2403        );
2404        // Test 2: Remove a region and change topic for another
2405        let newer_region_routes = vec![
2406            new_region_route(1, 1),
2407            // Region 2 removed
2408            // Region 3 now has different topic
2409        ];
2410        let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2411            (
2412                1,
2413                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2414                    topic: "topic_1".to_string(), // Unchanged
2415                }))
2416                .unwrap(),
2417            ),
2418            (
2419                3,
2420                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2421                    topic: "topic_3_new".to_string(), // Changed topic
2422                }))
2423                .unwrap(),
2424            ),
2425        ]
2426        .into_iter()
2427        .collect();
2428        table_metadata_manager
2429            .update_table_route(
2430                table_id,
2431                RegionInfo {
2432                    engine: engine.to_string(),
2433                    region_storage_path: region_storage_path.clone(),
2434                    region_options: HashMap::new(),
2435                    region_wal_options: new_region_wal_options.clone(),
2436                },
2437                &current_table_route_value_updated,
2438                newer_region_routes.clone(),
2439                &HashMap::new(),
2440                &newer_region_wal_options,
2441            )
2442            .await
2443            .unwrap();
2444        // Verify region 2 mapping was deleted
2445        let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2446        assert!(
2447            table_metadata_manager
2448                .topic_region_manager
2449                .get(topic_2_key_new)
2450                .await
2451                .unwrap()
2452                .is_none()
2453        );
2454        // Verify region 3 old topic mapping was deleted
2455        let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2456        assert!(
2457            table_metadata_manager
2458                .topic_region_manager
2459                .get(topic_3_key_old)
2460                .await
2461                .unwrap()
2462                .is_none()
2463        );
2464        // Verify region 3 new topic mapping was created
2465        let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2466        assert!(
2467            table_metadata_manager
2468                .topic_region_manager
2469                .get(topic_3_key_new)
2470                .await
2471                .unwrap()
2472                .is_some()
2473        );
2474        // Verify region 1 mapping still exists (unchanged)
2475        assert!(
2476            table_metadata_manager
2477                .topic_region_manager
2478                .get(topic_1_key)
2479                .await
2480                .unwrap()
2481                .is_some()
2482        );
2483    }
2484
2485    #[tokio::test]
2486    async fn test_destroy_table_metadata() {
2487        let mem_kv = Arc::new(MemoryKvBackend::default());
2488        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2489        let table_id = 1025;
2490        let table_name = "foo";
2491        let task = test_create_table_task(table_name, table_id);
2492        let options = create_mock_region_wal_options();
2493        let serialized_options = options
2494            .iter()
2495            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2496            .collect::<HashMap<_, _>>();
2497        table_metadata_manager
2498            .create_table_metadata(
2499                task.table_info,
2500                TableRouteValue::physical(vec![
2501                    RegionRoute {
2502                        region: Region::new_test(RegionId::new(table_id, 1)),
2503                        leader_peer: Some(Peer::empty(1)),
2504                        follower_peers: vec![Peer::empty(5)],
2505                        leader_state: None,
2506                        leader_down_since: None,
2507                    },
2508                    RegionRoute {
2509                        region: Region::new_test(RegionId::new(table_id, 2)),
2510                        leader_peer: Some(Peer::empty(2)),
2511                        follower_peers: vec![Peer::empty(4)],
2512                        leader_state: None,
2513                        leader_down_since: None,
2514                    },
2515                    RegionRoute {
2516                        region: Region::new_test(RegionId::new(table_id, 3)),
2517                        leader_peer: Some(Peer::empty(3)),
2518                        follower_peers: vec![],
2519                        leader_state: None,
2520                        leader_down_since: None,
2521                    },
2522                ]),
2523                serialized_options,
2524            )
2525            .await
2526            .unwrap();
2527        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2528        let table_route_value = table_metadata_manager
2529            .table_route_manager
2530            .table_route_storage()
2531            .get_with_raw_bytes(table_id)
2532            .await
2533            .unwrap()
2534            .unwrap();
2535        table_metadata_manager
2536            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2537            .await
2538            .unwrap();
2539        assert!(mem_kv.is_empty());
2540    }
2541
2542    #[tokio::test]
2543    async fn test_restore_table_metadata() {
2544        let mem_kv = Arc::new(MemoryKvBackend::default());
2545        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2546        let table_id = 1025;
2547        let table_name = "foo";
2548        let task = test_create_table_task(table_name, table_id);
2549        let options = create_mock_region_wal_options();
2550        let serialized_options = options
2551            .iter()
2552            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2553            .collect::<HashMap<_, _>>();
2554        table_metadata_manager
2555            .create_table_metadata(
2556                task.table_info,
2557                TableRouteValue::physical(vec![
2558                    RegionRoute {
2559                        region: Region::new_test(RegionId::new(table_id, 1)),
2560                        leader_peer: Some(Peer::empty(1)),
2561                        follower_peers: vec![Peer::empty(5)],
2562                        leader_state: None,
2563                        leader_down_since: None,
2564                    },
2565                    RegionRoute {
2566                        region: Region::new_test(RegionId::new(table_id, 2)),
2567                        leader_peer: Some(Peer::empty(2)),
2568                        follower_peers: vec![Peer::empty(4)],
2569                        leader_state: None,
2570                        leader_down_since: None,
2571                    },
2572                    RegionRoute {
2573                        region: Region::new_test(RegionId::new(table_id, 3)),
2574                        leader_peer: Some(Peer::empty(3)),
2575                        follower_peers: vec![],
2576                        leader_state: None,
2577                        leader_down_since: None,
2578                    },
2579                ]),
2580                serialized_options,
2581            )
2582            .await
2583            .unwrap();
2584        let expected_result = mem_kv.dump();
2585        let table_route_value = table_metadata_manager
2586            .table_route_manager
2587            .table_route_storage()
2588            .get_with_raw_bytes(table_id)
2589            .await
2590            .unwrap()
2591            .unwrap();
2592        let region_routes = table_route_value.region_routes().unwrap();
2593        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2594        let table_route_value = TableRouteValue::physical(region_routes.clone());
2595        table_metadata_manager
2596            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2597            .await
2598            .unwrap();
2599        table_metadata_manager
2600            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2601            .await
2602            .unwrap();
2603        let kvs = mem_kv.dump();
2604        assert_eq!(kvs, expected_result);
2605        // Should be ignored.
2606        table_metadata_manager
2607            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2608            .await
2609            .unwrap();
2610        let kvs = mem_kv.dump();
2611        assert_eq!(kvs, expected_result);
2612    }
2613
2614    #[tokio::test]
2615    async fn test_create_update_view_info() {
2616        let mem_kv = Arc::new(MemoryKvBackend::default());
2617        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2618
2619        let view_info = new_test_table_info();
2620
2621        let view_id = view_info.ident.table_id;
2622
2623        let logical_plan: Vec<u8> = vec![1, 2, 3];
2624        let columns = vec!["a".to_string()];
2625        let plan_columns = vec!["number".to_string()];
2626        let table_names = new_test_table_names();
2627        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2628
2629        // Create metadata
2630        table_metadata_manager
2631            .create_view_metadata(
2632                view_info.clone(),
2633                logical_plan.clone(),
2634                table_names.clone(),
2635                columns.clone(),
2636                plan_columns.clone(),
2637                definition.to_string(),
2638            )
2639            .await
2640            .unwrap();
2641
2642        {
2643            // assert view info
2644            let current_view_info = table_metadata_manager
2645                .view_info_manager()
2646                .get(view_id)
2647                .await
2648                .unwrap()
2649                .unwrap()
2650                .into_inner();
2651            assert_eq!(current_view_info.view_info, logical_plan);
2652            assert_eq!(current_view_info.table_names, table_names);
2653            assert_eq!(current_view_info.definition, definition);
2654            assert_eq!(current_view_info.columns, columns);
2655            assert_eq!(current_view_info.plan_columns, plan_columns);
2656            // assert table info
2657            let current_table_info = table_metadata_manager
2658                .table_info_manager()
2659                .get(view_id)
2660                .await
2661                .unwrap()
2662                .unwrap()
2663                .into_inner();
2664            assert_eq!(current_table_info.table_info, view_info);
2665        }
2666
2667        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2668        let new_table_names = {
2669            let mut set = HashSet::new();
2670            set.insert(TableName {
2671                catalog_name: "greptime".to_string(),
2672                schema_name: "public".to_string(),
2673                table_name: "b_table".to_string(),
2674            });
2675            set.insert(TableName {
2676                catalog_name: "greptime".to_string(),
2677                schema_name: "public".to_string(),
2678                table_name: "c_table".to_string(),
2679            });
2680            set
2681        };
2682        let new_columns = vec!["b".to_string()];
2683        let new_plan_columns = vec!["number2".to_string()];
2684        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2685
2686        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2687            logical_plan.clone(),
2688            table_names,
2689            columns,
2690            plan_columns,
2691            definition.to_string(),
2692        ));
2693        // should be ok.
2694        table_metadata_manager
2695            .update_view_info(
2696                view_id,
2697                &current_view_info_value,
2698                new_logical_plan.clone(),
2699                new_table_names.clone(),
2700                new_columns.clone(),
2701                new_plan_columns.clone(),
2702                new_definition.to_string(),
2703            )
2704            .await
2705            .unwrap();
2706        // if table info was updated, it should be ok.
2707        table_metadata_manager
2708            .update_view_info(
2709                view_id,
2710                &current_view_info_value,
2711                new_logical_plan.clone(),
2712                new_table_names.clone(),
2713                new_columns.clone(),
2714                new_plan_columns.clone(),
2715                new_definition.to_string(),
2716            )
2717            .await
2718            .unwrap();
2719
2720        // updated view_info should equal the `new_logical_plan`
2721        let updated_view_info = table_metadata_manager
2722            .view_info_manager()
2723            .get(view_id)
2724            .await
2725            .unwrap()
2726            .unwrap()
2727            .into_inner();
2728        assert_eq!(updated_view_info.view_info, new_logical_plan);
2729        assert_eq!(updated_view_info.table_names, new_table_names);
2730        assert_eq!(updated_view_info.definition, new_definition);
2731        assert_eq!(updated_view_info.columns, new_columns);
2732        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2733
2734        let wrong_view_info = logical_plan.clone();
2735        let wrong_definition = "wrong_definition";
2736        let wrong_view_info_value =
2737            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2738                wrong_view_info,
2739                new_table_names.clone(),
2740                new_columns.clone(),
2741                new_plan_columns.clone(),
2742                wrong_definition.to_string(),
2743            ));
2744        // if the current_view_info_value is wrong, it should return an error.
2745        // The ABA problem.
2746        assert!(
2747            table_metadata_manager
2748                .update_view_info(
2749                    view_id,
2750                    &wrong_view_info_value,
2751                    new_logical_plan.clone(),
2752                    new_table_names.clone(),
2753                    vec!["c".to_string()],
2754                    vec!["number3".to_string()],
2755                    wrong_definition.to_string(),
2756                )
2757                .await
2758                .is_err()
2759        );
2760
2761        // The view_info is not changed.
2762        let current_view_info = table_metadata_manager
2763            .view_info_manager()
2764            .get(view_id)
2765            .await
2766            .unwrap()
2767            .unwrap()
2768            .into_inner();
2769        assert_eq!(current_view_info.view_info, new_logical_plan);
2770        assert_eq!(current_view_info.table_names, new_table_names);
2771        assert_eq!(current_view_info.definition, new_definition);
2772        assert_eq!(current_view_info.columns, new_columns);
2773        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2774    }
2775
2776    #[test]
2777    fn test_region_role_set_deserialize() {
2778        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2779        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2780        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2781        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2782
2783        let s = r#"[1, 2, 3]"#;
2784        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2785        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2786        assert!(region_role_set.follower_regions.is_empty());
2787    }
2788
2789    #[test]
2790    fn test_region_distribution_deserialize() {
2791        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2792        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2793        assert_eq!(region_distribution.len(), 2);
2794        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2795        assert!(region_distribution[&1].follower_regions.is_empty());
2796        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2797        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2798    }
2799}