common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used in the startup of Datanode, to let Datanode know which tables
//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names that belong to the {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two
//! "unified" managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain all the managers defined above. It's recommended to use only these
//! unified managers.
75//!
//! The whole picture of flow keys will be like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{RawTableInfo, TableId};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex fragment for a topic name: the first character is an ASCII letter, digit, `_`, `:`
/// or `-`; every following character may additionally be `.`, `@` or `#`.
///
/// The fragment contains no `^`/`$` anchors of its own.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// NOTE(review): presumably the maintenance switch key used before the `__switches/` layout —
/// confirm against the code that reads it.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys; the prefix is followed by `/` and a numeric segment.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repart keys; the prefix is followed by `/` and a numeric segment.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys; the prefix is followed by two numeric segments.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
194
/// The keys with these prefixes will be loaded into the cache when the leader starts.
///
/// Covers table name, catalog name, schema name, table route and node address keys.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
204/// A set of regions with the same role.
205#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
206pub struct RegionRoleSet {
207    /// Leader regions.
208    pub leader_regions: Vec<RegionNumber>,
209    /// Follower regions.
210    pub follower_regions: Vec<RegionNumber>,
211}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215    where
216        D: serde::Deserializer<'de>,
217    {
218        #[derive(Deserialize)]
219        #[serde(untagged)]
220        enum RegionRoleSetOrLeaderOnly {
221            Full {
222                leader_regions: Vec<RegionNumber>,
223                follower_regions: Vec<RegionNumber>,
224            },
225            LeaderOnly(Vec<RegionNumber>),
226        }
227        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228            RegionRoleSetOrLeaderOnly::Full {
229                leader_regions,
230                follower_regions,
231            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233                Ok(RegionRoleSet::new(leader_regions, vec![]))
234            }
235        }
236    }
237}
238
239impl RegionRoleSet {
240    /// Create a new region role set.
241    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242        Self {
243            leader_regions,
244            follower_regions,
245        }
246    }
247
248    /// Add a leader region to the set.
249    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250        self.leader_regions.push(region_number);
251    }
252
253    /// Add a follower region to the set.
254    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255        self.follower_regions.push(region_number);
256    }
257
258    /// Sort the regions.
259    pub fn sort(&mut self) {
260        self.follower_regions.sort();
261        self.leader_regions.sort();
262    }
263}
264
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set.
/// Being a `BTreeMap`, it iterates in ascending datanode id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of flow.
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
274
lazy_static! {
    /// Compiled form of [`TOPIC_NAME_PATTERN`].
    ///
    /// NOTE(review): the pattern is compiled without `^`/`$` anchors, so `is_match` succeeds
    /// on any input that merely contains a valid topic name — confirm callers expect that.
    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
}

lazy_static! {
    /// Matches `{TABLE_INFO_KEY_PREFIX}/<digits>`; the digits are capture group 1.
    static ref TABLE_INFO_KEY_PATTERN: Regex =
        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches `{VIEW_INFO_KEY_PREFIX}/<digits>`; the digits are capture group 1.
    static ref VIEW_INFO_KEY_PATTERN: Regex =
        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches `{TABLE_ROUTE_PREFIX}/<digits>`; the digits are capture group 1.
    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches `{TABLE_REPART_PREFIX}/<digits>`; the digits are capture group 1.
    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches `{DATANODE_TABLE_KEY_PREFIX}/<digits>/<digits>`; per the module docs the two
    /// captured segments are the datanode id and the table id.
    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// TABLE_NAME_KEY: {TABLE_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}/{table_name}
    ///
    /// Each of the three name segments is captured and must match `NAME_PATTERN`.
    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
    ))
    .unwrap();
}

lazy_static! {
    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
    ))
        .unwrap();
}

lazy_static! {
    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
    ))
        .unwrap();
}

lazy_static! {
    /// Matches `{NODE_ADDRESS_PREFIX}/<digits>/<digits>`, capturing both numeric segments.
    /// NOTE(review): the meaning of the two segments is not visible here — see the
    /// `node_address` module.
    static ref NODE_ADDRESS_PATTERN: Regex =
        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches `{KAFKA_TOPIC_KEY_PREFIX}/<anything>`; everything after the prefix and `/`
    /// (possibly empty) is capture group 1. The topic name is not validated against
    /// [`TOPIC_NAME_PATTERN`] by this regex.
    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}

lazy_static! {
    /// Matches `{TOPIC_REGION_PREFIX}/{topic_name}/<digits>`; the topic name (which must match
    /// [`TOPIC_NAME_PATTERN`]) is capture group 1 and the numeric segment is capture group 2.
    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
    ))
    .unwrap();
}
343
/// The key of metadata.
///
/// `T` is the type produced when decoding; `'a` is the lifetime of the byte slice being
/// decoded, which lets a decoded key borrow from the input.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into its byte representation.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a `T` from `bytes`, returning an error if the bytes are not a valid key.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// Thin newtype around an owned byte buffer.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `value` as-is; the buffer is moved, not copied.
    fn from(value: Vec<u8>) -> Self {
        BytesAdapter(value)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361    fn to_bytes(&self) -> Vec<u8> {
362        self.0.clone()
363    }
364
365    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366        Ok(BytesAdapter(bytes.to_vec()))
367    }
368}
369
/// A metadata key that can build the transaction operation used to read itself.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a `TxnOp` together with a closure that, given the transaction's
    /// `TxnOpGetResponseSet`, yields an optional raw byte value.
    ///
    /// NOTE(review): presumably the closure picks this key's value out of the response set and
    /// returns `None` when the key was absent — confirm against the implementations.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// A metadata value that can be converted from and to its raw byte representation.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, returning an error if decoding fails.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes, returning an error if encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Bundles the per-key metadata managers behind a single entry point.
///
/// The constructors build every sub-manager from clones of the same `kv_backend` handle,
/// which is also kept for the transactions issued directly by this type.
pub struct TableMetadataManager {
    // Manager of table name keys.
    table_name_manager: TableNameManager,
    // Manager of table info keys.
    table_info_manager: TableInfoManager,
    // Manager of view info keys.
    view_info_manager: ViewInfoManager,
    // Manager of datanode table keys.
    datanode_table_manager: DatanodeTableManager,
    // Manager of catalog name keys.
    catalog_manager: CatalogManager,
    // Manager of schema name keys.
    schema_manager: SchemaManager,
    // Manager of table route keys.
    table_route_manager: TableRouteManager,
    // Manager of table repart keys.
    table_repart_manager: TableRepartManager,
    // Manager of tombstones; its key prefix can be customized at construction time.
    tombstone_manager: TombstoneManager,
    // Manager of topic name keys.
    topic_name_manager: TopicNameManager,
    // Manager of topic-to-region keys.
    topic_region_manager: TopicRegionManager,
    // The backend used for transactions issued by this manager itself.
    kv_backend: KvBackendRef,
}
403
/// Fails with an `Unexpected` error unless `$got == $expected_value`.
///
/// - `$got`: the value that was actually read.
/// - `$expected_value`: the value the caller expected to read.
/// - `$name`: a description of the operation, interpolated into the error message.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` by those exact paths, so
/// both must be resolvable at the call site. Both values are formatted with `{:?}` in the
/// error message and therefore must implement `Debug`.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A struct containing a deserialized value (`inner`) together with the original bytes it was
/// decoded from.
///
/// - Serialize behaviors:
///
/// Only the `bytes` field is written (as a string); the `inner` field is ignored.
///
/// - Deserialize behaviors:
///
/// A string is read into `bytes`, and the `inner` field is then decoded from those bytes.
///
/// NOTE(review): `DerefMut` hands out `&mut T` to `inner` without touching `bytes`, so the two
/// can diverge after a mutation — confirm callers never rely on `bytes` afterwards.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value was deserialized from the original bytes.
    inner: T,
}
434
435impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
436    type Target = T;
437
438    fn deref(&self) -> &Self::Target {
439        &self.inner
440    }
441}
442
443impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
444    fn deref_mut(&mut self) -> &mut Self::Target {
445        &mut self.inner
446    }
447}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        write!(
452            f,
453            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454            self.inner, self.bytes
455        )
456    }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460    /// - Serialize behaviors:
461    ///
462    /// The `inner` field will be ignored.
463    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464    where
465        S: serde::Serializer,
466    {
467        // Safety: The original bytes are always JSON encoded.
468        // It's more efficiently than `serialize_bytes`.
469        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470    }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474    for DeserializedValueWithBytes<T>
475{
476    /// - Deserialize behaviors:
477    ///
478    /// The `inner` field will be deserialized from the `bytes` field.
479    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480    where
481        D: serde::Deserializer<'de>,
482    {
483        let buf = String::deserialize(deserializer)?;
484        let bytes = Bytes::from(buf);
485
486        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489        Ok(value)
490    }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494    fn clone(&self) -> Self {
495        Self {
496            bytes: self.bytes.clone(),
497            inner: self.inner.clone(),
498        }
499    }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503    /// Returns a struct containing a deserialized value and an original `bytes`.
504    /// It accepts original bytes of inner.
505    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506        let inner = T::try_from_raw_value(&bytes)?;
507        Ok(Self { bytes, inner })
508    }
509
510    /// Returns a struct containing a deserialized value and an original `bytes`.
511    /// It accepts original bytes of inner.
512    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514    }
515
516    pub fn into_inner(self) -> T {
517        self.inner
518    }
519
520    pub fn get_inner_ref(&self) -> &T {
521        &self.inner
522    }
523
524    /// Returns original `bytes`
525    pub fn get_raw_bytes(&self) -> Vec<u8> {
526        self.bytes.to_vec()
527    }
528
529    #[cfg(any(test, feature = "testing"))]
530    pub fn from_inner(inner: T) -> Self {
531        let bytes = serde_json::to_vec(&inner).unwrap();
532
533        Self {
534            bytes: Bytes::from(bytes),
535            inner,
536        }
537    }
538}
539
540impl TableMetadataManager {
541    pub fn new(kv_backend: KvBackendRef) -> Self {
542        TableMetadataManager {
543            table_name_manager: TableNameManager::new(kv_backend.clone()),
544            table_info_manager: TableInfoManager::new(kv_backend.clone()),
545            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547            catalog_manager: CatalogManager::new(kv_backend.clone()),
548            schema_manager: SchemaManager::new(kv_backend.clone()),
549            table_route_manager: TableRouteManager::new(kv_backend.clone()),
550            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554            kv_backend,
555        }
556    }
557
558    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
559    pub fn new_with_custom_tombstone_prefix(
560        kv_backend: KvBackendRef,
561        tombstone_prefix: &str,
562    ) -> Self {
563        Self {
564            table_name_manager: TableNameManager::new(kv_backend.clone()),
565            table_info_manager: TableInfoManager::new(kv_backend.clone()),
566            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568            catalog_manager: CatalogManager::new(kv_backend.clone()),
569            schema_manager: SchemaManager::new(kv_backend.clone()),
570            table_route_manager: TableRouteManager::new(kv_backend.clone()),
571            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572            tombstone_manager: TombstoneManager::new_with_prefix(
573                kv_backend.clone(),
574                tombstone_prefix,
575            ),
576            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578            kv_backend,
579        }
580    }
581
582    pub async fn init(&self) -> Result<()> {
583        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585        self.catalog_manager().create(catalog_name, true).await?;
586
587        let internal_schemas = [
588            DEFAULT_SCHEMA_NAME,
589            INFORMATION_SCHEMA_NAME,
590            DEFAULT_PRIVATE_SCHEMA_NAME,
591        ];
592
593        for schema_name in internal_schemas {
594            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596            self.schema_manager().create(schema_key, None, true).await?;
597        }
598
599        Ok(())
600    }
601
    /// Returns the [`TableNameManager`].
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the [`TableInfoManager`].
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the [`ViewInfoManager`].
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the [`DatanodeTableManager`].
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the [`CatalogManager`].
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the [`SchemaManager`].
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the [`TableRouteManager`].
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the [`TableRepartManager`].
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the [`TopicNameManager`].
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the [`TopicRegionManager`].
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the kv backend handle this manager was constructed with.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646    pub async fn get_full_table_info(
647        &self,
648        table_id: TableId,
649    ) -> Result<(
650        Option<DeserializedValueWithBytes<TableInfoValue>>,
651        Option<DeserializedValueWithBytes<TableRouteValue>>,
652    )> {
653        let table_info_key = TableInfoKey::new(table_id);
654        let table_route_key = TableRouteKey::new(table_id);
655        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659        let mut res = self.kv_backend.txn(txn).await?;
660        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663        Ok((table_info_value, table_route_value))
664    }
665
666    /// Creates metadata for view and returns an error if different metadata exists.
667    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
668    /// Parameters include:
669    /// - `view_info`: the encoded logical plan
670    /// - `table_names`: the resolved fully table names in logical plan
671    /// - `columns`: the view columns
672    /// - `plan_columns`: the original plan columns
673    /// - `definition`: The SQL to create the view
674    ///
675    pub async fn create_view_metadata(
676        &self,
677        view_info: RawTableInfo,
678        raw_logical_plan: Vec<u8>,
679        table_names: HashSet<TableName>,
680        columns: Vec<String>,
681        plan_columns: Vec<String>,
682        definition: String,
683    ) -> Result<()> {
684        let view_id = view_info.ident.table_id;
685
686        // Creates view name.
687        let view_name = TableNameKey::new(
688            &view_info.catalog_name,
689            &view_info.schema_name,
690            &view_info.name,
691        );
692        let create_table_name_txn = self
693            .table_name_manager()
694            .build_create_txn(&view_name, view_id)?;
695
696        // Creates table info.
697        let table_info_value = TableInfoValue::new(view_info);
698
699        let (create_table_info_txn, on_create_table_info_failure) = self
700            .table_info_manager()
701            .build_create_txn(view_id, &table_info_value)?;
702
703        // Creates view info
704        let view_info_value = ViewInfoValue::new(
705            raw_logical_plan,
706            table_names,
707            columns,
708            plan_columns,
709            definition,
710        );
711        let (create_view_info_txn, on_create_view_info_failure) = self
712            .view_info_manager()
713            .build_create_txn(view_id, &view_info_value)?;
714
715        let txn = Txn::merge_all(vec![
716            create_table_name_txn,
717            create_table_info_txn,
718            create_view_info_txn,
719        ]);
720
721        let mut r = self.kv_backend.txn(txn).await?;
722
723        // Checks whether metadata was already created.
724        if !r.succeeded {
725            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726            let remote_table_info = on_create_table_info_failure(&mut set)?
727                .context(error::UnexpectedSnafu {
728                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729                })?
730                .into_inner();
731
732            let remote_view_info = on_create_view_info_failure(&mut set)?
733                .context(error::UnexpectedSnafu {
734                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735                })?
736                .into_inner();
737
738            let op_name = "the creating view metadata";
739            ensure_values!(remote_table_info, table_info_value, op_name);
740            ensure_values!(remote_view_info, view_info_value, op_name);
741        }
742
743        Ok(())
744    }
745
746    /// Creates metadata for table and returns an error if different metadata exists.
747    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
748    pub async fn create_table_metadata(
749        &self,
750        table_info: RawTableInfo,
751        table_route_value: TableRouteValue,
752        region_wal_options: HashMap<RegionNumber, String>,
753    ) -> Result<()> {
754        let table_id = table_info.ident.table_id;
755        let engine = table_info.meta.engine.clone();
756
757        // Creates table name.
758        let table_name = TableNameKey::new(
759            &table_info.catalog_name,
760            &table_info.schema_name,
761            &table_info.name,
762        );
763        let create_table_name_txn = self
764            .table_name_manager()
765            .build_create_txn(&table_name, table_id)?;
766
767        let region_options = table_info.to_region_options();
768        // Creates table info.
769        let table_info_value = TableInfoValue::new(table_info);
770        let (create_table_info_txn, on_create_table_info_failure) = self
771            .table_info_manager()
772            .build_create_txn(table_id, &table_info_value)?;
773
774        let (create_table_route_txn, on_create_table_route_failure) = self
775            .table_route_manager()
776            .table_route_storage()
777            .build_create_txn(table_id, &table_route_value)?;
778
779        let create_topic_region_txn = self
780            .topic_region_manager
781            .build_create_txn(table_id, &region_wal_options)?;
782
783        let mut txn = Txn::merge_all(vec![
784            create_table_name_txn,
785            create_table_info_txn,
786            create_table_route_txn,
787            create_topic_region_txn,
788        ]);
789
790        if let TableRouteValue::Physical(x) = &table_route_value {
791            let region_storage_path = table_info_value.region_storage_path();
792            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793                table_id,
794                &engine,
795                &region_storage_path,
796                region_options,
797                region_wal_options,
798                region_distribution(&x.region_routes),
799            )?;
800            txn = txn.merge(create_datanode_table_txn);
801        }
802
803        let mut r = self.kv_backend.txn(txn).await?;
804
805        // Checks whether metadata was already created.
806        if !r.succeeded {
807            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808            let remote_table_info = on_create_table_info_failure(&mut set)?
809                .context(error::UnexpectedSnafu {
810                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811                })?
812                .into_inner();
813
814            let remote_table_route = on_create_table_route_failure(&mut set)?
815                .context(error::UnexpectedSnafu {
816                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817                })?
818                .into_inner();
819
820            let op_name = "the creating table metadata";
821            ensure_values!(remote_table_info, table_info_value, op_name);
822            ensure_values!(remote_table_route, table_route_value, op_name);
823        }
824
825        Ok(())
826    }
827
    /// Returns the chunk size to use when batching `create_logical_tables_metadata` calls:
    /// the backend's `max_txn_ops()` divided by 3.
    ///
    /// NOTE(review): integer division yields 0 when the backend reports fewer than 3 ops —
    /// confirm callers handle a zero chunk size.
    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
        // The batch size is max_txn_ops / 3 because `create_logical_tables_metadata` builds
        // 3 txns (table name, table info and table route) for every entry of `tables_data`.
        self.kv_backend.max_txn_ops() / 3
    }
833
834    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
835    pub async fn create_logical_tables_metadata(
836        &self,
837        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
838    ) -> Result<()> {
839        let len = tables_data.len();
840        let mut txns = Vec::with_capacity(3 * len);
841        struct OnFailure<F1, R1, F2, R2>
842        where
843            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
844            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
845        {
846            table_info_value: TableInfoValue,
847            on_create_table_info_failure: F1,
848            table_route_value: TableRouteValue,
849            on_create_table_route_failure: F2,
850        }
851        let mut on_failures = Vec::with_capacity(len);
852        for (table_info, table_route_value) in tables_data {
853            let table_id = table_info.ident.table_id;
854
855            // Creates table name.
856            let table_name = TableNameKey::new(
857                &table_info.catalog_name,
858                &table_info.schema_name,
859                &table_info.name,
860            );
861            let create_table_name_txn = self
862                .table_name_manager()
863                .build_create_txn(&table_name, table_id)?;
864            txns.push(create_table_name_txn);
865
866            // Creates table info.
867            let table_info_value = TableInfoValue::new(table_info);
868            let (create_table_info_txn, on_create_table_info_failure) =
869                self.table_info_manager()
870                    .build_create_txn(table_id, &table_info_value)?;
871            txns.push(create_table_info_txn);
872
873            let (create_table_route_txn, on_create_table_route_failure) = self
874                .table_route_manager()
875                .table_route_storage()
876                .build_create_txn(table_id, &table_route_value)?;
877            txns.push(create_table_route_txn);
878
879            on_failures.push(OnFailure {
880                table_info_value,
881                on_create_table_info_failure,
882                table_route_value,
883                on_create_table_route_failure,
884            });
885        }
886
887        let txn = Txn::merge_all(txns);
888        let mut r = self.kv_backend.txn(txn).await?;
889
890        // Checks whether metadata was already created.
891        if !r.succeeded {
892            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
893            for on_failure in on_failures {
894                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
895                    .context(error::UnexpectedSnafu {
896                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
897                    })?
898                    .into_inner();
899
900                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
901                    .context(error::UnexpectedSnafu {
902                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
903                    })?
904                    .into_inner();
905
906                let op_name = "the creating logical tables metadata";
907                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
908                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
909            }
910        }
911
912        Ok(())
913    }
914
915    fn table_metadata_keys(
916        &self,
917        table_id: TableId,
918        table_name: &TableName,
919        table_route_value: &TableRouteValue,
920        region_wal_options: &HashMap<RegionNumber, WalOptions>,
921    ) -> Result<Vec<Vec<u8>>> {
922        // Builds keys
923        let datanode_ids = if table_route_value.is_physical() {
924            region_distribution(table_route_value.region_routes()?)
925                .into_keys()
926                .collect()
927        } else {
928            vec![]
929        };
930        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931        let table_name = TableNameKey::new(
932            &table_name.catalog_name,
933            &table_name.schema_name,
934            &table_name.table_name,
935        );
936        let table_info_key = TableInfoKey::new(table_id);
937        let table_route_key = TableRouteKey::new(table_id);
938        let table_repart_key = TableRepartKey::new(table_id);
939        let datanode_table_keys = datanode_ids
940            .into_iter()
941            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942            .collect::<HashSet<_>>();
943        let topic_region_map = self
944            .topic_region_manager
945            .get_topic_region_mapping(table_id, region_wal_options);
946        let topic_region_keys = topic_region_map
947            .iter()
948            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949            .collect::<Vec<_>>();
950        keys.push(table_name.to_bytes());
951        keys.push(table_info_key.to_bytes());
952        keys.push(table_route_key.to_bytes());
953        keys.push(table_repart_key.to_bytes());
954        for key in &datanode_table_keys {
955            keys.push(key.to_bytes());
956        }
957        for key in topic_region_keys {
958            keys.push(key.to_bytes());
959        }
960        Ok(keys)
961    }
962
963    /// Deletes metadata for table **logically**.
964    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
965    pub async fn delete_table_metadata(
966        &self,
967        table_id: TableId,
968        table_name: &TableName,
969        table_route_value: &TableRouteValue,
970        region_wal_options: &HashMap<RegionNumber, WalOptions>,
971    ) -> Result<()> {
972        let keys =
973            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974        self.tombstone_manager.create(keys).await.map(|_| ())
975    }
976
977    /// Deletes metadata tombstone for table **permanently**.
978    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
979    pub async fn delete_table_metadata_tombstone(
980        &self,
981        table_id: TableId,
982        table_name: &TableName,
983        table_route_value: &TableRouteValue,
984        region_wal_options: &HashMap<RegionNumber, WalOptions>,
985    ) -> Result<()> {
986        let table_metadata_keys =
987            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988        self.tombstone_manager
989            .delete(table_metadata_keys)
990            .await
991            .map(|_| ())
992    }
993
994    /// Restores metadata for table.
995    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
996    pub async fn restore_table_metadata(
997        &self,
998        table_id: TableId,
999        table_name: &TableName,
1000        table_route_value: &TableRouteValue,
1001        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002    ) -> Result<()> {
1003        let keys =
1004            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005        self.tombstone_manager.restore(keys).await.map(|_| ())
1006    }
1007
1008    /// Deletes metadata for table **permanently**.
1009    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1010    pub async fn destroy_table_metadata(
1011        &self,
1012        table_id: TableId,
1013        table_name: &TableName,
1014        table_route_value: &TableRouteValue,
1015        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016    ) -> Result<()> {
1017        let keys =
1018            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019        let _ = self
1020            .kv_backend
1021            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022            .await?;
1023        Ok(())
1024    }
1025
1026    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027        let mut keys = Vec::with_capacity(3);
1028        let view_name = TableNameKey::new(
1029            &view_name.catalog_name,
1030            &view_name.schema_name,
1031            &view_name.table_name,
1032        );
1033        let table_info_key = TableInfoKey::new(view_id);
1034        let view_info_key = ViewInfoKey::new(view_id);
1035        keys.push(view_name.to_bytes());
1036        keys.push(table_info_key.to_bytes());
1037        keys.push(view_info_key.to_bytes());
1038
1039        Ok(keys)
1040    }
1041
1042    /// Deletes metadata for view **permanently**.
1043    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1044    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045        let keys = self.view_info_keys(view_id, view_name)?;
1046        let _ = self
1047            .kv_backend
1048            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049            .await?;
1050        Ok(())
1051    }
1052
1053    /// Renames the table name and returns an error if different metadata exists.
1054    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1055    /// and the new `TableNameKey` MUST be empty.
1056    pub async fn rename_table(
1057        &self,
1058        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059        new_table_name: String,
1060    ) -> Result<()> {
1061        let current_table_info = &current_table_info_value.table_info;
1062        let table_id = current_table_info.ident.table_id;
1063
1064        let table_name_key = TableNameKey::new(
1065            &current_table_info.catalog_name,
1066            &current_table_info.schema_name,
1067            &current_table_info.name,
1068        );
1069
1070        let new_table_name_key = TableNameKey::new(
1071            &current_table_info.catalog_name,
1072            &current_table_info.schema_name,
1073            &new_table_name,
1074        );
1075
1076        // Updates table name.
1077        let update_table_name_txn = self.table_name_manager().build_update_txn(
1078            &table_name_key,
1079            &new_table_name_key,
1080            table_id,
1081        )?;
1082
1083        let new_table_info_value = current_table_info_value
1084            .inner
1085            .with_update(move |table_info| {
1086                table_info.name = new_table_name;
1087            });
1088
1089        // Updates table info.
1090        let (update_table_info_txn, on_update_table_info_failure) = self
1091            .table_info_manager()
1092            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096        let mut r = self.kv_backend.txn(txn).await?;
1097
1098        // Checks whether metadata was already updated.
1099        if !r.succeeded {
1100            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101            let remote_table_info = on_update_table_info_failure(&mut set)?
1102                .context(error::UnexpectedSnafu {
1103                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104                })?
1105                .into_inner();
1106
1107            let op_name = "the renaming table metadata";
1108            ensure_values!(remote_table_info, new_table_info_value, op_name);
1109        }
1110
1111        Ok(())
1112    }
1113
1114    /// Updates table info and returns an error if different metadata exists.
1115    /// And cascade-ly update all redundant table options for each region
1116    /// if region_distribution is present.
1117    pub async fn update_table_info(
1118        &self,
1119        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120        region_distribution: Option<RegionDistribution>,
1121        new_table_info: RawTableInfo,
1122    ) -> Result<()> {
1123        let table_id = current_table_info_value.table_info.ident.table_id;
1124        let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126        // Updates table info.
1127        let (update_table_info_txn, on_update_table_info_failure) = self
1128            .table_info_manager()
1129            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131        let txn = if let Some(region_distribution) = region_distribution {
1132            // region options induced from table info.
1133            let new_region_options = new_table_info_value.table_info.to_region_options();
1134            let update_datanode_table_options_txn = self
1135                .datanode_table_manager
1136                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137                .await?;
1138            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139        } else {
1140            update_table_info_txn
1141        };
1142
1143        let mut r = self.kv_backend.txn(txn).await?;
1144        // Checks whether metadata was already updated.
1145        if !r.succeeded {
1146            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147            let remote_table_info = on_update_table_info_failure(&mut set)?
1148                .context(error::UnexpectedSnafu {
1149                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150                })?
1151                .into_inner();
1152
1153            let op_name = "the updating table info";
1154            ensure_values!(remote_table_info, new_table_info_value, op_name);
1155        }
1156        Ok(())
1157    }
1158
1159    /// Updates view info and returns an error if different metadata exists.
1160    /// Parameters include:
1161    /// - `view_id`: the view id
1162    /// - `current_view_info_value`: the current view info for CAS checking
1163    /// - `new_view_info`: the encoded logical plan
1164    /// - `table_names`: the resolved fully table names in logical plan
1165    /// - `columns`: the view columns
1166    /// - `plan_columns`: the original plan columns
1167    /// - `definition`: The SQL to create the view
1168    ///
1169    #[allow(clippy::too_many_arguments)]
1170    pub async fn update_view_info(
1171        &self,
1172        view_id: TableId,
1173        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174        new_view_info: Vec<u8>,
1175        table_names: HashSet<TableName>,
1176        columns: Vec<String>,
1177        plan_columns: Vec<String>,
1178        definition: String,
1179    ) -> Result<()> {
1180        let new_view_info_value = current_view_info_value.update(
1181            new_view_info,
1182            table_names,
1183            columns,
1184            plan_columns,
1185            definition,
1186        );
1187
1188        // Updates view info.
1189        let (update_view_info_txn, on_update_view_info_failure) = self
1190            .view_info_manager()
1191            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195        // Checks whether metadata was already updated.
1196        if !r.succeeded {
1197            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198            let remote_view_info = on_update_view_info_failure(&mut set)?
1199                .context(error::UnexpectedSnafu {
1200                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201                })?
1202                .into_inner();
1203
1204            let op_name = "the updating view info";
1205            ensure_values!(remote_view_info, new_view_info_value, op_name);
1206        }
1207        Ok(())
1208    }
1209
    /// Returns the chunk size callers should use for
    /// [`Self::batch_update_table_info_values`].
    ///
    /// That method merges one update transaction per table info pair, so the
    /// backend's `max_txn_ops()` is returned unchanged.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1213
1214    pub async fn batch_update_table_info_values(
1215        &self,
1216        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1217    ) -> Result<()> {
1218        let len = table_info_value_pairs.len();
1219        let mut txns = Vec::with_capacity(len);
1220        struct OnFailure<F, R>
1221        where
1222            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223        {
1224            table_info_value: TableInfoValue,
1225            on_update_table_info_failure: F,
1226        }
1227        let mut on_failures = Vec::with_capacity(len);
1228
1229        for (table_info_value, new_table_info) in table_info_value_pairs {
1230            let table_id = table_info_value.table_info.ident.table_id;
1231
1232            let new_table_info_value = table_info_value.update(new_table_info);
1233
1234            let (update_table_info_txn, on_update_table_info_failure) =
1235                self.table_info_manager().build_update_txn(
1236                    table_id,
1237                    &table_info_value,
1238                    &new_table_info_value,
1239                )?;
1240
1241            txns.push(update_table_info_txn);
1242
1243            on_failures.push(OnFailure {
1244                table_info_value: new_table_info_value,
1245                on_update_table_info_failure,
1246            });
1247        }
1248
1249        let txn = Txn::merge_all(txns);
1250        let mut r = self.kv_backend.txn(txn).await?;
1251
1252        if !r.succeeded {
1253            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254            for on_failure in on_failures {
1255                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256                    .context(error::UnexpectedSnafu {
1257                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258                    })?
1259                    .into_inner();
1260
1261                let op_name = "the batch updating table info";
1262                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268
1269    pub async fn update_table_route(
1270        &self,
1271        table_id: TableId,
1272        region_info: RegionInfo,
1273        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274        new_region_routes: Vec<RegionRoute>,
1275        new_region_options: &HashMap<String, String>,
1276        new_region_wal_options: &HashMap<RegionNumber, String>,
1277    ) -> Result<()> {
1278        // Updates the datanode table key value pairs.
1279        let current_region_distribution =
1280            region_distribution(current_table_route_value.region_routes()?);
1281        let new_region_distribution = region_distribution(&new_region_routes);
1282
1283        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1284            table_id,
1285            region_info,
1286            current_region_distribution,
1287            new_region_distribution,
1288            new_region_options,
1289            new_region_wal_options,
1290        )?;
1291
1292        // Updates the table_route.
1293        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1294
1295        let (update_table_route_txn, on_update_table_route_failure) = self
1296            .table_route_manager()
1297            .table_route_storage()
1298            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1299
1300        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1301
1302        let mut r = self.kv_backend.txn(txn).await?;
1303
1304        // Checks whether metadata was already updated.
1305        if !r.succeeded {
1306            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1307            let remote_table_route = on_update_table_route_failure(&mut set)?
1308                .context(error::UnexpectedSnafu {
1309                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1310                })?
1311                .into_inner();
1312
1313            let op_name = "the updating table route";
1314            ensure_values!(remote_table_route, new_table_route_value, op_name);
1315        }
1316
1317        Ok(())
1318    }
1319
1320    /// Updates the leader status of the [RegionRoute].
1321    pub async fn update_leader_region_status<F>(
1322        &self,
1323        table_id: TableId,
1324        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1325        next_region_route_status: F,
1326    ) -> Result<()>
1327    where
1328        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1329    {
1330        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1331
1332        let mut updated = 0;
1333        for route in &mut new_region_routes {
1334            if let Some(state) = next_region_route_status(route)
1335                && route.set_leader_state(state)
1336            {
1337                updated += 1;
1338            }
1339        }
1340
1341        if updated == 0 {
1342            warn!("No leader status updated");
1343            return Ok(());
1344        }
1345
1346        // Updates the table_route.
1347        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1348
1349        let (update_table_route_txn, on_update_table_route_failure) = self
1350            .table_route_manager()
1351            .table_route_storage()
1352            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1353
1354        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1355
1356        // Checks whether metadata was already updated.
1357        if !r.succeeded {
1358            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1359            let remote_table_route = on_update_table_route_failure(&mut set)?
1360                .context(error::UnexpectedSnafu {
1361                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1362                })?
1363                .into_inner();
1364
1365            let op_name = "the updating leader region status";
1366            ensure_values!(remote_table_route, new_table_route_value, op_name);
1367        }
1368
1369        Ok(())
1370    }
1371}
1372
/// Implements `$crate::key::MetadataValue` for each listed type by encoding the
/// value as JSON with `serde_json`.
///
/// The expansion uses `Result`, `SerdeJsonSnafu` and the `.context(..)` method
/// unqualified, so those names must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its raw JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1389
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` serializes the key with `to_bytes()` and pairs a
/// `TxnOp::Get` for those bytes with `TxnOpGetResponseSet::filter` on the same
/// bytes.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    // The same raw key is needed by both the get op and the filter.
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1414
// Key types that can build a `TxnOp::Get` for themselves via `build_get_op`.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1422
/// Adds inherent JSON (de)serialization helpers to each listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` decodes
/// into `Option<Self>` rather than `Self`, and the methods are inherent (`pub`)
/// instead of a trait implementation.
///
/// The expansion uses `Result`, `SerdeJsonSnafu` and the `.context(..)` method
/// unqualified, so those names must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its raw JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1439
// Value types stored as JSON through the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}
1455
// Value types that get inherent helpers decoding into `Option<Self>`.
// NOTE(review): `SchemaNameValue` is also listed in `impl_metadata_value!`, so it
// has both an inherent and a trait `try_from_raw_value` — confirm this is intended.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1460
1461#[cfg(test)]
1462mod tests {
1463    use std::collections::{BTreeMap, HashMap, HashSet};
1464    use std::sync::Arc;
1465
1466    use bytes::Bytes;
1467    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1468    use common_time::util::current_time_millis;
1469    use common_wal::options::{KafkaWalOptions, WalOptions};
1470    use futures::TryStreamExt;
1471    use store_api::storage::{RegionId, RegionNumber};
1472    use table::metadata::{RawTableInfo, TableInfo};
1473    use table::table_name::TableName;
1474
1475    use super::datanode_table::DatanodeTableKey;
1476    use super::test_utils;
1477    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1478    use crate::ddl::test_util::create_table::test_create_table_task;
1479    use crate::ddl::utils::region_storage_path;
1480    use crate::error::Result;
1481    use crate::key::datanode_table::RegionInfo;
1482    use crate::key::table_info::TableInfoValue;
1483    use crate::key::table_name::TableNameKey;
1484    use crate::key::table_route::TableRouteValue;
1485    use crate::key::{
1486        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1487        TableMetadataManager, ViewInfoValue,
1488    };
1489    use crate::kv_backend::KvBackend;
1490    use crate::kv_backend::memory::MemoryKvBackend;
1491    use crate::peer::Peer;
1492    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1493    use crate::rpc::store::RangeRequest;
1494    use crate::wal_provider::WalProvider;
1495
1496    #[test]
1497    fn test_deserialized_value_with_bytes() {
1498        let region_route = new_test_region_route();
1499        let region_routes = vec![region_route.clone()];
1500
1501        let expected_region_routes =
1502            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1503        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1504
1505        // Serialize behaviors:
1506        // The inner field will be ignored.
1507        let value = DeserializedValueWithBytes {
1508            // ignored
1509            inner: TableRouteValue::physical(region_routes.clone()),
1510            bytes: Bytes::from(expected.clone()),
1511        };
1512
1513        let encoded = serde_json::to_vec(&value).unwrap();
1514
1515        // Deserialize behaviors:
1516        // The inner field will be deserialized from the bytes field.
1517        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1518            serde_json::from_slice(&encoded).unwrap();
1519
1520        assert_eq!(decoded.inner, expected_region_routes);
1521        assert_eq!(decoded.bytes, expected);
1522    }
1523
    /// Returns the default test route: region id `1` led by datanode `2`.
    fn new_test_region_route() -> RegionRoute {
        new_region_route(1, 2)
    }
1527
1528    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1529        RegionRoute {
1530            region: Region {
1531                id: region_id.into(),
1532                name: "r1".to_string(),
1533                partition: None,
1534                attrs: BTreeMap::new(),
1535                partition_expr: Default::default(),
1536            },
1537            leader_peer: Some(Peer::new(datanode, "a2")),
1538            follower_peers: vec![],
1539            leader_state: None,
1540            leader_down_since: None,
1541        }
1542    }
1543
    /// Returns the test table info built by `test_utils::new_test_table_info(10)`.
    fn new_test_table_info() -> TableInfo {
        test_utils::new_test_table_info(10)
    }
1547
1548    fn new_test_table_names() -> HashSet<TableName> {
1549        let mut set = HashSet::new();
1550        set.insert(TableName {
1551            catalog_name: "greptime".to_string(),
1552            schema_name: "public".to_string(),
1553            table_name: "a_table".to_string(),
1554        });
1555        set.insert(TableName {
1556            catalog_name: "greptime".to_string(),
1557            schema_name: "public".to_string(),
1558            table_name: "b_table".to_string(),
1559        });
1560        set
1561    }
1562
1563    async fn create_physical_table_metadata(
1564        table_metadata_manager: &TableMetadataManager,
1565        table_info: RawTableInfo,
1566        region_routes: Vec<RegionRoute>,
1567        region_wal_options: HashMap<RegionNumber, String>,
1568    ) -> Result<()> {
1569        table_metadata_manager
1570            .create_table_metadata(
1571                table_info,
1572                TableRouteValue::physical(region_routes),
1573                region_wal_options,
1574            )
1575            .await
1576    }
1577
1578    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1579        let topics = (0..2)
1580            .map(|i| format!("greptimedb_topic{}", i))
1581            .collect::<Vec<_>>();
1582        let wal_options = topics
1583            .iter()
1584            .map(|topic| {
1585                WalOptions::Kafka(KafkaWalOptions {
1586                    topic: topic.clone(),
1587                })
1588            })
1589            .collect::<Vec<_>>();
1590
1591        (0..16)
1592            .enumerate()
1593            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1594            .collect()
1595    }
1596
1597    #[tokio::test]
1598    async fn test_raft_engine_topic_region_map() {
1599        let mem_kv = Arc::new(MemoryKvBackend::default());
1600        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1601        let region_route = new_test_region_route();
1602        let region_routes = &vec![region_route.clone()];
1603        let table_info: RawTableInfo = new_test_table_info().into();
1604        let wal_provider = WalProvider::RaftEngine;
1605        let regions: Vec<_> = (0..16).collect();
1606        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1607        create_physical_table_metadata(
1608            &table_metadata_manager,
1609            table_info.clone(),
1610            region_routes.clone(),
1611            region_wal_options.clone(),
1612        )
1613        .await
1614        .unwrap();
1615
1616        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1617        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1618        let resp = mem_kv.range(range_req).await.unwrap();
1619        // Should be empty because the topic region map is empty for raft engine.
1620        assert!(resp.kvs.is_empty());
1621    }
1622
1623    #[tokio::test]
1624    async fn test_create_table_metadata() {
1625        let mem_kv = Arc::new(MemoryKvBackend::default());
1626        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1627        let region_route = new_test_region_route();
1628        let region_routes = &vec![region_route.clone()];
1629        let table_info: RawTableInfo = new_test_table_info().into();
1630        let region_wal_options = create_mock_region_wal_options()
1631            .into_iter()
1632            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1633            .collect::<HashMap<_, _>>();
1634
1635        // creates metadata.
1636        create_physical_table_metadata(
1637            &table_metadata_manager,
1638            table_info.clone(),
1639            region_routes.clone(),
1640            region_wal_options.clone(),
1641        )
1642        .await
1643        .unwrap();
1644
1645        // if metadata was already created, it should be ok.
1646        assert!(
1647            create_physical_table_metadata(
1648                &table_metadata_manager,
1649                table_info.clone(),
1650                region_routes.clone(),
1651                region_wal_options.clone(),
1652            )
1653            .await
1654            .is_ok()
1655        );
1656
1657        let mut modified_region_routes = region_routes.clone();
1658        modified_region_routes.push(region_route.clone());
1659        // if remote metadata was exists, it should return an error.
1660        assert!(
1661            create_physical_table_metadata(
1662                &table_metadata_manager,
1663                table_info.clone(),
1664                modified_region_routes,
1665                region_wal_options.clone(),
1666            )
1667            .await
1668            .is_err()
1669        );
1670
1671        let (remote_table_info, remote_table_route) = table_metadata_manager
1672            .get_full_table_info(10)
1673            .await
1674            .unwrap();
1675
1676        assert_eq!(
1677            remote_table_info.unwrap().into_inner().table_info,
1678            table_info
1679        );
1680        assert_eq!(
1681            remote_table_route
1682                .unwrap()
1683                .into_inner()
1684                .region_routes()
1685                .unwrap(),
1686            region_routes
1687        );
1688
1689        for i in 0..2 {
1690            let region_number = i as u32;
1691            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1692            let topic = format!("greptimedb_topic{}", i);
1693            let regions = table_metadata_manager
1694                .topic_region_manager
1695                .regions(&topic)
1696                .await
1697                .unwrap()
1698                .into_keys()
1699                .collect::<Vec<_>>();
1700            assert_eq!(regions.len(), 8);
1701            assert!(regions.contains(&region_id));
1702        }
1703    }
1704
1705    #[tokio::test]
1706    async fn test_create_logic_tables_metadata() {
1707        let mem_kv = Arc::new(MemoryKvBackend::default());
1708        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1709        let region_route = new_test_region_route();
1710        let region_routes = vec![region_route.clone()];
1711        let table_info: RawTableInfo = new_test_table_info().into();
1712        let table_id = table_info.ident.table_id;
1713        let table_route_value = TableRouteValue::physical(region_routes.clone());
1714
1715        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1716        // creates metadata.
1717        table_metadata_manager
1718            .create_logical_tables_metadata(tables_data.clone())
1719            .await
1720            .unwrap();
1721
1722        // if metadata was already created, it should be ok.
1723        assert!(
1724            table_metadata_manager
1725                .create_logical_tables_metadata(tables_data)
1726                .await
1727                .is_ok()
1728        );
1729
1730        let mut modified_region_routes = region_routes.clone();
1731        modified_region_routes.push(new_region_route(2, 3));
1732        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1733        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1734        // if remote metadata was exists, it should return an error.
1735        assert!(
1736            table_metadata_manager
1737                .create_logical_tables_metadata(modified_tables_data)
1738                .await
1739                .is_err()
1740        );
1741
1742        let (remote_table_info, remote_table_route) = table_metadata_manager
1743            .get_full_table_info(table_id)
1744            .await
1745            .unwrap();
1746
1747        assert_eq!(
1748            remote_table_info.unwrap().into_inner().table_info,
1749            table_info
1750        );
1751        assert_eq!(
1752            remote_table_route
1753                .unwrap()
1754                .into_inner()
1755                .region_routes()
1756                .unwrap(),
1757            &region_routes
1758        );
1759    }
1760
1761    #[tokio::test]
1762    async fn test_create_many_logical_tables_metadata() {
1763        let kv_backend = Arc::new(MemoryKvBackend::default());
1764        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1765
1766        let mut tables_data = vec![];
1767        for i in 0..128 {
1768            let table_id = i + 1;
1769            let regin_number = table_id * 3;
1770            let region_id = RegionId::new(table_id, regin_number);
1771            let region_route = new_region_route(region_id.as_u64(), 2);
1772            let region_routes = vec![region_route.clone()];
1773            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1774                table_id,
1775                &format!("my_table_{}", table_id),
1776            )
1777            .into();
1778            let table_route_value = TableRouteValue::physical(region_routes.clone());
1779
1780            tables_data.push((table_info, table_route_value));
1781        }
1782
1783        // creates metadata.
1784        table_metadata_manager
1785            .create_logical_tables_metadata(tables_data)
1786            .await
1787            .unwrap();
1788    }
1789
1790    #[tokio::test]
1791    async fn test_delete_table_metadata() {
1792        let mem_kv = Arc::new(MemoryKvBackend::default());
1793        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1794        let region_route = new_test_region_route();
1795        let region_routes = &vec![region_route.clone()];
1796        let table_info: RawTableInfo = new_test_table_info().into();
1797        let table_id = table_info.ident.table_id;
1798        let datanode_id = 2;
1799        let region_wal_options = create_mock_region_wal_options();
1800        let serialized_region_wal_options = region_wal_options
1801            .iter()
1802            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1803            .collect::<HashMap<_, _>>();
1804
1805        // creates metadata.
1806        create_physical_table_metadata(
1807            &table_metadata_manager,
1808            table_info.clone(),
1809            region_routes.clone(),
1810            serialized_region_wal_options,
1811        )
1812        .await
1813        .unwrap();
1814
1815        let table_name = TableName::new(
1816            table_info.catalog_name,
1817            table_info.schema_name,
1818            table_info.name,
1819        );
1820        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1821        // deletes metadata.
1822        table_metadata_manager
1823            .delete_table_metadata(
1824                table_id,
1825                &table_name,
1826                table_route_value,
1827                &region_wal_options,
1828            )
1829            .await
1830            .unwrap();
1831        // Should be ignored.
1832        table_metadata_manager
1833            .delete_table_metadata(
1834                table_id,
1835                &table_name,
1836                table_route_value,
1837                &region_wal_options,
1838            )
1839            .await
1840            .unwrap();
1841        assert!(
1842            table_metadata_manager
1843                .table_info_manager()
1844                .get(table_id)
1845                .await
1846                .unwrap()
1847                .is_none()
1848        );
1849        assert!(
1850            table_metadata_manager
1851                .table_route_manager()
1852                .table_route_storage()
1853                .get(table_id)
1854                .await
1855                .unwrap()
1856                .is_none()
1857        );
1858        assert!(
1859            table_metadata_manager
1860                .datanode_table_manager()
1861                .tables(datanode_id)
1862                .try_collect::<Vec<_>>()
1863                .await
1864                .unwrap()
1865                .is_empty()
1866        );
1867        // Checks removed values
1868        let table_info = table_metadata_manager
1869            .table_info_manager()
1870            .get(table_id)
1871            .await
1872            .unwrap();
1873        assert!(table_info.is_none());
1874        let table_route = table_metadata_manager
1875            .table_route_manager()
1876            .table_route_storage()
1877            .get(table_id)
1878            .await
1879            .unwrap();
1880        assert!(table_route.is_none());
1881        // Logical delete removes the topic region mapping as well.
1882        let regions = table_metadata_manager
1883            .topic_region_manager
1884            .regions("greptimedb_topic0")
1885            .await
1886            .unwrap();
1887        assert_eq!(regions.len(), 0);
1888        let regions = table_metadata_manager
1889            .topic_region_manager
1890            .regions("greptimedb_topic1")
1891            .await
1892            .unwrap();
1893        assert_eq!(regions.len(), 0);
1894    }
1895
1896    #[tokio::test]
1897    async fn test_rename_table() {
1898        let mem_kv = Arc::new(MemoryKvBackend::default());
1899        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1900        let region_route = new_test_region_route();
1901        let region_routes = vec![region_route.clone()];
1902        let table_info: RawTableInfo = new_test_table_info().into();
1903        let table_id = table_info.ident.table_id;
1904        // creates metadata.
1905        create_physical_table_metadata(
1906            &table_metadata_manager,
1907            table_info.clone(),
1908            region_routes.clone(),
1909            HashMap::new(),
1910        )
1911        .await
1912        .unwrap();
1913
1914        let new_table_name = "another_name".to_string();
1915        let table_info_value =
1916            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1917
1918        table_metadata_manager
1919            .rename_table(&table_info_value, new_table_name.clone())
1920            .await
1921            .unwrap();
1922        // if remote metadata was updated, it should be ok.
1923        table_metadata_manager
1924            .rename_table(&table_info_value, new_table_name.clone())
1925            .await
1926            .unwrap();
1927        let mut modified_table_info = table_info.clone();
1928        modified_table_info.name = "hi".to_string();
1929        let modified_table_info_value =
1930            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1931        // if the table_info_value is wrong, it should return an error.
1932        // The ABA problem.
1933        assert!(
1934            table_metadata_manager
1935                .rename_table(&modified_table_info_value, new_table_name.clone())
1936                .await
1937                .is_err()
1938        );
1939
1940        let old_table_name = TableNameKey::new(
1941            &table_info.catalog_name,
1942            &table_info.schema_name,
1943            &table_info.name,
1944        );
1945        let new_table_name = TableNameKey::new(
1946            &table_info.catalog_name,
1947            &table_info.schema_name,
1948            &new_table_name,
1949        );
1950
1951        assert!(
1952            table_metadata_manager
1953                .table_name_manager()
1954                .get(old_table_name)
1955                .await
1956                .unwrap()
1957                .is_none()
1958        );
1959
1960        assert_eq!(
1961            table_metadata_manager
1962                .table_name_manager()
1963                .get(new_table_name)
1964                .await
1965                .unwrap()
1966                .unwrap()
1967                .table_id(),
1968            table_id
1969        );
1970    }
1971
1972    #[tokio::test]
1973    async fn test_update_table_info() {
1974        let mem_kv = Arc::new(MemoryKvBackend::default());
1975        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1976        let region_route = new_test_region_route();
1977        let region_routes = vec![region_route.clone()];
1978        let table_info: RawTableInfo = new_test_table_info().into();
1979        let table_id = table_info.ident.table_id;
1980        // creates metadata.
1981        create_physical_table_metadata(
1982            &table_metadata_manager,
1983            table_info.clone(),
1984            region_routes.clone(),
1985            HashMap::new(),
1986        )
1987        .await
1988        .unwrap();
1989
1990        let mut new_table_info = table_info.clone();
1991        new_table_info.name = "hi".to_string();
1992        let current_table_info_value =
1993            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1994        // should be ok.
1995        table_metadata_manager
1996            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1997            .await
1998            .unwrap();
1999        // if table info was updated, it should be ok.
2000        table_metadata_manager
2001            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2002            .await
2003            .unwrap();
2004
2005        // updated table_info should equal the `new_table_info`
2006        let updated_table_info = table_metadata_manager
2007            .table_info_manager()
2008            .get(table_id)
2009            .await
2010            .unwrap()
2011            .unwrap()
2012            .into_inner();
2013        assert_eq!(updated_table_info.table_info, new_table_info);
2014
2015        let mut wrong_table_info = table_info.clone();
2016        wrong_table_info.name = "wrong".to_string();
2017        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2018            current_table_info_value.update(wrong_table_info),
2019        );
2020        // if the current_table_info_value is wrong, it should return an error.
2021        // The ABA problem.
2022        assert!(
2023            table_metadata_manager
2024                .update_table_info(&wrong_table_info_value, None, new_table_info)
2025                .await
2026                .is_err()
2027        )
2028    }
2029
2030    #[tokio::test]
2031    async fn test_update_table_leader_region_status() {
2032        let mem_kv = Arc::new(MemoryKvBackend::default());
2033        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2034        let datanode = 1;
2035        let region_routes = vec![
2036            RegionRoute {
2037                region: Region {
2038                    id: 1.into(),
2039                    name: "r1".to_string(),
2040                    partition: None,
2041                    attrs: BTreeMap::new(),
2042                    partition_expr: Default::default(),
2043                },
2044                leader_peer: Some(Peer::new(datanode, "a2")),
2045                leader_state: Some(LeaderState::Downgrading),
2046                follower_peers: vec![],
2047                leader_down_since: Some(current_time_millis()),
2048            },
2049            RegionRoute {
2050                region: Region {
2051                    id: 2.into(),
2052                    name: "r2".to_string(),
2053                    partition: None,
2054                    attrs: BTreeMap::new(),
2055                    partition_expr: Default::default(),
2056                },
2057                leader_peer: Some(Peer::new(datanode, "a1")),
2058                leader_state: None,
2059                follower_peers: vec![],
2060                leader_down_since: None,
2061            },
2062        ];
2063        let table_info: RawTableInfo = new_test_table_info().into();
2064        let table_id = table_info.ident.table_id;
2065        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2066            TableRouteValue::physical(region_routes.clone()),
2067        );
2068
2069        // creates metadata.
2070        create_physical_table_metadata(
2071            &table_metadata_manager,
2072            table_info.clone(),
2073            region_routes.clone(),
2074            HashMap::new(),
2075        )
2076        .await
2077        .unwrap();
2078
2079        table_metadata_manager
2080            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2081                if region_route.leader_state.is_some() {
2082                    None
2083                } else {
2084                    Some(Some(LeaderState::Downgrading))
2085                }
2086            })
2087            .await
2088            .unwrap();
2089
2090        let updated_route_value = table_metadata_manager
2091            .table_route_manager()
2092            .table_route_storage()
2093            .get(table_id)
2094            .await
2095            .unwrap()
2096            .unwrap();
2097
2098        assert_eq!(
2099            updated_route_value.region_routes().unwrap()[0].leader_state,
2100            Some(LeaderState::Downgrading)
2101        );
2102
2103        assert!(
2104            updated_route_value.region_routes().unwrap()[0]
2105                .leader_down_since
2106                .is_some()
2107        );
2108
2109        assert_eq!(
2110            updated_route_value.region_routes().unwrap()[1].leader_state,
2111            Some(LeaderState::Downgrading)
2112        );
2113        assert!(
2114            updated_route_value.region_routes().unwrap()[1]
2115                .leader_down_since
2116                .is_some()
2117        );
2118    }
2119
2120    async fn assert_datanode_table(
2121        table_metadata_manager: &TableMetadataManager,
2122        table_id: u32,
2123        region_routes: &[RegionRoute],
2124    ) {
2125        let region_distribution = region_distribution(region_routes);
2126        for (datanode, regions) in region_distribution {
2127            let got = table_metadata_manager
2128                .datanode_table_manager()
2129                .get(&DatanodeTableKey::new(datanode, table_id))
2130                .await
2131                .unwrap()
2132                .unwrap();
2133
2134            assert_eq!(got.regions, regions.leader_regions);
2135            assert_eq!(got.follower_regions, regions.follower_regions);
2136        }
2137    }
2138
2139    #[tokio::test]
2140    async fn test_update_table_route() {
2141        let mem_kv = Arc::new(MemoryKvBackend::default());
2142        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2143        let region_route = new_test_region_route();
2144        let region_routes = vec![region_route.clone()];
2145        let table_info: RawTableInfo = new_test_table_info().into();
2146        let table_id = table_info.ident.table_id;
2147        let engine = table_info.meta.engine.as_str();
2148        let region_storage_path =
2149            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2150        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2151            TableRouteValue::physical(region_routes.clone()),
2152        );
2153
2154        // creates metadata.
2155        create_physical_table_metadata(
2156            &table_metadata_manager,
2157            table_info.clone(),
2158            region_routes.clone(),
2159            HashMap::new(),
2160        )
2161        .await
2162        .unwrap();
2163
2164        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2165        let new_region_routes = vec![
2166            new_region_route(1, 1),
2167            new_region_route(2, 2),
2168            new_region_route(3, 3),
2169        ];
2170        // it should be ok.
2171        table_metadata_manager
2172            .update_table_route(
2173                table_id,
2174                RegionInfo {
2175                    engine: engine.to_string(),
2176                    region_storage_path: region_storage_path.clone(),
2177                    region_options: HashMap::new(),
2178                    region_wal_options: HashMap::new(),
2179                },
2180                &current_table_route_value,
2181                new_region_routes.clone(),
2182                &HashMap::new(),
2183                &HashMap::new(),
2184            )
2185            .await
2186            .unwrap();
2187        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2188
2189        // if the table route was updated. it should be ok.
2190        table_metadata_manager
2191            .update_table_route(
2192                table_id,
2193                RegionInfo {
2194                    engine: engine.to_string(),
2195                    region_storage_path: region_storage_path.clone(),
2196                    region_options: HashMap::new(),
2197                    region_wal_options: HashMap::new(),
2198                },
2199                &current_table_route_value,
2200                new_region_routes.clone(),
2201                &HashMap::new(),
2202                &HashMap::new(),
2203            )
2204            .await
2205            .unwrap();
2206
2207        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2208            current_table_route_value
2209                .inner
2210                .update(new_region_routes.clone())
2211                .unwrap(),
2212        );
2213        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2214        // it should be ok.
2215        table_metadata_manager
2216            .update_table_route(
2217                table_id,
2218                RegionInfo {
2219                    engine: engine.to_string(),
2220                    region_storage_path: region_storage_path.clone(),
2221                    region_options: HashMap::new(),
2222                    region_wal_options: HashMap::new(),
2223                },
2224                &current_table_route_value,
2225                new_region_routes.clone(),
2226                &HashMap::new(),
2227                &HashMap::new(),
2228            )
2229            .await
2230            .unwrap();
2231        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2232
2233        // if the current_table_route_value is wrong, it should return an error.
2234        // The ABA problem.
2235        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2236            current_table_route_value
2237                .update(vec![
2238                    new_region_route(1, 1),
2239                    new_region_route(2, 2),
2240                    new_region_route(3, 3),
2241                    new_region_route(4, 4),
2242                ])
2243                .unwrap(),
2244        );
2245        assert!(
2246            table_metadata_manager
2247                .update_table_route(
2248                    table_id,
2249                    RegionInfo {
2250                        engine: engine.to_string(),
2251                        region_storage_path: region_storage_path.clone(),
2252                        region_options: HashMap::new(),
2253                        region_wal_options: HashMap::new(),
2254                    },
2255                    &wrong_table_route_value,
2256                    new_region_routes,
2257                    &HashMap::new(),
2258                    &HashMap::new(),
2259                )
2260                .await
2261                .is_err()
2262        );
2263    }
2264
2265    #[tokio::test]
2266    async fn test_destroy_table_metadata() {
2267        let mem_kv = Arc::new(MemoryKvBackend::default());
2268        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2269        let table_id = 1025;
2270        let table_name = "foo";
2271        let task = test_create_table_task(table_name, table_id);
2272        let options = create_mock_region_wal_options();
2273        let serialized_options = options
2274            .iter()
2275            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2276            .collect::<HashMap<_, _>>();
2277        table_metadata_manager
2278            .create_table_metadata(
2279                task.table_info,
2280                TableRouteValue::physical(vec![
2281                    RegionRoute {
2282                        region: Region::new_test(RegionId::new(table_id, 1)),
2283                        leader_peer: Some(Peer::empty(1)),
2284                        follower_peers: vec![Peer::empty(5)],
2285                        leader_state: None,
2286                        leader_down_since: None,
2287                    },
2288                    RegionRoute {
2289                        region: Region::new_test(RegionId::new(table_id, 2)),
2290                        leader_peer: Some(Peer::empty(2)),
2291                        follower_peers: vec![Peer::empty(4)],
2292                        leader_state: None,
2293                        leader_down_since: None,
2294                    },
2295                    RegionRoute {
2296                        region: Region::new_test(RegionId::new(table_id, 3)),
2297                        leader_peer: Some(Peer::empty(3)),
2298                        follower_peers: vec![],
2299                        leader_state: None,
2300                        leader_down_since: None,
2301                    },
2302                ]),
2303                serialized_options,
2304            )
2305            .await
2306            .unwrap();
2307        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2308        let table_route_value = table_metadata_manager
2309            .table_route_manager
2310            .table_route_storage()
2311            .get_with_raw_bytes(table_id)
2312            .await
2313            .unwrap()
2314            .unwrap();
2315        table_metadata_manager
2316            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2317            .await
2318            .unwrap();
2319        assert!(mem_kv.is_empty());
2320    }
2321
2322    #[tokio::test]
2323    async fn test_restore_table_metadata() {
2324        let mem_kv = Arc::new(MemoryKvBackend::default());
2325        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2326        let table_id = 1025;
2327        let table_name = "foo";
2328        let task = test_create_table_task(table_name, table_id);
2329        let options = create_mock_region_wal_options();
2330        let serialized_options = options
2331            .iter()
2332            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2333            .collect::<HashMap<_, _>>();
2334        table_metadata_manager
2335            .create_table_metadata(
2336                task.table_info,
2337                TableRouteValue::physical(vec![
2338                    RegionRoute {
2339                        region: Region::new_test(RegionId::new(table_id, 1)),
2340                        leader_peer: Some(Peer::empty(1)),
2341                        follower_peers: vec![Peer::empty(5)],
2342                        leader_state: None,
2343                        leader_down_since: None,
2344                    },
2345                    RegionRoute {
2346                        region: Region::new_test(RegionId::new(table_id, 2)),
2347                        leader_peer: Some(Peer::empty(2)),
2348                        follower_peers: vec![Peer::empty(4)],
2349                        leader_state: None,
2350                        leader_down_since: None,
2351                    },
2352                    RegionRoute {
2353                        region: Region::new_test(RegionId::new(table_id, 3)),
2354                        leader_peer: Some(Peer::empty(3)),
2355                        follower_peers: vec![],
2356                        leader_state: None,
2357                        leader_down_since: None,
2358                    },
2359                ]),
2360                serialized_options,
2361            )
2362            .await
2363            .unwrap();
2364        let expected_result = mem_kv.dump();
2365        let table_route_value = table_metadata_manager
2366            .table_route_manager
2367            .table_route_storage()
2368            .get_with_raw_bytes(table_id)
2369            .await
2370            .unwrap()
2371            .unwrap();
2372        let region_routes = table_route_value.region_routes().unwrap();
2373        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2374        let table_route_value = TableRouteValue::physical(region_routes.clone());
2375        table_metadata_manager
2376            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2377            .await
2378            .unwrap();
2379        table_metadata_manager
2380            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2381            .await
2382            .unwrap();
2383        let kvs = mem_kv.dump();
2384        assert_eq!(kvs, expected_result);
2385        // Should be ignored.
2386        table_metadata_manager
2387            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2388            .await
2389            .unwrap();
2390        let kvs = mem_kv.dump();
2391        assert_eq!(kvs, expected_result);
2392    }
2393
2394    #[tokio::test]
2395    async fn test_create_update_view_info() {
2396        let mem_kv = Arc::new(MemoryKvBackend::default());
2397        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2398
2399        let view_info: RawTableInfo = new_test_table_info().into();
2400
2401        let view_id = view_info.ident.table_id;
2402
2403        let logical_plan: Vec<u8> = vec![1, 2, 3];
2404        let columns = vec!["a".to_string()];
2405        let plan_columns = vec!["number".to_string()];
2406        let table_names = new_test_table_names();
2407        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2408
2409        // Create metadata
2410        table_metadata_manager
2411            .create_view_metadata(
2412                view_info.clone(),
2413                logical_plan.clone(),
2414                table_names.clone(),
2415                columns.clone(),
2416                plan_columns.clone(),
2417                definition.to_string(),
2418            )
2419            .await
2420            .unwrap();
2421
2422        {
2423            // assert view info
2424            let current_view_info = table_metadata_manager
2425                .view_info_manager()
2426                .get(view_id)
2427                .await
2428                .unwrap()
2429                .unwrap()
2430                .into_inner();
2431            assert_eq!(current_view_info.view_info, logical_plan);
2432            assert_eq!(current_view_info.table_names, table_names);
2433            assert_eq!(current_view_info.definition, definition);
2434            assert_eq!(current_view_info.columns, columns);
2435            assert_eq!(current_view_info.plan_columns, plan_columns);
2436            // assert table info
2437            let current_table_info = table_metadata_manager
2438                .table_info_manager()
2439                .get(view_id)
2440                .await
2441                .unwrap()
2442                .unwrap()
2443                .into_inner();
2444            assert_eq!(current_table_info.table_info, view_info);
2445        }
2446
2447        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2448        let new_table_names = {
2449            let mut set = HashSet::new();
2450            set.insert(TableName {
2451                catalog_name: "greptime".to_string(),
2452                schema_name: "public".to_string(),
2453                table_name: "b_table".to_string(),
2454            });
2455            set.insert(TableName {
2456                catalog_name: "greptime".to_string(),
2457                schema_name: "public".to_string(),
2458                table_name: "c_table".to_string(),
2459            });
2460            set
2461        };
2462        let new_columns = vec!["b".to_string()];
2463        let new_plan_columns = vec!["number2".to_string()];
2464        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2465
2466        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2467            logical_plan.clone(),
2468            table_names,
2469            columns,
2470            plan_columns,
2471            definition.to_string(),
2472        ));
2473        // should be ok.
2474        table_metadata_manager
2475            .update_view_info(
2476                view_id,
2477                &current_view_info_value,
2478                new_logical_plan.clone(),
2479                new_table_names.clone(),
2480                new_columns.clone(),
2481                new_plan_columns.clone(),
2482                new_definition.to_string(),
2483            )
2484            .await
2485            .unwrap();
2486        // if table info was updated, it should be ok.
2487        table_metadata_manager
2488            .update_view_info(
2489                view_id,
2490                &current_view_info_value,
2491                new_logical_plan.clone(),
2492                new_table_names.clone(),
2493                new_columns.clone(),
2494                new_plan_columns.clone(),
2495                new_definition.to_string(),
2496            )
2497            .await
2498            .unwrap();
2499
2500        // updated view_info should equal the `new_logical_plan`
2501        let updated_view_info = table_metadata_manager
2502            .view_info_manager()
2503            .get(view_id)
2504            .await
2505            .unwrap()
2506            .unwrap()
2507            .into_inner();
2508        assert_eq!(updated_view_info.view_info, new_logical_plan);
2509        assert_eq!(updated_view_info.table_names, new_table_names);
2510        assert_eq!(updated_view_info.definition, new_definition);
2511        assert_eq!(updated_view_info.columns, new_columns);
2512        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2513
2514        let wrong_view_info = logical_plan.clone();
2515        let wrong_definition = "wrong_definition";
2516        let wrong_view_info_value =
2517            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2518                wrong_view_info,
2519                new_table_names.clone(),
2520                new_columns.clone(),
2521                new_plan_columns.clone(),
2522                wrong_definition.to_string(),
2523            ));
2524        // if the current_view_info_value is wrong, it should return an error.
2525        // The ABA problem.
2526        assert!(
2527            table_metadata_manager
2528                .update_view_info(
2529                    view_id,
2530                    &wrong_view_info_value,
2531                    new_logical_plan.clone(),
2532                    new_table_names.clone(),
2533                    vec!["c".to_string()],
2534                    vec!["number3".to_string()],
2535                    wrong_definition.to_string(),
2536                )
2537                .await
2538                .is_err()
2539        );
2540
2541        // The view_info is not changed.
2542        let current_view_info = table_metadata_manager
2543            .view_info_manager()
2544            .get(view_id)
2545            .await
2546            .unwrap()
2547            .unwrap()
2548            .into_inner();
2549        assert_eq!(current_view_info.view_info, new_logical_plan);
2550        assert_eq!(current_view_info.table_names, new_table_names);
2551        assert_eq!(current_view_info.definition, new_definition);
2552        assert_eq!(current_view_info.columns, new_columns);
2553        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2554    }
2555
2556    #[test]
2557    fn test_region_role_set_deserialize() {
2558        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2559        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2560        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2561        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2562
2563        let s = r#"[1, 2, 3]"#;
2564        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2565        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2566        assert!(region_role_set.follower_regions.is_empty());
2567    }
2568
2569    #[test]
2570    fn test_region_distribution_deserialize() {
2571        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2572        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2573        assert_eq!(region_distribution.len(), 2);
2574        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2575        assert!(region_distribution[&1].follower_regions.is_empty());
2576        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2577        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2578    }
2579}