common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used in the startup of Datanode, to let Datanode know which tables
22//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to the {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
//! 13. Topic name to region map key: `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define the "unified"
//! table metadata manager: [TableMetadataManager]
//! and flow metadata manager: [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain all the managers defined above. It's recommended to use only these managers.
75//!
//! The whole picture of the flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{TableId, TableInfo};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex (not anchored) for a valid topic name: one character from `a-zA-Z0-9_:-`
/// followed by any number of characters from `a-zA-Z0-9_:-.@#`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Key of the maintenance switch outside the `__switches/` namespace.
/// NOTE(review): presumably kept only for compatibility with older versions; the current
/// key is [`MAINTENANCE_KEY`] — confirm in the runtime switch manager.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the `maintenance` runtime switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the `pause_procedure` runtime switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the `recovery` runtime switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repart keys: `__table_repart/{id}` (numeric id; presumably a table id).
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys: `__node_address/{number}/{number}`.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// The region numbers hosted by one datanode, grouped by role (leader or follower).
///
/// This is the per-datanode value of [`RegionDistribution`]. Besides the object form, the
/// custom `Deserialize` impl also accepts a bare list of region numbers, which is read as
/// leader regions with no followers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215    where
216        D: serde::Deserializer<'de>,
217    {
218        #[derive(Deserialize)]
219        #[serde(untagged)]
220        enum RegionRoleSetOrLeaderOnly {
221            Full {
222                leader_regions: Vec<RegionNumber>,
223                follower_regions: Vec<RegionNumber>,
224            },
225            LeaderOnly(Vec<RegionNumber>),
226        }
227        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228            RegionRoleSetOrLeaderOnly::Full {
229                leader_regions,
230                follower_regions,
231            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233                Ok(RegionRoleSet::new(leader_regions, vec![]))
234            }
235        }
236    }
237}
238
239impl RegionRoleSet {
240    /// Create a new region role set.
241    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242        Self {
243            leader_regions,
244            follower_regions,
245        }
246    }
247
248    /// Add a leader region to the set.
249    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250        self.leader_regions.push(region_number);
251    }
252
253    /// Add a follower region to the set.
254    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255        self.follower_regions.push(region_number);
256    }
257
258    /// Sort the regions.
259    pub fn sort(&mut self) {
260        self.follower_regions.sort();
261        self.leader_regions.sort();
262    }
263}
264
/// The distribution of a table's regions over datanodes.
///
/// The key is the datanode id, the value is the [`RegionRoleSet`] hosted on that datanode.
/// Entries are ordered by datanode id (`BTreeMap`).
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of a flow.
pub type FlowId = u32;
/// The id of a partition of a flow.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280    static ref TABLE_INFO_KEY_PATTERN: Regex =
281        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285    static ref VIEW_INFO_KEY_PATTERN: Regex =
286        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307    ))
308    .unwrap();
309}
310
311lazy_static! {
312    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
313    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315    ))
316        .unwrap();
317}
318
319lazy_static! {
320    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
321    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323    ))
324        .unwrap();
325}
326
327lazy_static! {
328    static ref NODE_ADDRESS_PATTERN: Regex =
329        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340    ))
341    .unwrap();
342}
343
/// The key of metadata.
///
/// Converts between an in-memory key and its raw byte form. `T` is the type produced by
/// [`MetadataKey::from_bytes`]; the lifetime `'a` is that of the input slice, so `T` may
/// borrow from the raw bytes.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into raw bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from raw bytes, returning an error if they do not form a valid key.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
351#[derive(Debug, Clone, PartialEq)]
352pub struct BytesAdapter(Vec<u8>);
353
354impl From<Vec<u8>> for BytesAdapter {
355    fn from(value: Vec<u8>) -> Self {
356        Self(value)
357    }
358}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361    fn to_bytes(&self) -> Vec<u8> {
362        self.0.clone()
363    }
364
365    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366        Ok(BytesAdapter(bytes.to_vec()))
367    }
368}
369
/// A metadata key that can build a transactional "get" of itself.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the [`TxnOp`] that reads this key, paired with a filter that picks this key's
    /// raw value out of a [`TxnOpGetResponseSet`] after the transaction has run.
    /// NOTE(review): the filter presumably yields `None` when the key was not found.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// A metadata value that can be decoded from and encoded to raw bytes.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// The "unified" table metadata manager.
///
/// Bundles the per-key managers of this module. Every manager is constructed over the same
/// [`KvBackendRef`], which is also kept here so that operations touching several kinds of
/// keys can be submitted as a single transaction.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // The backend shared by all the managers above.
    kv_backend: KvBackendRef,
}
403
/// Returns early with an `error::UnexpectedSnafu` error unless `$got == $expected_value`.
///
/// `$name` describes the operation and is only used in the error message. `$got` and
/// `$expected_value` are each evaluated exactly once, and `$name` at most once. The expansion
/// refers to `ensure!` and `error::UnexpectedSnafu` unqualified, so both must be in scope at
/// the call site.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Bind both sides once so the comparison and the error message reuse the same
        // values instead of re-evaluating the argument expressions.
        let got = &$got;
        let expected = &$expected_value;
        ensure!(
            *got == *expected,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                )
            }
        );
    }};
}
418
/// A struct containing a deserialized value (`inner`) together with the original bytes it
/// was decoded from.
///
/// - Serialize behaviors:
///
/// Only the original `bytes` are written (as a string); the `inner` field is ignored.
///
/// - Deserialize behaviors:
///
/// A string is read as the original `bytes`, and the `inner` field is decoded from them.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value was deserialized from the original bytes.
    inner: T,
}
434
435impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
436    type Target = T;
437
438    fn deref(&self) -> &Self::Target {
439        &self.inner
440    }
441}
442
443impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
444    fn deref_mut(&mut self) -> &mut Self::Target {
445        &mut self.inner
446    }
447}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        write!(
452            f,
453            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454            self.inner, self.bytes
455        )
456    }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460    /// - Serialize behaviors:
461    ///
462    /// The `inner` field will be ignored.
463    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464    where
465        S: serde::Serializer,
466    {
467        // Safety: The original bytes are always JSON encoded.
468        // It's more efficiently than `serialize_bytes`.
469        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470    }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474    for DeserializedValueWithBytes<T>
475{
476    /// - Deserialize behaviors:
477    ///
478    /// The `inner` field will be deserialized from the `bytes` field.
479    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480    where
481        D: serde::Deserializer<'de>,
482    {
483        let buf = String::deserialize(deserializer)?;
484        let bytes = Bytes::from(buf);
485
486        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489        Ok(value)
490    }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494    fn clone(&self) -> Self {
495        Self {
496            bytes: self.bytes.clone(),
497            inner: self.inner.clone(),
498        }
499    }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503    /// Returns a struct containing a deserialized value and an original `bytes`.
504    /// It accepts original bytes of inner.
505    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506        let inner = T::try_from_raw_value(&bytes)?;
507        Ok(Self { bytes, inner })
508    }
509
510    /// Returns a struct containing a deserialized value and an original `bytes`.
511    /// It accepts original bytes of inner.
512    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514    }
515
516    pub fn into_inner(self) -> T {
517        self.inner
518    }
519
520    pub fn get_inner_ref(&self) -> &T {
521        &self.inner
522    }
523
524    /// Returns original `bytes`
525    pub fn get_raw_bytes(&self) -> Vec<u8> {
526        self.bytes.to_vec()
527    }
528
529    #[cfg(any(test, feature = "testing"))]
530    pub fn from_inner(inner: T) -> Self {
531        let bytes = serde_json::to_vec(&inner).unwrap();
532
533        Self {
534            bytes: Bytes::from(bytes),
535            inner,
536        }
537    }
538}
539
540impl TableMetadataManager {
541    pub fn new(kv_backend: KvBackendRef) -> Self {
542        TableMetadataManager {
543            table_name_manager: TableNameManager::new(kv_backend.clone()),
544            table_info_manager: TableInfoManager::new(kv_backend.clone()),
545            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547            catalog_manager: CatalogManager::new(kv_backend.clone()),
548            schema_manager: SchemaManager::new(kv_backend.clone()),
549            table_route_manager: TableRouteManager::new(kv_backend.clone()),
550            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554            kv_backend,
555        }
556    }
557
558    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
559    pub fn new_with_custom_tombstone_prefix(
560        kv_backend: KvBackendRef,
561        tombstone_prefix: &str,
562    ) -> Self {
563        Self {
564            table_name_manager: TableNameManager::new(kv_backend.clone()),
565            table_info_manager: TableInfoManager::new(kv_backend.clone()),
566            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568            catalog_manager: CatalogManager::new(kv_backend.clone()),
569            schema_manager: SchemaManager::new(kv_backend.clone()),
570            table_route_manager: TableRouteManager::new(kv_backend.clone()),
571            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572            tombstone_manager: TombstoneManager::new_with_prefix(
573                kv_backend.clone(),
574                tombstone_prefix,
575            ),
576            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578            kv_backend,
579        }
580    }
581
582    pub async fn init(&self) -> Result<()> {
583        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585        self.catalog_manager().create(catalog_name, true).await?;
586
587        let internal_schemas = [
588            DEFAULT_SCHEMA_NAME,
589            INFORMATION_SCHEMA_NAME,
590            DEFAULT_PRIVATE_SCHEMA_NAME,
591        ];
592
593        for schema_name in internal_schemas {
594            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596            self.schema_manager().create(schema_key, None, true).await?;
597        }
598
599        Ok(())
600    }
601
    /// Returns the manager of table name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager of table info keys.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager of view info keys.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager of datanode table keys.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager of catalog name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager of schema name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager of table route keys.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager of table repart keys.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the manager of topic name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager of topic-to-region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the kv backend shared by all the managers.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646    pub async fn get_full_table_info(
647        &self,
648        table_id: TableId,
649    ) -> Result<(
650        Option<DeserializedValueWithBytes<TableInfoValue>>,
651        Option<DeserializedValueWithBytes<TableRouteValue>>,
652    )> {
653        let table_info_key = TableInfoKey::new(table_id);
654        let table_route_key = TableRouteKey::new(table_id);
655        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659        let mut res = self.kv_backend.txn(txn).await?;
660        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662        let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663        if let Some(table_route_value) = &mut table_route_value {
664            self.table_route_manager()
665                .table_route_storage()
666                .remap_route_address(table_route_value)
667                .await?;
668        }
669        Ok((table_info_value, table_route_value))
670    }
671
672    /// Creates metadata for view and returns an error if different metadata exists.
673    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
674    /// Parameters include:
675    /// - `view_info`: the encoded logical plan
676    /// - `table_names`: the resolved fully table names in logical plan
677    /// - `columns`: the view columns
678    /// - `plan_columns`: the original plan columns
679    /// - `definition`: The SQL to create the view
680    ///
681    pub async fn create_view_metadata(
682        &self,
683        view_info: TableInfo,
684        raw_logical_plan: Vec<u8>,
685        table_names: HashSet<TableName>,
686        columns: Vec<String>,
687        plan_columns: Vec<String>,
688        definition: String,
689    ) -> Result<()> {
690        let view_id = view_info.ident.table_id;
691
692        // Creates view name.
693        let view_name = TableNameKey::new(
694            &view_info.catalog_name,
695            &view_info.schema_name,
696            &view_info.name,
697        );
698        let create_table_name_txn = self
699            .table_name_manager()
700            .build_create_txn(&view_name, view_id)?;
701
702        // Creates table info.
703        let table_info_value = TableInfoValue::new(view_info);
704
705        let (create_table_info_txn, on_create_table_info_failure) = self
706            .table_info_manager()
707            .build_create_txn(view_id, &table_info_value)?;
708
709        // Creates view info
710        let view_info_value = ViewInfoValue::new(
711            raw_logical_plan,
712            table_names,
713            columns,
714            plan_columns,
715            definition,
716        );
717        let (create_view_info_txn, on_create_view_info_failure) = self
718            .view_info_manager()
719            .build_create_txn(view_id, &view_info_value)?;
720
721        let txn = Txn::merge_all(vec![
722            create_table_name_txn,
723            create_table_info_txn,
724            create_view_info_txn,
725        ]);
726
727        let mut r = self.kv_backend.txn(txn).await?;
728
729        // Checks whether metadata was already created.
730        if !r.succeeded {
731            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
732            let remote_table_info = on_create_table_info_failure(&mut set)?
733                .context(error::UnexpectedSnafu {
734                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
735                })?
736                .into_inner();
737
738            let remote_view_info = on_create_view_info_failure(&mut set)?
739                .context(error::UnexpectedSnafu {
740                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
741                })?
742                .into_inner();
743
744            let op_name = "the creating view metadata";
745            ensure_values!(remote_table_info, table_info_value, op_name);
746            ensure_values!(remote_view_info, view_info_value, op_name);
747        }
748
749        Ok(())
750    }
751
752    /// Creates metadata for table and returns an error if different metadata exists.
753    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
754    pub async fn create_table_metadata(
755        &self,
756        table_info: TableInfo,
757        table_route_value: TableRouteValue,
758        region_wal_options: HashMap<RegionNumber, String>,
759    ) -> Result<()> {
760        let table_id = table_info.ident.table_id;
761        let engine = table_info.meta.engine.clone();
762
763        // Creates table name.
764        let table_name = TableNameKey::new(
765            &table_info.catalog_name,
766            &table_info.schema_name,
767            &table_info.name,
768        );
769        let create_table_name_txn = self
770            .table_name_manager()
771            .build_create_txn(&table_name, table_id)?;
772
773        let region_options = table_info.to_region_options();
774        // Creates table info.
775        let table_info_value = TableInfoValue::new(table_info);
776        let (create_table_info_txn, on_create_table_info_failure) = self
777            .table_info_manager()
778            .build_create_txn(table_id, &table_info_value)?;
779
780        let (create_table_route_txn, on_create_table_route_failure) = self
781            .table_route_manager()
782            .table_route_storage()
783            .build_create_txn(table_id, &table_route_value)?;
784
785        let create_topic_region_txn = self
786            .topic_region_manager
787            .build_create_txn(table_id, &region_wal_options)?;
788
789        let mut txn = Txn::merge_all(vec![
790            create_table_name_txn,
791            create_table_info_txn,
792            create_table_route_txn,
793            create_topic_region_txn,
794        ]);
795
796        if let TableRouteValue::Physical(x) = &table_route_value {
797            let region_storage_path = table_info_value.region_storage_path();
798            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
799                table_id,
800                &engine,
801                &region_storage_path,
802                region_options,
803                region_wal_options,
804                region_distribution(&x.region_routes),
805            )?;
806            txn = txn.merge(create_datanode_table_txn);
807        }
808
809        let mut r = self.kv_backend.txn(txn).await?;
810
811        // Checks whether metadata was already created.
812        if !r.succeeded {
813            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
814            let remote_table_info = on_create_table_info_failure(&mut set)?
815                .context(error::UnexpectedSnafu {
816                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
817                })?
818                .into_inner();
819
820            let remote_table_route = on_create_table_route_failure(&mut set)?
821                .context(error::UnexpectedSnafu {
822                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
823                })?
824                .into_inner();
825
826            let op_name = "the creating table metadata";
827            ensure_values!(remote_table_info, table_info_value, op_name);
828            ensure_values!(remote_table_route, table_route_value, op_name);
829        }
830
831        Ok(())
832    }
833
834    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
835        // The batch size is max_txn_size / 3 because the size of the `tables_data`
836        // is 3 times the size of the `tables_data`.
837        self.kv_backend.max_txn_ops() / 3
838    }
839
840    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
841    pub async fn create_logical_tables_metadata(
842        &self,
843        tables_data: Vec<(TableInfo, TableRouteValue)>,
844    ) -> Result<()> {
845        let len = tables_data.len();
846        let mut txns = Vec::with_capacity(3 * len);
847        struct OnFailure<F1, R1, F2, R2>
848        where
849            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
850            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
851        {
852            table_info_value: TableInfoValue,
853            on_create_table_info_failure: F1,
854            table_route_value: TableRouteValue,
855            on_create_table_route_failure: F2,
856        }
857        let mut on_failures = Vec::with_capacity(len);
858        for (table_info, table_route_value) in tables_data {
859            let table_id = table_info.ident.table_id;
860
861            // Creates table name.
862            let table_name = TableNameKey::new(
863                &table_info.catalog_name,
864                &table_info.schema_name,
865                &table_info.name,
866            );
867            let create_table_name_txn = self
868                .table_name_manager()
869                .build_create_txn(&table_name, table_id)?;
870            txns.push(create_table_name_txn);
871
872            // Creates table info.
873            let table_info_value = TableInfoValue::new(table_info);
874            let (create_table_info_txn, on_create_table_info_failure) =
875                self.table_info_manager()
876                    .build_create_txn(table_id, &table_info_value)?;
877            txns.push(create_table_info_txn);
878
879            let (create_table_route_txn, on_create_table_route_failure) = self
880                .table_route_manager()
881                .table_route_storage()
882                .build_create_txn(table_id, &table_route_value)?;
883            txns.push(create_table_route_txn);
884
885            on_failures.push(OnFailure {
886                table_info_value,
887                on_create_table_info_failure,
888                table_route_value,
889                on_create_table_route_failure,
890            });
891        }
892
893        let txn = Txn::merge_all(txns);
894        let mut r = self.kv_backend.txn(txn).await?;
895
896        // Checks whether metadata was already created.
897        if !r.succeeded {
898            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
899            for on_failure in on_failures {
900                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
901                    .context(error::UnexpectedSnafu {
902                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
903                    })?
904                    .into_inner();
905
906                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
907                    .context(error::UnexpectedSnafu {
908                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
909                    })?
910                    .into_inner();
911
912                let op_name = "the creating logical tables metadata";
913                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
914                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
915            }
916        }
917
918        Ok(())
919    }
920
921    fn table_metadata_keys(
922        &self,
923        table_id: TableId,
924        table_name: &TableName,
925        table_route_value: &TableRouteValue,
926        region_wal_options: &HashMap<RegionNumber, WalOptions>,
927    ) -> Result<Vec<Vec<u8>>> {
928        // Builds keys
929        let datanode_ids = if table_route_value.is_physical() {
930            region_distribution(table_route_value.region_routes()?)
931                .into_keys()
932                .collect()
933        } else {
934            vec![]
935        };
936        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
937        let table_name = TableNameKey::new(
938            &table_name.catalog_name,
939            &table_name.schema_name,
940            &table_name.table_name,
941        );
942        let table_info_key = TableInfoKey::new(table_id);
943        let table_route_key = TableRouteKey::new(table_id);
944        let table_repart_key = TableRepartKey::new(table_id);
945        let datanode_table_keys = datanode_ids
946            .into_iter()
947            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
948            .collect::<HashSet<_>>();
949        let topic_region_map = self
950            .topic_region_manager
951            .get_topic_region_mapping(table_id, region_wal_options);
952        let topic_region_keys = topic_region_map
953            .iter()
954            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
955            .collect::<Vec<_>>();
956        keys.push(table_name.to_bytes());
957        keys.push(table_info_key.to_bytes());
958        keys.push(table_route_key.to_bytes());
959        keys.push(table_repart_key.to_bytes());
960        for key in &datanode_table_keys {
961            keys.push(key.to_bytes());
962        }
963        for key in topic_region_keys {
964            keys.push(key.to_bytes());
965        }
966        Ok(keys)
967    }
968
969    /// Deletes metadata for table **logically**.
970    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
971    pub async fn delete_table_metadata(
972        &self,
973        table_id: TableId,
974        table_name: &TableName,
975        table_route_value: &TableRouteValue,
976        region_wal_options: &HashMap<RegionNumber, WalOptions>,
977    ) -> Result<()> {
978        let keys =
979            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
980        self.tombstone_manager.create(keys).await.map(|_| ())
981    }
982
983    /// Deletes metadata tombstone for table **permanently**.
984    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
985    pub async fn delete_table_metadata_tombstone(
986        &self,
987        table_id: TableId,
988        table_name: &TableName,
989        table_route_value: &TableRouteValue,
990        region_wal_options: &HashMap<RegionNumber, WalOptions>,
991    ) -> Result<()> {
992        let table_metadata_keys =
993            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
994        self.tombstone_manager
995            .delete(table_metadata_keys)
996            .await
997            .map(|_| ())
998    }
999
1000    /// Restores metadata for table.
1001    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1002    pub async fn restore_table_metadata(
1003        &self,
1004        table_id: TableId,
1005        table_name: &TableName,
1006        table_route_value: &TableRouteValue,
1007        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1008    ) -> Result<()> {
1009        let keys =
1010            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1011        self.tombstone_manager.restore(keys).await.map(|_| ())
1012    }
1013
1014    /// Deletes metadata for table **permanently**.
1015    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1016    pub async fn destroy_table_metadata(
1017        &self,
1018        table_id: TableId,
1019        table_name: &TableName,
1020        table_route_value: &TableRouteValue,
1021        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1022    ) -> Result<()> {
1023        let keys =
1024            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1025        let _ = self
1026            .kv_backend
1027            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1028            .await?;
1029        Ok(())
1030    }
1031
1032    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1033        let mut keys = Vec::with_capacity(3);
1034        let view_name = TableNameKey::new(
1035            &view_name.catalog_name,
1036            &view_name.schema_name,
1037            &view_name.table_name,
1038        );
1039        let table_info_key = TableInfoKey::new(view_id);
1040        let view_info_key = ViewInfoKey::new(view_id);
1041        keys.push(view_name.to_bytes());
1042        keys.push(table_info_key.to_bytes());
1043        keys.push(view_info_key.to_bytes());
1044
1045        Ok(keys)
1046    }
1047
1048    /// Deletes metadata for view **permanently**.
1049    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1050    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1051        let keys = self.view_info_keys(view_id, view_name)?;
1052        let _ = self
1053            .kv_backend
1054            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1055            .await?;
1056        Ok(())
1057    }
1058
1059    /// Renames the table name and returns an error if different metadata exists.
1060    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1061    /// and the new `TableNameKey` MUST be empty.
1062    pub async fn rename_table(
1063        &self,
1064        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1065        new_table_name: String,
1066    ) -> Result<()> {
1067        let current_table_info = &current_table_info_value.table_info;
1068        let table_id = current_table_info.ident.table_id;
1069
1070        let table_name_key = TableNameKey::new(
1071            &current_table_info.catalog_name,
1072            &current_table_info.schema_name,
1073            &current_table_info.name,
1074        );
1075
1076        let new_table_name_key = TableNameKey::new(
1077            &current_table_info.catalog_name,
1078            &current_table_info.schema_name,
1079            &new_table_name,
1080        );
1081
1082        // Updates table name.
1083        let update_table_name_txn = self.table_name_manager().build_update_txn(
1084            &table_name_key,
1085            &new_table_name_key,
1086            table_id,
1087        )?;
1088
1089        let new_table_info_value = current_table_info_value
1090            .inner
1091            .with_update(move |table_info| {
1092                table_info.name = new_table_name;
1093            });
1094
1095        // Updates table info.
1096        let (update_table_info_txn, on_update_table_info_failure) = self
1097            .table_info_manager()
1098            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1099
1100        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1101
1102        let mut r = self.kv_backend.txn(txn).await?;
1103
1104        // Checks whether metadata was already updated.
1105        if !r.succeeded {
1106            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1107            let remote_table_info = on_update_table_info_failure(&mut set)?
1108                .context(error::UnexpectedSnafu {
1109                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1110                })?
1111                .into_inner();
1112
1113            let op_name = "the renaming table metadata";
1114            ensure_values!(remote_table_info, new_table_info_value, op_name);
1115        }
1116
1117        Ok(())
1118    }
1119
1120    /// Updates table info and returns an error if different metadata exists.
1121    /// And cascade-ly update all redundant table options for each region
1122    /// if region_distribution is present.
1123    pub async fn update_table_info(
1124        &self,
1125        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1126        region_distribution: Option<RegionDistribution>,
1127        new_table_info: TableInfo,
1128    ) -> Result<()> {
1129        let table_id = current_table_info_value.table_info.ident.table_id;
1130        let new_table_info_value = current_table_info_value.update(new_table_info);
1131
1132        // Updates table info.
1133        let (update_table_info_txn, on_update_table_info_failure) = self
1134            .table_info_manager()
1135            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1136
1137        let txn = if let Some(region_distribution) = region_distribution {
1138            // region options induced from table info.
1139            let new_region_options = new_table_info_value.table_info.to_region_options();
1140            let update_datanode_table_options_txn = self
1141                .datanode_table_manager
1142                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1143                .await?;
1144            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1145        } else {
1146            update_table_info_txn
1147        };
1148
1149        let mut r = self.kv_backend.txn(txn).await?;
1150        // Checks whether metadata was already updated.
1151        if !r.succeeded {
1152            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1153            let remote_table_info = on_update_table_info_failure(&mut set)?
1154                .context(error::UnexpectedSnafu {
1155                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1156                })?
1157                .into_inner();
1158
1159            let op_name = "the updating table info";
1160            ensure_values!(remote_table_info, new_table_info_value, op_name);
1161        }
1162        Ok(())
1163    }
1164
1165    /// Updates view info and returns an error if different metadata exists.
1166    /// Parameters include:
1167    /// - `view_id`: the view id
1168    /// - `current_view_info_value`: the current view info for CAS checking
1169    /// - `new_view_info`: the encoded logical plan
1170    /// - `table_names`: the resolved fully table names in logical plan
1171    /// - `columns`: the view columns
1172    /// - `plan_columns`: the original plan columns
1173    /// - `definition`: The SQL to create the view
1174    ///
1175    #[allow(clippy::too_many_arguments)]
1176    pub async fn update_view_info(
1177        &self,
1178        view_id: TableId,
1179        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1180        new_view_info: Vec<u8>,
1181        table_names: HashSet<TableName>,
1182        columns: Vec<String>,
1183        plan_columns: Vec<String>,
1184        definition: String,
1185    ) -> Result<()> {
1186        let new_view_info_value = current_view_info_value.update(
1187            new_view_info,
1188            table_names,
1189            columns,
1190            plan_columns,
1191            definition,
1192        );
1193
1194        // Updates view info.
1195        let (update_view_info_txn, on_update_view_info_failure) = self
1196            .view_info_manager()
1197            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1198
1199        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1200
1201        // Checks whether metadata was already updated.
1202        if !r.succeeded {
1203            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1204            let remote_view_info = on_update_view_info_failure(&mut set)?
1205                .context(error::UnexpectedSnafu {
1206                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1207                })?
1208                .into_inner();
1209
1210            let op_name = "the updating view info";
1211            ensure_values!(remote_view_info, new_view_info_value, op_name);
1212        }
1213        Ok(())
1214    }
1215
1216    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1217        self.kv_backend.max_txn_ops()
1218    }
1219
1220    pub async fn batch_update_table_info_values(
1221        &self,
1222        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1223    ) -> Result<()> {
1224        let len = table_info_value_pairs.len();
1225        let mut txns = Vec::with_capacity(len);
1226        struct OnFailure<F, R>
1227        where
1228            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1229        {
1230            table_info_value: TableInfoValue,
1231            on_update_table_info_failure: F,
1232        }
1233        let mut on_failures = Vec::with_capacity(len);
1234
1235        for (table_info_value, new_table_info) in table_info_value_pairs {
1236            let table_id = table_info_value.table_info.ident.table_id;
1237
1238            let new_table_info_value = table_info_value.update(new_table_info);
1239
1240            let (update_table_info_txn, on_update_table_info_failure) =
1241                self.table_info_manager().build_update_txn(
1242                    table_id,
1243                    &table_info_value,
1244                    &new_table_info_value,
1245                )?;
1246
1247            txns.push(update_table_info_txn);
1248
1249            on_failures.push(OnFailure {
1250                table_info_value: new_table_info_value,
1251                on_update_table_info_failure,
1252            });
1253        }
1254
1255        let txn = Txn::merge_all(txns);
1256        let mut r = self.kv_backend.txn(txn).await?;
1257
1258        if !r.succeeded {
1259            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1260            for on_failure in on_failures {
1261                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1262                    .context(error::UnexpectedSnafu {
1263                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1264                    })?
1265                    .into_inner();
1266
1267                let op_name = "the batch updating table info";
1268                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1269            }
1270        }
1271
1272        Ok(())
1273    }
1274
1275    pub async fn update_table_route(
1276        &self,
1277        table_id: TableId,
1278        region_info: RegionInfo,
1279        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1280        new_region_routes: Vec<RegionRoute>,
1281        new_region_options: &HashMap<String, String>,
1282        new_region_wal_options: &HashMap<RegionNumber, String>,
1283    ) -> Result<()> {
1284        // Updates the datanode table key value pairs.
1285        let current_region_distribution =
1286            region_distribution(current_table_route_value.region_routes()?);
1287        let new_region_distribution = region_distribution(&new_region_routes);
1288
1289        let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1290            table_id,
1291            &region_info.region_wal_options,
1292            new_region_wal_options,
1293        )?;
1294        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1295            table_id,
1296            region_info,
1297            current_region_distribution,
1298            new_region_distribution,
1299            new_region_options,
1300            new_region_wal_options,
1301        )?;
1302
1303        // Updates the table_route.
1304        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1305        let (update_table_route_txn, on_update_table_route_failure) = self
1306            .table_route_manager()
1307            .table_route_storage()
1308            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1309
1310        let txn = Txn::merge_all(vec![
1311            update_datanode_table_txn,
1312            update_table_route_txn,
1313            update_topic_region_txn,
1314        ]);
1315
1316        let mut r = self.kv_backend.txn(txn).await?;
1317
1318        // Checks whether metadata was already updated.
1319        if !r.succeeded {
1320            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1321            let remote_table_route = on_update_table_route_failure(&mut set)?
1322                .context(error::UnexpectedSnafu {
1323                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1324                })?
1325                .into_inner();
1326
1327            let op_name = "the updating table route";
1328            ensure_values!(remote_table_route, new_table_route_value, op_name);
1329        }
1330
1331        Ok(())
1332    }
1333
1334    /// Updates the leader status of the [RegionRoute].
1335    pub async fn update_leader_region_status<F>(
1336        &self,
1337        table_id: TableId,
1338        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1339        next_region_route_status: F,
1340    ) -> Result<()>
1341    where
1342        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1343    {
1344        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1345
1346        let mut updated = 0;
1347        for route in &mut new_region_routes {
1348            if let Some(state) = next_region_route_status(route)
1349                && route.set_leader_state(state)
1350            {
1351                updated += 1;
1352            }
1353        }
1354
1355        if updated == 0 {
1356            warn!("No leader status updated");
1357            return Ok(());
1358        }
1359
1360        // Updates the table_route.
1361        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1362
1363        let (update_table_route_txn, on_update_table_route_failure) = self
1364            .table_route_manager()
1365            .table_route_storage()
1366            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1367
1368        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1369
1370        // Checks whether metadata was already updated.
1371        if !r.succeeded {
1372            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1373            let remote_table_route = on_update_table_route_failure(&mut set)?
1374                .context(error::UnexpectedSnafu {
1375                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1376                })?
1377                .into_inner();
1378
1379            let op_name = "the updating leader region status";
1380            ensure_values!(remote_table_route, new_table_route_value, op_name);
1381        }
1382
1383        Ok(())
1384    }
1385}
1386
/// Implements [`MetadataValue`](crate::key::MetadataValue) for each listed type by
/// (de)serializing it as JSON with `serde_json`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `.context` extension method by
/// their unqualified names, so they must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $value_type {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1403
// Implements `MetadataKeyGetTxnOp` for each listed key type. The generated `build_get_op`
// issues a `TxnOp::Get` on the key's bytes and pairs it with
// `TxnOpGetResponseSet::filter` for the same bytes.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_type: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_type {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
                ) {
                    let key_bytes = self.to_bytes();
                    let get_op = TxnOp::Get(key_bytes.clone());
                    (get_op, TxnOpGetResponseSet::filter(key_bytes))
                }
            }
        )*
    };
}
1428
1429impl_metadata_key_get_txn_op! {
1430    TableNameKey<'_>,
1431    TableInfoKey,
1432    ViewInfoKey,
1433    TableRouteKey,
1434    DatanodeTableKey
1435}
1436
/// Adds inherent JSON (de)serialization helpers to each listed type. Unlike
/// `impl_metadata_value!`, the decoder returns `Result<Option<Self>>`, so a JSON `null`
/// decodes to `None`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `.context` extension method by
/// their unqualified names, so they must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $value_type {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1453
1454impl_metadata_value! {
1455    TableNameValue,
1456    TableInfoValue,
1457    ViewInfoValue,
1458    DatanodeTableValue,
1459    FlowInfoValue,
1460    FlowNameValue,
1461    FlowRouteValue,
1462    TableFlowValue,
1463    NodeAddressValue,
1464    SchemaNameValue,
1465    FlowStateValue,
1466    PoisonValue,
1467    TopicRegionValue
1468}
1469
1470impl_optional_metadata_value! {
1471    CatalogNameValue,
1472    SchemaNameValue
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477    use std::collections::{BTreeMap, HashMap, HashSet};
1478    use std::sync::Arc;
1479
1480    use bytes::Bytes;
1481    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1482    use common_time::util::current_time_millis;
1483    use common_wal::options::{KafkaWalOptions, WalOptions};
1484    use futures::TryStreamExt;
1485    use store_api::storage::{RegionId, RegionNumber};
1486    use table::metadata::TableInfo;
1487    use table::table_name::TableName;
1488
1489    use super::datanode_table::DatanodeTableKey;
1490    use super::test_utils;
1491    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1492    use crate::ddl::test_util::create_table::test_create_table_task;
1493    use crate::ddl::utils::region_storage_path;
1494    use crate::error::Result;
1495    use crate::key::datanode_table::RegionInfo;
1496    use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1497    use crate::key::table_info::TableInfoValue;
1498    use crate::key::table_name::TableNameKey;
1499    use crate::key::table_route::TableRouteValue;
1500    use crate::key::topic_region::TopicRegionKey;
1501    use crate::key::{
1502        DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1503        TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1504    };
1505    use crate::kv_backend::KvBackend;
1506    use crate::kv_backend::memory::MemoryKvBackend;
1507    use crate::peer::Peer;
1508    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1509    use crate::rpc::store::{PutRequest, RangeRequest};
1510    use crate::wal_provider::WalProvider;
1511
1512    #[test]
1513    fn test_deserialized_value_with_bytes() {
1514        let region_route = new_test_region_route();
1515        let region_routes = vec![region_route.clone()];
1516
1517        let expected_region_routes =
1518            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1519        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1520
1521        // Serialize behaviors:
1522        // The inner field will be ignored.
1523        let value = DeserializedValueWithBytes {
1524            // ignored
1525            inner: TableRouteValue::physical(region_routes.clone()),
1526            bytes: Bytes::from(expected.clone()),
1527        };
1528
1529        let encoded = serde_json::to_vec(&value).unwrap();
1530
1531        // Deserialize behaviors:
1532        // The inner field will be deserialized from the bytes field.
1533        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1534            serde_json::from_slice(&encoded).unwrap();
1535
1536        assert_eq!(decoded.inner, expected_region_routes);
1537        assert_eq!(decoded.bytes, expected);
1538    }
1539
1540    fn new_test_region_route() -> RegionRoute {
1541        new_region_route(1, 2)
1542    }
1543
1544    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1545        RegionRoute {
1546            region: Region {
1547                id: region_id.into(),
1548                name: "r1".to_string(),
1549                attrs: BTreeMap::new(),
1550                partition_expr: Default::default(),
1551            },
1552            leader_peer: Some(Peer::new(datanode, "a2")),
1553            follower_peers: vec![],
1554            leader_state: None,
1555            leader_down_since: None,
1556            write_route_policy: None,
1557        }
1558    }
1559
1560    fn new_test_table_info() -> TableInfo {
1561        test_utils::new_test_table_info(10)
1562    }
1563
1564    fn new_test_table_names() -> HashSet<TableName> {
1565        let mut set = HashSet::new();
1566        set.insert(TableName {
1567            catalog_name: "greptime".to_string(),
1568            schema_name: "public".to_string(),
1569            table_name: "a_table".to_string(),
1570        });
1571        set.insert(TableName {
1572            catalog_name: "greptime".to_string(),
1573            schema_name: "public".to_string(),
1574            table_name: "b_table".to_string(),
1575        });
1576        set
1577    }
1578
1579    async fn create_physical_table_metadata(
1580        table_metadata_manager: &TableMetadataManager,
1581        table_info: TableInfo,
1582        region_routes: Vec<RegionRoute>,
1583        region_wal_options: HashMap<RegionNumber, String>,
1584    ) -> Result<()> {
1585        table_metadata_manager
1586            .create_table_metadata(
1587                table_info,
1588                TableRouteValue::physical(region_routes),
1589                region_wal_options,
1590            )
1591            .await
1592    }
1593
1594    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1595        let topics = (0..2)
1596            .map(|i| format!("greptimedb_topic{}", i))
1597            .collect::<Vec<_>>();
1598        let wal_options = topics
1599            .iter()
1600            .map(|topic| {
1601                WalOptions::Kafka(KafkaWalOptions {
1602                    topic: topic.clone(),
1603                })
1604            })
1605            .collect::<Vec<_>>();
1606
1607        (0..16)
1608            .enumerate()
1609            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1610            .collect()
1611    }
1612
1613    #[tokio::test]
1614    async fn test_raft_engine_topic_region_map() {
1615        let mem_kv = Arc::new(MemoryKvBackend::default());
1616        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1617        let region_route = new_test_region_route();
1618        let region_routes = &vec![region_route.clone()];
1619        let table_info = new_test_table_info();
1620        let wal_provider = WalProvider::RaftEngine;
1621        let regions: Vec<_> = (0..16).collect();
1622        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1623        create_physical_table_metadata(
1624            &table_metadata_manager,
1625            table_info.clone(),
1626            region_routes.clone(),
1627            region_wal_options.clone(),
1628        )
1629        .await
1630        .unwrap();
1631
1632        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1633        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1634        let resp = mem_kv.range(range_req).await.unwrap();
1635        // Should be empty because the topic region map is empty for raft engine.
1636        assert!(resp.kvs.is_empty());
1637    }
1638
1639    #[tokio::test]
1640    async fn test_create_table_metadata() {
1641        let mem_kv = Arc::new(MemoryKvBackend::default());
1642        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1643        let region_route = new_test_region_route();
1644        let region_routes = &vec![region_route.clone()];
1645        let table_info = new_test_table_info();
1646        let region_wal_options = create_mock_region_wal_options()
1647            .into_iter()
1648            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1649            .collect::<HashMap<_, _>>();
1650
1651        // creates metadata.
1652        create_physical_table_metadata(
1653            &table_metadata_manager,
1654            table_info.clone(),
1655            region_routes.clone(),
1656            region_wal_options.clone(),
1657        )
1658        .await
1659        .unwrap();
1660
1661        // if metadata was already created, it should be ok.
1662        assert!(
1663            create_physical_table_metadata(
1664                &table_metadata_manager,
1665                table_info.clone(),
1666                region_routes.clone(),
1667                region_wal_options.clone(),
1668            )
1669            .await
1670            .is_ok()
1671        );
1672
1673        let mut modified_region_routes = region_routes.clone();
1674        modified_region_routes.push(region_route.clone());
1675        // if remote metadata was exists, it should return an error.
1676        assert!(
1677            create_physical_table_metadata(
1678                &table_metadata_manager,
1679                table_info.clone(),
1680                modified_region_routes,
1681                region_wal_options.clone(),
1682            )
1683            .await
1684            .is_err()
1685        );
1686
1687        let (remote_table_info, remote_table_route) = table_metadata_manager
1688            .get_full_table_info(10)
1689            .await
1690            .unwrap();
1691
1692        assert_eq!(
1693            remote_table_info.unwrap().into_inner().table_info,
1694            table_info
1695        );
1696        assert_eq!(
1697            remote_table_route
1698                .unwrap()
1699                .into_inner()
1700                .region_routes()
1701                .unwrap(),
1702            region_routes
1703        );
1704
1705        for i in 0..2 {
1706            let region_number = i as u32;
1707            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1708            let topic = format!("greptimedb_topic{}", i);
1709            let regions = table_metadata_manager
1710                .topic_region_manager
1711                .regions(&topic)
1712                .await
1713                .unwrap()
1714                .into_keys()
1715                .collect::<Vec<_>>();
1716            assert_eq!(regions.len(), 8);
1717            assert!(regions.contains(&region_id));
1718        }
1719    }
1720
1721    #[tokio::test]
1722    async fn test_get_full_table_info_remaps_route_address() {
1723        let mem_kv = Arc::new(MemoryKvBackend::default());
1724        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1725
1726        let mut region_route = new_test_region_route();
1727        region_route.follower_peers = vec![Peer::empty(3)];
1728        let region_routes = vec![region_route];
1729        let table_info = new_test_table_info();
1730        let table_id = table_info.ident.table_id;
1731
1732        create_physical_table_metadata(
1733            &table_metadata_manager,
1734            table_info,
1735            region_routes,
1736            HashMap::new(),
1737        )
1738        .await
1739        .unwrap();
1740
1741        mem_kv
1742            .put(PutRequest {
1743                key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1744                value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1745                    .try_as_raw_value()
1746                    .unwrap(),
1747                ..Default::default()
1748            })
1749            .await
1750            .unwrap();
1751        mem_kv
1752            .put(PutRequest {
1753                key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1754                value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1755                    .try_as_raw_value()
1756                    .unwrap(),
1757                ..Default::default()
1758            })
1759            .await
1760            .unwrap();
1761
1762        let (_, table_route) = table_metadata_manager
1763            .get_full_table_info(table_id)
1764            .await
1765            .unwrap();
1766        let table_route = table_route.unwrap().into_inner();
1767        let region_routes = table_route.region_routes().unwrap();
1768
1769        assert_eq!(
1770            region_routes[0].leader_peer.as_ref().unwrap().addr,
1771            "new-a2"
1772        );
1773        assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1774    }
1775
1776    #[tokio::test]
1777    async fn test_create_logic_tables_metadata() {
1778        let mem_kv = Arc::new(MemoryKvBackend::default());
1779        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1780        let region_route = new_test_region_route();
1781        let region_routes = vec![region_route.clone()];
1782        let table_info = new_test_table_info();
1783        let table_id = table_info.ident.table_id;
1784        let table_route_value = TableRouteValue::physical(region_routes.clone());
1785
1786        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1787        // creates metadata.
1788        table_metadata_manager
1789            .create_logical_tables_metadata(tables_data.clone())
1790            .await
1791            .unwrap();
1792
1793        // if metadata was already created, it should be ok.
1794        assert!(
1795            table_metadata_manager
1796                .create_logical_tables_metadata(tables_data)
1797                .await
1798                .is_ok()
1799        );
1800
1801        let mut modified_region_routes = region_routes.clone();
1802        modified_region_routes.push(new_region_route(2, 3));
1803        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1804        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1805        // if remote metadata was exists, it should return an error.
1806        assert!(
1807            table_metadata_manager
1808                .create_logical_tables_metadata(modified_tables_data)
1809                .await
1810                .is_err()
1811        );
1812
1813        let (remote_table_info, remote_table_route) = table_metadata_manager
1814            .get_full_table_info(table_id)
1815            .await
1816            .unwrap();
1817
1818        assert_eq!(
1819            remote_table_info.unwrap().into_inner().table_info,
1820            table_info
1821        );
1822        assert_eq!(
1823            remote_table_route
1824                .unwrap()
1825                .into_inner()
1826                .region_routes()
1827                .unwrap(),
1828            &region_routes
1829        );
1830    }
1831
1832    #[tokio::test]
1833    async fn test_create_many_logical_tables_metadata() {
1834        let kv_backend = Arc::new(MemoryKvBackend::default());
1835        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1836
1837        let mut tables_data = vec![];
1838        for i in 0..128 {
1839            let table_id = i + 1;
1840            let regin_number = table_id * 3;
1841            let region_id = RegionId::new(table_id, regin_number);
1842            let region_route = new_region_route(region_id.as_u64(), 2);
1843            let region_routes = vec![region_route.clone()];
1844            let table_info = test_utils::new_test_table_info_with_name(
1845                table_id,
1846                &format!("my_table_{}", table_id),
1847            );
1848            let table_route_value = TableRouteValue::physical(region_routes.clone());
1849
1850            tables_data.push((table_info, table_route_value));
1851        }
1852
1853        // creates metadata.
1854        table_metadata_manager
1855            .create_logical_tables_metadata(tables_data)
1856            .await
1857            .unwrap();
1858    }
1859
1860    #[tokio::test]
1861    async fn test_delete_table_metadata() {
1862        let mem_kv = Arc::new(MemoryKvBackend::default());
1863        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1864        let region_route = new_test_region_route();
1865        let region_routes = &vec![region_route.clone()];
1866        let table_info = new_test_table_info();
1867        let table_id = table_info.ident.table_id;
1868        let datanode_id = 2;
1869        let region_wal_options = create_mock_region_wal_options();
1870        let serialized_region_wal_options = region_wal_options
1871            .iter()
1872            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1873            .collect::<HashMap<_, _>>();
1874
1875        // creates metadata.
1876        create_physical_table_metadata(
1877            &table_metadata_manager,
1878            table_info.clone(),
1879            region_routes.clone(),
1880            serialized_region_wal_options,
1881        )
1882        .await
1883        .unwrap();
1884
1885        let table_name = TableName::new(
1886            table_info.catalog_name,
1887            table_info.schema_name,
1888            table_info.name,
1889        );
1890        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1891        // deletes metadata.
1892        table_metadata_manager
1893            .delete_table_metadata(
1894                table_id,
1895                &table_name,
1896                table_route_value,
1897                &region_wal_options,
1898            )
1899            .await
1900            .unwrap();
1901        // Should be ignored.
1902        table_metadata_manager
1903            .delete_table_metadata(
1904                table_id,
1905                &table_name,
1906                table_route_value,
1907                &region_wal_options,
1908            )
1909            .await
1910            .unwrap();
1911        assert!(
1912            table_metadata_manager
1913                .table_info_manager()
1914                .get(table_id)
1915                .await
1916                .unwrap()
1917                .is_none()
1918        );
1919        assert!(
1920            table_metadata_manager
1921                .table_route_manager()
1922                .table_route_storage()
1923                .get(table_id)
1924                .await
1925                .unwrap()
1926                .is_none()
1927        );
1928        assert!(
1929            table_metadata_manager
1930                .datanode_table_manager()
1931                .tables(datanode_id)
1932                .try_collect::<Vec<_>>()
1933                .await
1934                .unwrap()
1935                .is_empty()
1936        );
1937        // Checks removed values
1938        let table_info = table_metadata_manager
1939            .table_info_manager()
1940            .get(table_id)
1941            .await
1942            .unwrap();
1943        assert!(table_info.is_none());
1944        let table_route = table_metadata_manager
1945            .table_route_manager()
1946            .table_route_storage()
1947            .get(table_id)
1948            .await
1949            .unwrap();
1950        assert!(table_route.is_none());
1951        // Logical delete removes the topic region mapping as well.
1952        let regions = table_metadata_manager
1953            .topic_region_manager
1954            .regions("greptimedb_topic0")
1955            .await
1956            .unwrap();
1957        assert_eq!(regions.len(), 0);
1958        let regions = table_metadata_manager
1959            .topic_region_manager
1960            .regions("greptimedb_topic1")
1961            .await
1962            .unwrap();
1963        assert_eq!(regions.len(), 0);
1964    }
1965
1966    #[tokio::test]
1967    async fn test_rename_table() {
1968        let mem_kv = Arc::new(MemoryKvBackend::default());
1969        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1970        let region_route = new_test_region_route();
1971        let region_routes = vec![region_route.clone()];
1972        let table_info = new_test_table_info();
1973        let table_id = table_info.ident.table_id;
1974        // creates metadata.
1975        create_physical_table_metadata(
1976            &table_metadata_manager,
1977            table_info.clone(),
1978            region_routes.clone(),
1979            HashMap::new(),
1980        )
1981        .await
1982        .unwrap();
1983
1984        let new_table_name = "another_name".to_string();
1985        let table_info_value =
1986            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1987
1988        table_metadata_manager
1989            .rename_table(&table_info_value, new_table_name.clone())
1990            .await
1991            .unwrap();
1992        // if remote metadata was updated, it should be ok.
1993        table_metadata_manager
1994            .rename_table(&table_info_value, new_table_name.clone())
1995            .await
1996            .unwrap();
1997        let mut modified_table_info = table_info.clone();
1998        modified_table_info.name = "hi".to_string();
1999        let modified_table_info_value =
2000            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
2001        // if the table_info_value is wrong, it should return an error.
2002        // The ABA problem.
2003        assert!(
2004            table_metadata_manager
2005                .rename_table(&modified_table_info_value, new_table_name.clone())
2006                .await
2007                .is_err()
2008        );
2009
2010        let old_table_name = TableNameKey::new(
2011            &table_info.catalog_name,
2012            &table_info.schema_name,
2013            &table_info.name,
2014        );
2015        let new_table_name = TableNameKey::new(
2016            &table_info.catalog_name,
2017            &table_info.schema_name,
2018            &new_table_name,
2019        );
2020
2021        assert!(
2022            table_metadata_manager
2023                .table_name_manager()
2024                .get(old_table_name)
2025                .await
2026                .unwrap()
2027                .is_none()
2028        );
2029
2030        assert_eq!(
2031            table_metadata_manager
2032                .table_name_manager()
2033                .get(new_table_name)
2034                .await
2035                .unwrap()
2036                .unwrap()
2037                .table_id(),
2038            table_id
2039        );
2040    }
2041
2042    #[tokio::test]
2043    async fn test_update_table_info() {
2044        let mem_kv = Arc::new(MemoryKvBackend::default());
2045        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2046        let region_route = new_test_region_route();
2047        let region_routes = vec![region_route.clone()];
2048        let table_info = new_test_table_info();
2049        let table_id = table_info.ident.table_id;
2050        // creates metadata.
2051        create_physical_table_metadata(
2052            &table_metadata_manager,
2053            table_info.clone(),
2054            region_routes.clone(),
2055            HashMap::new(),
2056        )
2057        .await
2058        .unwrap();
2059
2060        let mut new_table_info = table_info.clone();
2061        new_table_info.name = "hi".to_string();
2062        let current_table_info_value =
2063            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2064        // should be ok.
2065        table_metadata_manager
2066            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2067            .await
2068            .unwrap();
2069        // if table info was updated, it should be ok.
2070        table_metadata_manager
2071            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2072            .await
2073            .unwrap();
2074
2075        // updated table_info should equal the `new_table_info`
2076        let updated_table_info = table_metadata_manager
2077            .table_info_manager()
2078            .get(table_id)
2079            .await
2080            .unwrap()
2081            .unwrap()
2082            .into_inner();
2083        assert_eq!(updated_table_info.table_info, new_table_info);
2084
2085        let mut wrong_table_info = table_info.clone();
2086        wrong_table_info.name = "wrong".to_string();
2087        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2088            current_table_info_value.update(wrong_table_info),
2089        );
2090        // if the current_table_info_value is wrong, it should return an error.
2091        // The ABA problem.
2092        assert!(
2093            table_metadata_manager
2094                .update_table_info(&wrong_table_info_value, None, new_table_info)
2095                .await
2096                .is_err()
2097        )
2098    }
2099
2100    #[tokio::test]
2101    async fn test_update_table_leader_region_status() {
2102        let mem_kv = Arc::new(MemoryKvBackend::default());
2103        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2104        let datanode = 1;
2105        let region_routes = vec![
2106            RegionRoute {
2107                region: Region {
2108                    id: 1.into(),
2109                    name: "r1".to_string(),
2110                    attrs: BTreeMap::new(),
2111                    partition_expr: Default::default(),
2112                },
2113                leader_peer: Some(Peer::new(datanode, "a2")),
2114                leader_state: Some(LeaderState::Downgrading),
2115                follower_peers: vec![],
2116                leader_down_since: Some(current_time_millis()),
2117                write_route_policy: None,
2118            },
2119            RegionRoute {
2120                region: Region {
2121                    id: 2.into(),
2122                    name: "r2".to_string(),
2123                    attrs: BTreeMap::new(),
2124                    partition_expr: Default::default(),
2125                },
2126                leader_peer: Some(Peer::new(datanode, "a1")),
2127                leader_state: None,
2128                follower_peers: vec![],
2129                leader_down_since: None,
2130                write_route_policy: None,
2131            },
2132        ];
2133        let table_info = new_test_table_info();
2134        let table_id = table_info.ident.table_id;
2135        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2136            TableRouteValue::physical(region_routes.clone()),
2137        );
2138
2139        // creates metadata.
2140        create_physical_table_metadata(
2141            &table_metadata_manager,
2142            table_info.clone(),
2143            region_routes.clone(),
2144            HashMap::new(),
2145        )
2146        .await
2147        .unwrap();
2148
2149        table_metadata_manager
2150            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2151                if region_route.leader_state.is_some() {
2152                    None
2153                } else {
2154                    Some(Some(LeaderState::Downgrading))
2155                }
2156            })
2157            .await
2158            .unwrap();
2159
2160        let updated_route_value = table_metadata_manager
2161            .table_route_manager()
2162            .table_route_storage()
2163            .get(table_id)
2164            .await
2165            .unwrap()
2166            .unwrap();
2167
2168        assert_eq!(
2169            updated_route_value.region_routes().unwrap()[0].leader_state,
2170            Some(LeaderState::Downgrading)
2171        );
2172
2173        assert!(
2174            updated_route_value.region_routes().unwrap()[0]
2175                .leader_down_since
2176                .is_some()
2177        );
2178
2179        assert_eq!(
2180            updated_route_value.region_routes().unwrap()[1].leader_state,
2181            Some(LeaderState::Downgrading)
2182        );
2183        assert!(
2184            updated_route_value.region_routes().unwrap()[1]
2185                .leader_down_since
2186                .is_some()
2187        );
2188    }
2189
2190    async fn assert_datanode_table(
2191        table_metadata_manager: &TableMetadataManager,
2192        table_id: u32,
2193        region_routes: &[RegionRoute],
2194    ) {
2195        let region_distribution = region_distribution(region_routes);
2196        for (datanode, regions) in region_distribution {
2197            let got = table_metadata_manager
2198                .datanode_table_manager()
2199                .get(&DatanodeTableKey::new(datanode, table_id))
2200                .await
2201                .unwrap()
2202                .unwrap();
2203
2204            assert_eq!(got.regions, regions.leader_regions);
2205            assert_eq!(got.follower_regions, regions.follower_regions);
2206        }
2207    }
2208
2209    #[tokio::test]
2210    async fn test_update_table_route() {
2211        let mem_kv = Arc::new(MemoryKvBackend::default());
2212        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2213        let region_route = new_test_region_route();
2214        let region_routes = vec![region_route.clone()];
2215        let table_info = new_test_table_info();
2216        let table_id = table_info.ident.table_id;
2217        let engine = table_info.meta.engine.as_str();
2218        let region_storage_path =
2219            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2220        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2221            TableRouteValue::physical(region_routes.clone()),
2222        );
2223
2224        // creates metadata.
2225        create_physical_table_metadata(
2226            &table_metadata_manager,
2227            table_info.clone(),
2228            region_routes.clone(),
2229            HashMap::new(),
2230        )
2231        .await
2232        .unwrap();
2233
2234        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2235        let new_region_routes = vec![
2236            new_region_route(1, 1),
2237            new_region_route(2, 2),
2238            new_region_route(3, 3),
2239        ];
2240        // it should be ok.
2241        table_metadata_manager
2242            .update_table_route(
2243                table_id,
2244                RegionInfo {
2245                    engine: engine.to_string(),
2246                    region_storage_path: region_storage_path.clone(),
2247                    region_options: HashMap::new(),
2248                    region_wal_options: HashMap::new(),
2249                },
2250                &current_table_route_value,
2251                new_region_routes.clone(),
2252                &HashMap::new(),
2253                &HashMap::new(),
2254            )
2255            .await
2256            .unwrap();
2257        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2258
2259        // if the table route was updated. it should be ok.
2260        table_metadata_manager
2261            .update_table_route(
2262                table_id,
2263                RegionInfo {
2264                    engine: engine.to_string(),
2265                    region_storage_path: region_storage_path.clone(),
2266                    region_options: HashMap::new(),
2267                    region_wal_options: HashMap::new(),
2268                },
2269                &current_table_route_value,
2270                new_region_routes.clone(),
2271                &HashMap::new(),
2272                &HashMap::new(),
2273            )
2274            .await
2275            .unwrap();
2276
2277        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2278            current_table_route_value
2279                .inner
2280                .update(new_region_routes.clone())
2281                .unwrap(),
2282        );
2283        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2284        // it should be ok.
2285        table_metadata_manager
2286            .update_table_route(
2287                table_id,
2288                RegionInfo {
2289                    engine: engine.to_string(),
2290                    region_storage_path: region_storage_path.clone(),
2291                    region_options: HashMap::new(),
2292                    region_wal_options: HashMap::new(),
2293                },
2294                &current_table_route_value,
2295                new_region_routes.clone(),
2296                &HashMap::new(),
2297                &HashMap::new(),
2298            )
2299            .await
2300            .unwrap();
2301        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2302
2303        // if the current_table_route_value is wrong, it should return an error.
2304        // The ABA problem.
2305        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2306            current_table_route_value
2307                .update(vec![
2308                    new_region_route(1, 1),
2309                    new_region_route(2, 2),
2310                    new_region_route(3, 3),
2311                    new_region_route(4, 4),
2312                ])
2313                .unwrap(),
2314        );
2315        assert!(
2316            table_metadata_manager
2317                .update_table_route(
2318                    table_id,
2319                    RegionInfo {
2320                        engine: engine.to_string(),
2321                        region_storage_path: region_storage_path.clone(),
2322                        region_options: HashMap::new(),
2323                        region_wal_options: HashMap::new(),
2324                    },
2325                    &wrong_table_route_value,
2326                    new_region_routes,
2327                    &HashMap::new(),
2328                    &HashMap::new(),
2329                )
2330                .await
2331                .is_err()
2332        );
2333    }
2334
2335    #[tokio::test]
2336    async fn test_update_table_route_with_topic_region_mapping() {
2337        let mem_kv = Arc::new(MemoryKvBackend::default());
2338        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2339        let region_route = new_test_region_route();
2340        let region_routes = vec![region_route.clone()];
2341        let table_info = new_test_table_info();
2342        let table_id = table_info.ident.table_id;
2343        let engine = table_info.meta.engine.as_str();
2344        let region_storage_path =
2345            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2346
2347        // Create initial metadata with Kafka WAL options
2348        let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2349            (
2350                1,
2351                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2352                    topic: "topic_1".to_string(),
2353                }))
2354                .unwrap(),
2355            ),
2356            (
2357                2,
2358                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2359                    topic: "topic_2".to_string(),
2360                }))
2361                .unwrap(),
2362            ),
2363        ]
2364        .into_iter()
2365        .collect();
2366
2367        create_physical_table_metadata(
2368            &table_metadata_manager,
2369            table_info.clone(),
2370            region_routes.clone(),
2371            old_region_wal_options.clone(),
2372        )
2373        .await
2374        .unwrap();
2375
2376        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2377            TableRouteValue::physical(region_routes.clone()),
2378        );
2379
2380        // Verify initial topic region mappings exist
2381        let region_id_1 = RegionId::new(table_id, 1);
2382        let region_id_2 = RegionId::new(table_id, 2);
2383        let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2384        let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2385        assert!(
2386            table_metadata_manager
2387                .topic_region_manager
2388                .get(topic_1_key.clone())
2389                .await
2390                .unwrap()
2391                .is_some()
2392        );
2393        assert!(
2394            table_metadata_manager
2395                .topic_region_manager
2396                .get(topic_2_key.clone())
2397                .await
2398                .unwrap()
2399                .is_some()
2400        );
2401
2402        // Test 1: Add new region with new topic
2403        let new_region_routes = vec![
2404            new_region_route(1, 1),
2405            new_region_route(2, 2),
2406            new_region_route(3, 3), // New region
2407        ];
2408        let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2409            (
2410                1,
2411                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2412                    topic: "topic_1".to_string(), // Unchanged
2413                }))
2414                .unwrap(),
2415            ),
2416            (
2417                2,
2418                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2419                    topic: "topic_2".to_string(), // Unchanged
2420                }))
2421                .unwrap(),
2422            ),
2423            (
2424                3,
2425                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2426                    topic: "topic_3".to_string(), // New topic
2427                }))
2428                .unwrap(),
2429            ),
2430        ]
2431        .into_iter()
2432        .collect();
2433        let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2434            current_table_route_value
2435                .inner
2436                .update(new_region_routes.clone())
2437                .unwrap(),
2438        );
2439        table_metadata_manager
2440            .update_table_route(
2441                table_id,
2442                RegionInfo {
2443                    engine: engine.to_string(),
2444                    region_storage_path: region_storage_path.clone(),
2445                    region_options: HashMap::new(),
2446                    region_wal_options: old_region_wal_options.clone(),
2447                },
2448                &current_table_route_value,
2449                new_region_routes.clone(),
2450                &HashMap::new(),
2451                &new_region_wal_options,
2452            )
2453            .await
2454            .unwrap();
2455        // Verify new topic region mapping was created
2456        let region_id_3 = RegionId::new(table_id, 3);
2457        let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2458        assert!(
2459            table_metadata_manager
2460                .topic_region_manager
2461                .get(topic_3_key)
2462                .await
2463                .unwrap()
2464                .is_some()
2465        );
2466        // Test 2: Remove a region and change topic for another
2467        let newer_region_routes = vec![
2468            new_region_route(1, 1),
2469            // Region 2 removed
2470            // Region 3 now has different topic
2471        ];
2472        let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2473            (
2474                1,
2475                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2476                    topic: "topic_1".to_string(), // Unchanged
2477                }))
2478                .unwrap(),
2479            ),
2480            (
2481                3,
2482                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2483                    topic: "topic_3_new".to_string(), // Changed topic
2484                }))
2485                .unwrap(),
2486            ),
2487        ]
2488        .into_iter()
2489        .collect();
2490        table_metadata_manager
2491            .update_table_route(
2492                table_id,
2493                RegionInfo {
2494                    engine: engine.to_string(),
2495                    region_storage_path: region_storage_path.clone(),
2496                    region_options: HashMap::new(),
2497                    region_wal_options: new_region_wal_options.clone(),
2498                },
2499                &current_table_route_value_updated,
2500                newer_region_routes.clone(),
2501                &HashMap::new(),
2502                &newer_region_wal_options,
2503            )
2504            .await
2505            .unwrap();
2506        // Verify region 2 mapping was deleted
2507        let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2508        assert!(
2509            table_metadata_manager
2510                .topic_region_manager
2511                .get(topic_2_key_new)
2512                .await
2513                .unwrap()
2514                .is_none()
2515        );
2516        // Verify region 3 old topic mapping was deleted
2517        let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2518        assert!(
2519            table_metadata_manager
2520                .topic_region_manager
2521                .get(topic_3_key_old)
2522                .await
2523                .unwrap()
2524                .is_none()
2525        );
2526        // Verify region 3 new topic mapping was created
2527        let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2528        assert!(
2529            table_metadata_manager
2530                .topic_region_manager
2531                .get(topic_3_key_new)
2532                .await
2533                .unwrap()
2534                .is_some()
2535        );
2536        // Verify region 1 mapping still exists (unchanged)
2537        assert!(
2538            table_metadata_manager
2539                .topic_region_manager
2540                .get(topic_1_key)
2541                .await
2542                .unwrap()
2543                .is_some()
2544        );
2545    }
2546
2547    #[tokio::test]
2548    async fn test_destroy_table_metadata() {
2549        let mem_kv = Arc::new(MemoryKvBackend::default());
2550        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2551        let table_id = 1025;
2552        let table_name = "foo";
2553        let task = test_create_table_task(table_name, table_id);
2554        let options = create_mock_region_wal_options();
2555        let serialized_options = options
2556            .iter()
2557            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2558            .collect::<HashMap<_, _>>();
2559        table_metadata_manager
2560            .create_table_metadata(
2561                task.table_info,
2562                TableRouteValue::physical(vec![
2563                    RegionRoute {
2564                        region: Region::new_test(RegionId::new(table_id, 1)),
2565                        leader_peer: Some(Peer::empty(1)),
2566                        follower_peers: vec![Peer::empty(5)],
2567                        leader_state: None,
2568                        leader_down_since: None,
2569                        write_route_policy: None,
2570                    },
2571                    RegionRoute {
2572                        region: Region::new_test(RegionId::new(table_id, 2)),
2573                        leader_peer: Some(Peer::empty(2)),
2574                        follower_peers: vec![Peer::empty(4)],
2575                        leader_state: None,
2576                        leader_down_since: None,
2577                        write_route_policy: None,
2578                    },
2579                    RegionRoute {
2580                        region: Region::new_test(RegionId::new(table_id, 3)),
2581                        leader_peer: Some(Peer::empty(3)),
2582                        follower_peers: vec![],
2583                        leader_state: None,
2584                        leader_down_since: None,
2585                        write_route_policy: None,
2586                    },
2587                ]),
2588                serialized_options,
2589            )
2590            .await
2591            .unwrap();
2592        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2593        let table_route_value = table_metadata_manager
2594            .table_route_manager
2595            .table_route_storage()
2596            .get_with_raw_bytes(table_id)
2597            .await
2598            .unwrap()
2599            .unwrap();
2600        table_metadata_manager
2601            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2602            .await
2603            .unwrap();
2604        assert!(mem_kv.is_empty());
2605    }
2606
2607    #[tokio::test]
2608    async fn test_restore_table_metadata() {
2609        let mem_kv = Arc::new(MemoryKvBackend::default());
2610        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2611        let table_id = 1025;
2612        let table_name = "foo";
2613        let task = test_create_table_task(table_name, table_id);
2614        let options = create_mock_region_wal_options();
2615        let serialized_options = options
2616            .iter()
2617            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2618            .collect::<HashMap<_, _>>();
2619        table_metadata_manager
2620            .create_table_metadata(
2621                task.table_info,
2622                TableRouteValue::physical(vec![
2623                    RegionRoute {
2624                        region: Region::new_test(RegionId::new(table_id, 1)),
2625                        leader_peer: Some(Peer::empty(1)),
2626                        follower_peers: vec![Peer::empty(5)],
2627                        leader_state: None,
2628                        leader_down_since: None,
2629                        write_route_policy: None,
2630                    },
2631                    RegionRoute {
2632                        region: Region::new_test(RegionId::new(table_id, 2)),
2633                        leader_peer: Some(Peer::empty(2)),
2634                        follower_peers: vec![Peer::empty(4)],
2635                        leader_state: None,
2636                        leader_down_since: None,
2637                        write_route_policy: None,
2638                    },
2639                    RegionRoute {
2640                        region: Region::new_test(RegionId::new(table_id, 3)),
2641                        leader_peer: Some(Peer::empty(3)),
2642                        follower_peers: vec![],
2643                        leader_state: None,
2644                        leader_down_since: None,
2645                        write_route_policy: None,
2646                    },
2647                ]),
2648                serialized_options,
2649            )
2650            .await
2651            .unwrap();
2652        let expected_result = mem_kv.dump();
2653        let table_route_value = table_metadata_manager
2654            .table_route_manager
2655            .table_route_storage()
2656            .get_with_raw_bytes(table_id)
2657            .await
2658            .unwrap()
2659            .unwrap();
2660        let region_routes = table_route_value.region_routes().unwrap();
2661        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2662        let table_route_value = TableRouteValue::physical(region_routes.clone());
2663        table_metadata_manager
2664            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2665            .await
2666            .unwrap();
2667        table_metadata_manager
2668            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2669            .await
2670            .unwrap();
2671        let kvs = mem_kv.dump();
2672        assert_eq!(kvs, expected_result);
2673        // Should be ignored.
2674        table_metadata_manager
2675            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2676            .await
2677            .unwrap();
2678        let kvs = mem_kv.dump();
2679        assert_eq!(kvs, expected_result);
2680    }
2681
2682    #[tokio::test]
2683    async fn test_create_update_view_info() {
2684        let mem_kv = Arc::new(MemoryKvBackend::default());
2685        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2686
2687        let view_info = new_test_table_info();
2688
2689        let view_id = view_info.ident.table_id;
2690
2691        let logical_plan: Vec<u8> = vec![1, 2, 3];
2692        let columns = vec!["a".to_string()];
2693        let plan_columns = vec!["number".to_string()];
2694        let table_names = new_test_table_names();
2695        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2696
2697        // Create metadata
2698        table_metadata_manager
2699            .create_view_metadata(
2700                view_info.clone(),
2701                logical_plan.clone(),
2702                table_names.clone(),
2703                columns.clone(),
2704                plan_columns.clone(),
2705                definition.to_string(),
2706            )
2707            .await
2708            .unwrap();
2709
2710        {
2711            // assert view info
2712            let current_view_info = table_metadata_manager
2713                .view_info_manager()
2714                .get(view_id)
2715                .await
2716                .unwrap()
2717                .unwrap()
2718                .into_inner();
2719            assert_eq!(current_view_info.view_info, logical_plan);
2720            assert_eq!(current_view_info.table_names, table_names);
2721            assert_eq!(current_view_info.definition, definition);
2722            assert_eq!(current_view_info.columns, columns);
2723            assert_eq!(current_view_info.plan_columns, plan_columns);
2724            // assert table info
2725            let current_table_info = table_metadata_manager
2726                .table_info_manager()
2727                .get(view_id)
2728                .await
2729                .unwrap()
2730                .unwrap()
2731                .into_inner();
2732            assert_eq!(current_table_info.table_info, view_info);
2733        }
2734
2735        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2736        let new_table_names = {
2737            let mut set = HashSet::new();
2738            set.insert(TableName {
2739                catalog_name: "greptime".to_string(),
2740                schema_name: "public".to_string(),
2741                table_name: "b_table".to_string(),
2742            });
2743            set.insert(TableName {
2744                catalog_name: "greptime".to_string(),
2745                schema_name: "public".to_string(),
2746                table_name: "c_table".to_string(),
2747            });
2748            set
2749        };
2750        let new_columns = vec!["b".to_string()];
2751        let new_plan_columns = vec!["number2".to_string()];
2752        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2753
2754        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2755            logical_plan.clone(),
2756            table_names,
2757            columns,
2758            plan_columns,
2759            definition.to_string(),
2760        ));
2761        // should be ok.
2762        table_metadata_manager
2763            .update_view_info(
2764                view_id,
2765                &current_view_info_value,
2766                new_logical_plan.clone(),
2767                new_table_names.clone(),
2768                new_columns.clone(),
2769                new_plan_columns.clone(),
2770                new_definition.to_string(),
2771            )
2772            .await
2773            .unwrap();
2774        // if table info was updated, it should be ok.
2775        table_metadata_manager
2776            .update_view_info(
2777                view_id,
2778                &current_view_info_value,
2779                new_logical_plan.clone(),
2780                new_table_names.clone(),
2781                new_columns.clone(),
2782                new_plan_columns.clone(),
2783                new_definition.to_string(),
2784            )
2785            .await
2786            .unwrap();
2787
2788        // updated view_info should equal the `new_logical_plan`
2789        let updated_view_info = table_metadata_manager
2790            .view_info_manager()
2791            .get(view_id)
2792            .await
2793            .unwrap()
2794            .unwrap()
2795            .into_inner();
2796        assert_eq!(updated_view_info.view_info, new_logical_plan);
2797        assert_eq!(updated_view_info.table_names, new_table_names);
2798        assert_eq!(updated_view_info.definition, new_definition);
2799        assert_eq!(updated_view_info.columns, new_columns);
2800        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2801
2802        let wrong_view_info = logical_plan.clone();
2803        let wrong_definition = "wrong_definition";
2804        let wrong_view_info_value =
2805            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2806                wrong_view_info,
2807                new_table_names.clone(),
2808                new_columns.clone(),
2809                new_plan_columns.clone(),
2810                wrong_definition.to_string(),
2811            ));
2812        // if the current_view_info_value is wrong, it should return an error.
2813        // The ABA problem.
2814        assert!(
2815            table_metadata_manager
2816                .update_view_info(
2817                    view_id,
2818                    &wrong_view_info_value,
2819                    new_logical_plan.clone(),
2820                    new_table_names.clone(),
2821                    vec!["c".to_string()],
2822                    vec!["number3".to_string()],
2823                    wrong_definition.to_string(),
2824                )
2825                .await
2826                .is_err()
2827        );
2828
2829        // The view_info is not changed.
2830        let current_view_info = table_metadata_manager
2831            .view_info_manager()
2832            .get(view_id)
2833            .await
2834            .unwrap()
2835            .unwrap()
2836            .into_inner();
2837        assert_eq!(current_view_info.view_info, new_logical_plan);
2838        assert_eq!(current_view_info.table_names, new_table_names);
2839        assert_eq!(current_view_info.definition, new_definition);
2840        assert_eq!(current_view_info.columns, new_columns);
2841        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2842    }
2843
2844    #[test]
2845    fn test_region_role_set_deserialize() {
2846        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2847        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2848        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2849        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2850
2851        let s = r#"[1, 2, 3]"#;
2852        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2853        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2854        assert!(region_role_set.follower_regions.is_empty());
2855    }
2856
2857    #[test]
2858    fn test_region_distribution_deserialize() {
2859        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2860        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2861        assert_eq!(region_distribution.len(), 2);
2862        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2863        assert!(region_distribution[&1].follower_regions.is_empty());
2864        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2865        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2866    }
2867}