Skip to main content

common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which
//!       tables and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define "unified"
//! managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain all the managers defined above; it's recommended to use only these.
75//!
//! The whole picture of flow keys will be like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use futures_util::TryStreamExt;
135use lazy_static::lazy_static;
136use regex::Regex;
137pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
138use serde::de::DeserializeOwned;
139use serde::{Deserialize, Serialize};
140use snafu::{OptionExt, ResultExt, ensure};
141use store_api::storage::RegionNumber;
142use table::metadata::{TableId, TableInfo};
143use table::table_name::TableName;
144use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
145use table_name::{TableNameKey, TableNameManager, TableNameValue};
146use topic_name::TopicNameManager;
147use topic_region::{TopicRegionKey, TopicRegionManager};
148use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
149
150use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
151use self::datanode_table::RegionInfo;
152use self::flow::flow_info::FlowInfoValue;
153use self::flow::flow_name::FlowNameValue;
154use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
155use self::table_route::{TableRouteManager, TableRouteValue};
156use self::tombstone::TombstoneManager;
157use crate::DatanodeId;
158use crate::error::{self, Result, SerdeJsonSnafu};
159use crate::key::flow::flow_state::FlowStateValue;
160use crate::key::node_address::NodeAddressValue;
161use crate::key::table_repart::{TableRepartKey, TableRepartManager};
162use crate::key::table_route::TableRouteKey;
163use crate::key::topic_region::TopicRegionValue;
164use crate::key::txn_helper::TxnOpGetResponseSet;
165use crate::kv_backend::KvBackendRef;
166use crate::kv_backend::txn::{Txn, TxnOp};
167use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
168use crate::rpc::store::BatchDeleteRequest;
169use crate::state_store::PoisonValue;
170
/// Regex pattern (not anchored) for a topic name: the first character is an ASCII
/// letter, digit, `_`, `:` or `-`; later characters may additionally be `.`, `@` or `#`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy maintenance key, living outside the `__switches/` namespace used by
/// [`MAINTENANCE_KEY`].
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repart keys: `__table_repart/{table_id}`.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys; the full key carries two numeric segments after the
/// prefix (see `NODE_ADDRESS_PATTERN`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Legacy topic key prefix, used to store topic names in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
204
/// The regions of a datanode, grouped by role.
///
/// Leader and follower region numbers are kept in separate lists. When deserializing,
/// a bare list of region numbers (a leader-only form) is also accepted and is read as
/// leader regions with no followers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
213
214impl<'de> Deserialize<'de> for RegionRoleSet {
215    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
216    where
217        D: serde::Deserializer<'de>,
218    {
219        #[derive(Deserialize)]
220        #[serde(untagged)]
221        enum RegionRoleSetOrLeaderOnly {
222            Full {
223                leader_regions: Vec<RegionNumber>,
224                follower_regions: Vec<RegionNumber>,
225            },
226            LeaderOnly(Vec<RegionNumber>),
227        }
228        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
229            RegionRoleSetOrLeaderOnly::Full {
230                leader_regions,
231                follower_regions,
232            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
233            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
234                Ok(RegionRoleSet::new(leader_regions, vec![]))
235            }
236        }
237    }
238}
239
240impl RegionRoleSet {
241    /// Create a new region role set.
242    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
243        Self {
244            leader_regions,
245            follower_regions,
246        }
247    }
248
249    /// Add a leader region to the set.
250    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
251        self.leader_regions.push(region_number);
252    }
253
254    /// Add a follower region to the set.
255    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
256        self.follower_regions.push(region_number);
257    }
258
259    /// Sort the regions.
260    pub fn sort(&mut self) {
261        self.follower_regions.sort();
262        self.leader_regions.sort();
263    }
264}
265
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set. Being a `BTreeMap`,
/// iteration visits datanodes in ascending id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of flow.
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
275
276lazy_static! {
277    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
278}
279
280lazy_static! {
281    static ref TABLE_INFO_KEY_PATTERN: Regex =
282        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
283}
284
285lazy_static! {
286    static ref VIEW_INFO_KEY_PATTERN: Regex =
287        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
288}
289
290lazy_static! {
291    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
292        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
293}
294
295lazy_static! {
296    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
297        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
298}
299
300lazy_static! {
301    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
302        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
303}
304
305lazy_static! {
306    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
307        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
308    ))
309    .unwrap();
310}
311
312lazy_static! {
313    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
314    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
315        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
316    ))
317        .unwrap();
318}
319
320lazy_static! {
321    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
322    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
323        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
324    ))
325        .unwrap();
326}
327
328lazy_static! {
329    static ref NODE_ADDRESS_PATTERN: Regex =
330        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
331}
332
333lazy_static! {
334    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
335        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
336}
337
338lazy_static! {
339    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
340        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
341    ))
342    .unwrap();
343}
344
/// The key of metadata.
///
/// Converts between a typed key and its raw byte encoding. `T` is the type produced
/// by [`MetadataKey::from_bytes`]; the `'a` lifetime ties the input slice to `T`, which
/// allows `T` to borrow from the bytes.
pub trait MetadataKey<'a, T> {
    /// Encodes the key into raw bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from raw bytes.
    ///
    /// # Errors
    ///
    /// Returns an error when `bytes` cannot be decoded into `T`.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
351
352#[derive(Debug, Clone, PartialEq)]
353pub struct BytesAdapter(Vec<u8>);
354
355impl From<Vec<u8>> for BytesAdapter {
356    fn from(value: Vec<u8>) -> Self {
357        Self(value)
358    }
359}
360
361impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
362    fn to_bytes(&self) -> Vec<u8> {
363        self.0.clone()
364    }
365
366    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
367        Ok(BytesAdapter(bytes.to_vec()))
368    }
369}
370
/// Builds a "get" operation for a metadata key that can be batched into a [`Txn`].
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the get [`TxnOp`] together with a closure that pulls the fetched raw
    /// value out of a [`TxnOpGetResponseSet`], yielding `None` if it is absent.
    /// NOTE(review): presumably the closure selects the response for this key —
    /// confirm in the implementations.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
379
/// A metadata value that can be converted to and from its raw byte representation.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into raw bytes.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
387
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Groups the managers of table-related metadata keys (see the module docs).
///
/// Every sub-manager is constructed from the same `kv_backend`, which is also kept for
/// direct access.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    kv_backend: KvBackendRef,
}
404
/// Returns early with an `Unexpected` error unless `$got == $expected_value`.
///
/// `$name` describes the operation being checked. It is included in the error message
/// together with the `Debug` output of both values. `$got` and `$expected_value` are each
/// evaluated exactly once. `$name` is evaluated only when the check fails.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Borrow each argument once so side-effecting expressions are not run twice (the
        // comparison and the error message used to evaluate them separately). Paths are
        // fully qualified so the exported macro works without the caller importing
        // `ensure` or the `error` module.
        let got = &$got;
        let expected_value = &$expected_value;
        ::snafu::ensure!(
            *got == *expected_value,
            $crate::error::UnexpectedSnafu {
                err_msg: ::std::format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected_value
                )
            }
        );
    }};
}
419
/// A deserialized value (`inner`) kept together with the original bytes it was
/// decoded from (or, in tests via `from_inner`, the JSON encoding of `inner`).
///
/// - Serialize behaviors:
///
/// The `inner` field will be ignored; only the original bytes are written.
///
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value deserialized from the original bytes.
    inner: T,
}
435
/// The name of a dropped table, taken from its tombstoned table-name mapping.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DroppedTableName {
    /// Table id stored in the tombstoned table-name mapping.
    pub table_id: TableId,
    /// Original fully qualified table name.
    pub table_name: TableName,
}
443
/// Metadata of a dropped table, recovered from its tombstoned metadata keys.
#[derive(Debug, Clone)]
pub struct DroppedTableMetadata {
    /// Table id of the dropped table.
    pub table_id: TableId,
    /// Original fully qualified table name.
    pub table_name: TableName,
    /// Tombstoned table info value.
    pub table_info_value: TableInfoValue,
    /// Tombstoned table route value.
    pub table_route_value: TableRouteValue,
    /// Per-region WAL options recovered from tombstoned datanode metadata.
    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
}
457
/// Dereferences to the deserialized `inner` value.
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// Mutably dereferences to the `inner` value.
///
/// Note: mutations made through this impl are NOT reflected in the original `bytes`;
/// `get_raw_bytes` and `Serialize` keep returning the bytes the value was built from.
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
471
472impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
473    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474        write!(
475            f,
476            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
477            self.inner, self.bytes
478        )
479    }
480}
481
482impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
483    /// - Serialize behaviors:
484    ///
485    /// The `inner` field will be ignored.
486    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
487    where
488        S: serde::Serializer,
489    {
490        // Safety: The original bytes are always JSON encoded.
491        // It's more efficiently than `serialize_bytes`.
492        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
493    }
494}
495
496impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
497    for DeserializedValueWithBytes<T>
498{
499    /// - Deserialize behaviors:
500    ///
501    /// The `inner` field will be deserialized from the `bytes` field.
502    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
503    where
504        D: serde::Deserializer<'de>,
505    {
506        let buf = String::deserialize(deserializer)?;
507        let bytes = Bytes::from(buf);
508
509        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
510            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
511
512        Ok(value)
513    }
514}
515
/// Clones both the original `bytes` and the `inner` value.
impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
    fn clone(&self) -> Self {
        Self {
            bytes: self.bytes.clone(),
            inner: self.inner.clone(),
        }
    }
}
524
525impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
526    /// Returns a struct containing a deserialized value and an original `bytes`.
527    /// It accepts original bytes of inner.
528    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
529        let inner = T::try_from_raw_value(&bytes)?;
530        Ok(Self { bytes, inner })
531    }
532
533    /// Returns a struct containing a deserialized value and an original `bytes`.
534    /// It accepts original bytes of inner.
535    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
536        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
537    }
538
539    pub fn into_inner(self) -> T {
540        self.inner
541    }
542
543    pub fn get_inner_ref(&self) -> &T {
544        &self.inner
545    }
546
547    /// Returns original `bytes`
548    pub fn get_raw_bytes(&self) -> Vec<u8> {
549        self.bytes.to_vec()
550    }
551
552    #[cfg(any(test, feature = "testing"))]
553    pub fn from_inner(inner: T) -> Self {
554        let bytes = serde_json::to_vec(&inner).unwrap();
555
556        Self {
557            bytes: Bytes::from(bytes),
558            inner,
559        }
560    }
561}
562
563impl TableMetadataManager {
564    pub fn new(kv_backend: KvBackendRef) -> Self {
565        TableMetadataManager {
566            table_name_manager: TableNameManager::new(kv_backend.clone()),
567            table_info_manager: TableInfoManager::new(kv_backend.clone()),
568            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
569            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
570            catalog_manager: CatalogManager::new(kv_backend.clone()),
571            schema_manager: SchemaManager::new(kv_backend.clone()),
572            table_route_manager: TableRouteManager::new(kv_backend.clone()),
573            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
574            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
575            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
576            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
577            kv_backend,
578        }
579    }
580
581    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
582    pub fn new_with_custom_tombstone_prefix(
583        kv_backend: KvBackendRef,
584        tombstone_prefix: &str,
585    ) -> Self {
586        Self {
587            table_name_manager: TableNameManager::new(kv_backend.clone()),
588            table_info_manager: TableInfoManager::new(kv_backend.clone()),
589            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
590            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
591            catalog_manager: CatalogManager::new(kv_backend.clone()),
592            schema_manager: SchemaManager::new(kv_backend.clone()),
593            table_route_manager: TableRouteManager::new(kv_backend.clone()),
594            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
595            tombstone_manager: TombstoneManager::new_with_prefix(
596                kv_backend.clone(),
597                tombstone_prefix,
598            ),
599            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
600            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
601            kv_backend,
602        }
603    }
604
605    pub async fn init(&self) -> Result<()> {
606        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
607
608        self.catalog_manager().create(catalog_name, true).await?;
609
610        let internal_schemas = [
611            DEFAULT_SCHEMA_NAME,
612            INFORMATION_SCHEMA_NAME,
613            DEFAULT_PRIVATE_SCHEMA_NAME,
614        ];
615
616        for schema_name in internal_schemas {
617            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
618
619            self.schema_manager().create(schema_key, None, true).await?;
620        }
621
622        Ok(())
623    }
624
    /// Returns the manager of table name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager of table info keys.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager of view info keys.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager of datanode table keys.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager of catalog name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager of schema name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager of table route keys.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager of table repart keys.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the manager of topic name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager of topic-region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the KV backend that all the managers above were built from.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
668
669    pub async fn get_full_table_info(
670        &self,
671        table_id: TableId,
672    ) -> Result<(
673        Option<DeserializedValueWithBytes<TableInfoValue>>,
674        Option<DeserializedValueWithBytes<TableRouteValue>>,
675    )> {
676        let table_info_key = TableInfoKey::new(table_id);
677        let table_route_key = TableRouteKey::new(table_id);
678        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
679        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
680
681        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
682        let mut res = self.kv_backend.txn(txn).await?;
683        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
684        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
685        let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
686        if let Some(table_route_value) = &mut table_route_value {
687            self.table_route_manager()
688                .table_route_storage()
689                .remap_table_route(table_route_value)
690                .await?;
691        }
692        Ok((table_info_value, table_route_value))
693    }
694
695    /// Creates metadata for view and returns an error if different metadata exists.
696    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
697    /// Parameters include:
698    /// - `view_info`: the encoded logical plan
699    /// - `table_names`: the resolved fully table names in logical plan
700    /// - `columns`: the view columns
701    /// - `plan_columns`: the original plan columns
702    /// - `definition`: The SQL to create the view
703    ///
704    pub async fn create_view_metadata(
705        &self,
706        view_info: TableInfo,
707        raw_logical_plan: Vec<u8>,
708        table_names: HashSet<TableName>,
709        columns: Vec<String>,
710        plan_columns: Vec<String>,
711        definition: String,
712    ) -> Result<()> {
713        let view_id = view_info.ident.table_id;
714
715        // Creates view name.
716        let view_name = TableNameKey::new(
717            &view_info.catalog_name,
718            &view_info.schema_name,
719            &view_info.name,
720        );
721        let create_table_name_txn = self
722            .table_name_manager()
723            .build_create_txn(&view_name, view_id)?;
724
725        // Creates table info.
726        let table_info_value = TableInfoValue::new(view_info);
727
728        let (create_table_info_txn, on_create_table_info_failure) = self
729            .table_info_manager()
730            .build_create_txn(view_id, &table_info_value)?;
731
732        // Creates view info
733        let view_info_value = ViewInfoValue::new(
734            raw_logical_plan.into(),
735            table_names,
736            columns,
737            plan_columns,
738            definition,
739        );
740        let (create_view_info_txn, on_create_view_info_failure) = self
741            .view_info_manager()
742            .build_create_txn(view_id, &view_info_value)?;
743
744        let txn = Txn::merge_all(vec![
745            create_table_name_txn,
746            create_table_info_txn,
747            create_view_info_txn,
748        ]);
749
750        let mut r = self.kv_backend.txn(txn).await?;
751
752        // Checks whether metadata was already created.
753        if !r.succeeded {
754            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
755            let remote_table_info = on_create_table_info_failure(&mut set)?
756                .context(error::UnexpectedSnafu {
757                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
758                })?
759                .into_inner();
760
761            let remote_view_info = on_create_view_info_failure(&mut set)?
762                .context(error::UnexpectedSnafu {
763                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
764                })?
765                .into_inner();
766
767            let op_name = "the creating view metadata";
768            ensure_values!(remote_table_info, table_info_value, op_name);
769            ensure_values!(remote_view_info, view_info_value, op_name);
770        }
771
772        Ok(())
773    }
774
775    /// Creates metadata for table and returns an error if different metadata exists.
776    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
777    pub async fn create_table_metadata(
778        &self,
779        table_info: TableInfo,
780        table_route_value: TableRouteValue,
781        region_wal_options: HashMap<RegionNumber, String>,
782    ) -> Result<()> {
783        let table_id = table_info.ident.table_id;
784        let engine = table_info.meta.engine.clone();
785
786        // Creates table name.
787        let table_name = TableNameKey::new(
788            &table_info.catalog_name,
789            &table_info.schema_name,
790            &table_info.name,
791        );
792        let create_table_name_txn = self
793            .table_name_manager()
794            .build_create_txn(&table_name, table_id)?;
795
796        let region_options = table_info.to_region_options();
797        // Creates table info.
798        let table_info_value = TableInfoValue::new(table_info);
799        let (create_table_info_txn, on_create_table_info_failure) = self
800            .table_info_manager()
801            .build_create_txn(table_id, &table_info_value)?;
802
803        let (create_table_route_txn, on_create_table_route_failure) = self
804            .table_route_manager()
805            .table_route_storage()
806            .build_create_txn(table_id, &table_route_value)?;
807
808        let create_topic_region_txn = self
809            .topic_region_manager
810            .build_create_txn(table_id, &region_wal_options)?;
811
812        let mut txn = Txn::merge_all(vec![
813            create_table_name_txn,
814            create_table_info_txn,
815            create_table_route_txn,
816            create_topic_region_txn,
817        ]);
818
819        if let TableRouteValue::Physical(x) = &table_route_value {
820            let region_storage_path = table_info_value.region_storage_path();
821            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
822                table_id,
823                &engine,
824                &region_storage_path,
825                region_options,
826                region_wal_options,
827                region_distribution(&x.region_routes),
828            )?;
829            txn = txn.merge(create_datanode_table_txn);
830        }
831
832        let mut r = self.kv_backend.txn(txn).await?;
833
834        // Checks whether metadata was already created.
835        if !r.succeeded {
836            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
837            let remote_table_info = on_create_table_info_failure(&mut set)?
838                .context(error::UnexpectedSnafu {
839                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
840                })?
841                .into_inner();
842
843            let remote_table_route = on_create_table_route_failure(&mut set)?
844                .context(error::UnexpectedSnafu {
845                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
846                })?
847                .into_inner();
848
849            let op_name = "the creating table metadata";
850            ensure_values!(remote_table_info, table_info_value, op_name);
851            ensure_values!(remote_table_route, table_route_value, op_name);
852        }
853
854        Ok(())
855    }
856
857    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
858        // The batch size is max_txn_size / 3 because the size of the `tables_data`
859        // is 3 times the size of the `tables_data`.
860        self.kv_backend.max_txn_ops() / 3
861    }
862
863    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
864    pub async fn create_logical_tables_metadata(
865        &self,
866        tables_data: Vec<(TableInfo, TableRouteValue)>,
867    ) -> Result<()> {
868        let len = tables_data.len();
869        let mut txns = Vec::with_capacity(3 * len);
870        struct OnFailure<F1, R1, F2, R2>
871        where
872            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
873            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
874        {
875            table_info_value: TableInfoValue,
876            on_create_table_info_failure: F1,
877            table_route_value: TableRouteValue,
878            on_create_table_route_failure: F2,
879        }
880        let mut on_failures = Vec::with_capacity(len);
881        for (table_info, table_route_value) in tables_data {
882            let table_id = table_info.ident.table_id;
883
884            // Creates table name.
885            let table_name = TableNameKey::new(
886                &table_info.catalog_name,
887                &table_info.schema_name,
888                &table_info.name,
889            );
890            let create_table_name_txn = self
891                .table_name_manager()
892                .build_create_txn(&table_name, table_id)?;
893            txns.push(create_table_name_txn);
894
895            // Creates table info.
896            let table_info_value = TableInfoValue::new(table_info);
897            let (create_table_info_txn, on_create_table_info_failure) =
898                self.table_info_manager()
899                    .build_create_txn(table_id, &table_info_value)?;
900            txns.push(create_table_info_txn);
901
902            let (create_table_route_txn, on_create_table_route_failure) = self
903                .table_route_manager()
904                .table_route_storage()
905                .build_create_txn(table_id, &table_route_value)?;
906            txns.push(create_table_route_txn);
907
908            on_failures.push(OnFailure {
909                table_info_value,
910                on_create_table_info_failure,
911                table_route_value,
912                on_create_table_route_failure,
913            });
914        }
915
916        let txn = Txn::merge_all(txns);
917        let mut r = self.kv_backend.txn(txn).await?;
918
919        // Checks whether metadata was already created.
920        if !r.succeeded {
921            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
922            for on_failure in on_failures {
923                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
924                    .context(error::UnexpectedSnafu {
925                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
926                    })?
927                    .into_inner();
928
929                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
930                    .context(error::UnexpectedSnafu {
931                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
932                    })?
933                    .into_inner();
934
935                let op_name = "the creating logical tables metadata";
936                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
937                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
938            }
939        }
940
941        Ok(())
942    }
943
944    fn table_metadata_keys(
945        &self,
946        table_id: TableId,
947        table_name: &TableName,
948        table_route_value: &TableRouteValue,
949        region_wal_options: &HashMap<RegionNumber, WalOptions>,
950    ) -> Result<Vec<Vec<u8>>> {
951        // Builds keys
952        let datanode_ids = if table_route_value.is_physical() {
953            region_distribution(table_route_value.region_routes()?)
954                .into_keys()
955                .collect()
956        } else {
957            vec![]
958        };
959        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
960        let table_name = TableNameKey::new(
961            &table_name.catalog_name,
962            &table_name.schema_name,
963            &table_name.table_name,
964        );
965        let table_info_key = TableInfoKey::new(table_id);
966        let table_route_key = TableRouteKey::new(table_id);
967        let table_repart_key = TableRepartKey::new(table_id);
968        let datanode_table_keys = datanode_ids
969            .into_iter()
970            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
971            .collect::<HashSet<_>>();
972        let topic_region_map = self
973            .topic_region_manager
974            .get_topic_region_mapping(table_id, region_wal_options);
975        let topic_region_keys = topic_region_map
976            .iter()
977            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
978            .collect::<Vec<_>>();
979        keys.push(table_name.to_bytes());
980        keys.push(table_info_key.to_bytes());
981        keys.push(table_route_key.to_bytes());
982        keys.push(table_repart_key.to_bytes());
983        for key in &datanode_table_keys {
984            keys.push(key.to_bytes());
985        }
986        for key in topic_region_keys {
987            keys.push(key.to_bytes());
988        }
989        Ok(keys)
990    }
991
992    /// Deletes metadata for table **logically**.
993    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
994    pub async fn delete_table_metadata(
995        &self,
996        table_id: TableId,
997        table_name: &TableName,
998        table_route_value: &TableRouteValue,
999        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1000    ) -> Result<()> {
1001        let keys =
1002            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1003        self.tombstone_manager.create(keys).await.map(|_| ())
1004    }
1005
1006    /// Lists dropped tables from tombstoned table-name entries.
1007    pub async fn list_dropped_tables(&self) -> Result<Vec<DroppedTableName>> {
1008        let mut stream = self.tombstone_manager.tombstoned_table_names();
1009        let mut dropped_tables = Vec::new();
1010
1011        while let Some(kv) = stream.try_next().await? {
1012            let raw_key = self.tombstone_manager.strip_tombstone_prefix(&kv.key)?;
1013            let table_name = TableNameKey::from_bytes(raw_key)?.into();
1014            let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1015            dropped_tables.push(DroppedTableName {
1016                table_id,
1017                table_name,
1018            });
1019        }
1020
1021        Ok(dropped_tables)
1022    }
1023
1024    /// Gets dropped table metadata by its original full table name.
1025    pub async fn get_dropped_table(
1026        &self,
1027        table_name: &TableName,
1028    ) -> Result<Option<DroppedTableMetadata>> {
1029        let table_name_key = TableNameKey::from(table_name);
1030        let Some(kv) = self
1031            .tombstone_manager
1032            .get(&table_name_key.to_bytes())
1033            .await?
1034        else {
1035            return Ok(None);
1036        };
1037
1038        let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1039        self.get_dropped_table_metadata(table_id, table_name.clone())
1040            .await
1041    }
1042
1043    /// Gets dropped table metadata by table id.
1044    pub async fn get_dropped_table_by_id(
1045        &self,
1046        table_id: TableId,
1047    ) -> Result<Option<DroppedTableMetadata>> {
1048        self.get_dropped_table_metadata(table_id, None).await
1049    }
1050
1051    /// Deletes metadata tombstone for table **permanently**.
1052    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1053    pub async fn delete_table_metadata_tombstone(
1054        &self,
1055        table_id: TableId,
1056        table_name: &TableName,
1057        table_route_value: &TableRouteValue,
1058        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1059    ) -> Result<()> {
1060        let table_metadata_keys =
1061            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1062        self.tombstone_manager
1063            .delete(table_metadata_keys)
1064            .await
1065            .map(|_| ())
1066    }
1067
1068    /// Restores metadata for table.
1069    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1070    pub async fn restore_table_metadata(
1071        &self,
1072        table_id: TableId,
1073        table_name: &TableName,
1074        table_route_value: &TableRouteValue,
1075        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1076    ) -> Result<()> {
1077        let keys =
1078            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1079        self.tombstone_manager.restore(keys).await.map(|_| ())
1080    }
1081
1082    /// Deletes metadata for table **permanently**.
1083    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1084    pub async fn destroy_table_metadata(
1085        &self,
1086        table_id: TableId,
1087        table_name: &TableName,
1088        table_route_value: &TableRouteValue,
1089        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1090    ) -> Result<()> {
1091        let keys =
1092            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1093        let _ = self
1094            .kv_backend
1095            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1096            .await?;
1097        Ok(())
1098    }
1099
1100    /// Rebuilds dropped table metadata from tombstoned keys.
1101    async fn get_dropped_table_metadata<T>(
1102        &self,
1103        table_id: TableId,
1104        table_name: T,
1105    ) -> Result<Option<DroppedTableMetadata>>
1106    where
1107        T: Into<Option<TableName>>,
1108    {
1109        let table_info_key = TableInfoKey::new(table_id);
1110        let Some(table_info_kv) = self
1111            .tombstone_manager
1112            .get(&table_info_key.to_bytes())
1113            .await?
1114        else {
1115            return Ok(None);
1116        };
1117
1118        let table_info_value = TableInfoValue::try_from_raw_value(&table_info_kv.value)?;
1119        let table_name = table_name
1120            .into()
1121            .unwrap_or_else(|| table_info_value.table_name());
1122
1123        let table_route_key = TableRouteKey::new(table_id);
1124        let table_route_kv = self
1125            .tombstone_manager
1126            .get(&table_route_key.to_bytes())
1127            .await?
1128            .with_context(|| error::UnexpectedSnafu {
1129                err_msg: format!("Missing tombstoned table route metadata for table id {table_id}"),
1130            })?;
1131        let mut table_route_value = TableRouteValue::try_from_raw_value(&table_route_kv.value)?;
1132        self.table_route_manager
1133            .table_route_storage()
1134            .remap_table_route(&mut table_route_value)
1135            .await?;
1136
1137        let region_wal_options = self
1138            .dropped_region_wal_options(table_id, &table_route_value)
1139            .await?;
1140
1141        Ok(Some(DroppedTableMetadata {
1142            table_id,
1143            table_name,
1144            table_info_value,
1145            table_route_value,
1146            region_wal_options,
1147        }))
1148    }
1149
1150    /// Rebuilds region WAL options from tombstoned datanode-table entries.
1151    async fn dropped_region_wal_options(
1152        &self,
1153        table_id: TableId,
1154        table_route_value: &TableRouteValue,
1155    ) -> Result<HashMap<RegionNumber, WalOptions>> {
1156        let mut region_wal_options = HashMap::new();
1157        let datanode_table_keys = region_distribution(table_route_value.region_routes()?)
1158            .into_keys()
1159            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
1160            .collect::<Vec<_>>();
1161        let datanode_table_key_bytes = datanode_table_keys
1162            .iter()
1163            .map(|key| key.to_bytes())
1164            .collect::<Vec<_>>();
1165        let datanode_table_values = self
1166            .tombstone_manager
1167            .batch_get(&datanode_table_key_bytes)
1168            .await?;
1169
1170        for datanode_table_key in datanode_table_keys {
1171            let Some(kv) = datanode_table_values.get(&datanode_table_key.to_bytes()) else {
1172                continue;
1173            };
1174
1175            let datanode_table_value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
1176            for (region_number, wal_options) in &datanode_table_value.region_info.region_wal_options
1177            {
1178                region_wal_options.insert(
1179                    *region_number,
1180                    serde_json::from_str(wal_options).context(error::SerdeJsonSnafu)?,
1181                );
1182            }
1183        }
1184
1185        Ok(region_wal_options)
1186    }
1187
1188    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1189        let mut keys = Vec::with_capacity(3);
1190        let view_name = TableNameKey::new(
1191            &view_name.catalog_name,
1192            &view_name.schema_name,
1193            &view_name.table_name,
1194        );
1195        let table_info_key = TableInfoKey::new(view_id);
1196        let view_info_key = ViewInfoKey::new(view_id);
1197        keys.push(view_name.to_bytes());
1198        keys.push(table_info_key.to_bytes());
1199        keys.push(view_info_key.to_bytes());
1200
1201        Ok(keys)
1202    }
1203
1204    /// Deletes metadata for view **permanently**.
1205    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1206    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1207        let keys = self.view_info_keys(view_id, view_name)?;
1208        let _ = self
1209            .kv_backend
1210            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1211            .await?;
1212        Ok(())
1213    }
1214
1215    /// Renames the table name and returns an error if different metadata exists.
1216    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1217    /// and the new `TableNameKey` MUST be empty.
1218    pub async fn rename_table(
1219        &self,
1220        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1221        new_table_name: String,
1222    ) -> Result<()> {
1223        let current_table_info = &current_table_info_value.table_info;
1224        let table_id = current_table_info.ident.table_id;
1225
1226        let table_name_key = TableNameKey::new(
1227            &current_table_info.catalog_name,
1228            &current_table_info.schema_name,
1229            &current_table_info.name,
1230        );
1231
1232        let new_table_name_key = TableNameKey::new(
1233            &current_table_info.catalog_name,
1234            &current_table_info.schema_name,
1235            &new_table_name,
1236        );
1237
1238        // Updates table name.
1239        let update_table_name_txn = self.table_name_manager().build_update_txn(
1240            &table_name_key,
1241            &new_table_name_key,
1242            table_id,
1243        )?;
1244
1245        let new_table_info_value = current_table_info_value
1246            .inner
1247            .with_update(move |table_info| {
1248                table_info.name = new_table_name;
1249            });
1250
1251        // Updates table info.
1252        let (update_table_info_txn, on_update_table_info_failure) = self
1253            .table_info_manager()
1254            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1255
1256        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1257
1258        let mut r = self.kv_backend.txn(txn).await?;
1259
1260        // Checks whether metadata was already updated.
1261        if !r.succeeded {
1262            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1263            let remote_table_info = on_update_table_info_failure(&mut set)?
1264                .context(error::UnexpectedSnafu {
1265                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1266                })?
1267                .into_inner();
1268
1269            let op_name = "the renaming table metadata";
1270            ensure_values!(remote_table_info, new_table_info_value, op_name);
1271        }
1272
1273        Ok(())
1274    }
1275
1276    /// Updates table info and returns an error if different metadata exists.
1277    /// And cascade-ly update all redundant table options for each region
1278    /// if region_distribution is present.
1279    pub async fn update_table_info(
1280        &self,
1281        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1282        region_distribution: Option<RegionDistribution>,
1283        new_table_info: TableInfo,
1284    ) -> Result<()> {
1285        let table_id = current_table_info_value.table_info.ident.table_id;
1286        let new_table_info_value = current_table_info_value.update(new_table_info);
1287
1288        // Updates table info.
1289        let (update_table_info_txn, on_update_table_info_failure) = self
1290            .table_info_manager()
1291            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1292
1293        let txn = if let Some(region_distribution) = region_distribution {
1294            // region options induced from table info.
1295            let new_region_options = new_table_info_value.table_info.to_region_options();
1296            let update_datanode_table_options_txn = self
1297                .datanode_table_manager
1298                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1299                .await?;
1300            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1301        } else {
1302            update_table_info_txn
1303        };
1304
1305        let mut r = self.kv_backend.txn(txn).await?;
1306        // Checks whether metadata was already updated.
1307        if !r.succeeded {
1308            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1309            let remote_table_info = on_update_table_info_failure(&mut set)?
1310                .context(error::UnexpectedSnafu {
1311                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1312                })?
1313                .into_inner();
1314
1315            let op_name = "the updating table info";
1316            ensure_values!(remote_table_info, new_table_info_value, op_name);
1317        }
1318        Ok(())
1319    }
1320
1321    /// Updates view info and returns an error if different metadata exists.
1322    /// Parameters include:
1323    /// - `view_id`: the view id
1324    /// - `current_view_info_value`: the current view info for CAS checking
1325    /// - `new_view_info`: the encoded logical plan
1326    /// - `table_names`: the resolved fully table names in logical plan
1327    /// - `columns`: the view columns
1328    /// - `plan_columns`: the original plan columns
1329    /// - `definition`: The SQL to create the view
1330    ///
1331    #[allow(clippy::too_many_arguments)]
1332    pub async fn update_view_info(
1333        &self,
1334        view_id: TableId,
1335        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1336        new_view_info: Vec<u8>,
1337        table_names: HashSet<TableName>,
1338        columns: Vec<String>,
1339        plan_columns: Vec<String>,
1340        definition: String,
1341    ) -> Result<()> {
1342        let new_view_info_value = current_view_info_value.update(
1343            new_view_info.into(),
1344            table_names,
1345            columns,
1346            plan_columns,
1347            definition,
1348        );
1349
1350        // Updates view info.
1351        let (update_view_info_txn, on_update_view_info_failure) = self
1352            .view_info_manager()
1353            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1354
1355        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1356
1357        // Checks whether metadata was already updated.
1358        if !r.succeeded {
1359            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1360            let remote_view_info = on_update_view_info_failure(&mut set)?
1361                .context(error::UnexpectedSnafu {
1362                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1363                })?
1364                .into_inner();
1365
1366            let op_name = "the updating view info";
1367            ensure_values!(remote_view_info, new_view_info_value, op_name);
1368        }
1369        Ok(())
1370    }
1371
1372    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1373        self.kv_backend.max_txn_ops()
1374    }
1375
1376    pub async fn batch_update_table_info_values(
1377        &self,
1378        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1379    ) -> Result<()> {
1380        let len = table_info_value_pairs.len();
1381        let mut txns = Vec::with_capacity(len);
1382        struct OnFailure<F, R>
1383        where
1384            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1385        {
1386            table_info_value: TableInfoValue,
1387            on_update_table_info_failure: F,
1388        }
1389        let mut on_failures = Vec::with_capacity(len);
1390
1391        for (table_info_value, new_table_info) in table_info_value_pairs {
1392            let table_id = table_info_value.table_info.ident.table_id;
1393
1394            let new_table_info_value = table_info_value.update(new_table_info);
1395
1396            let (update_table_info_txn, on_update_table_info_failure) =
1397                self.table_info_manager().build_update_txn(
1398                    table_id,
1399                    &table_info_value,
1400                    &new_table_info_value,
1401                )?;
1402
1403            txns.push(update_table_info_txn);
1404
1405            on_failures.push(OnFailure {
1406                table_info_value: new_table_info_value,
1407                on_update_table_info_failure,
1408            });
1409        }
1410
1411        let txn = Txn::merge_all(txns);
1412        let mut r = self.kv_backend.txn(txn).await?;
1413
1414        if !r.succeeded {
1415            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1416            for on_failure in on_failures {
1417                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1418                    .context(error::UnexpectedSnafu {
1419                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1420                    })?
1421                    .into_inner();
1422
1423                let op_name = "the batch updating table info";
1424                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1425            }
1426        }
1427
1428        Ok(())
1429    }
1430
1431    pub async fn update_table_route(
1432        &self,
1433        table_id: TableId,
1434        region_info: RegionInfo,
1435        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1436        new_region_routes: Vec<RegionRoute>,
1437        new_region_options: &HashMap<String, String>,
1438        new_region_wal_options: &HashMap<RegionNumber, String>,
1439    ) -> Result<()> {
1440        // Updates the datanode table key value pairs.
1441        let current_region_distribution =
1442            region_distribution(current_table_route_value.region_routes()?);
1443        let new_region_distribution = region_distribution(&new_region_routes);
1444
1445        let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1446            table_id,
1447            &region_info.region_wal_options,
1448            new_region_wal_options,
1449        )?;
1450        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1451            table_id,
1452            region_info,
1453            current_region_distribution,
1454            new_region_distribution,
1455            new_region_options,
1456            new_region_wal_options,
1457        )?;
1458
1459        // Updates the table_route.
1460        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1461        let (update_table_route_txn, on_update_table_route_failure) = self
1462            .table_route_manager()
1463            .table_route_storage()
1464            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1465
1466        let txn = Txn::merge_all(vec![
1467            update_datanode_table_txn,
1468            update_table_route_txn,
1469            update_topic_region_txn,
1470        ]);
1471
1472        let mut r = self.kv_backend.txn(txn).await?;
1473
1474        // Checks whether metadata was already updated.
1475        if !r.succeeded {
1476            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1477            let remote_table_route = on_update_table_route_failure(&mut set)?
1478                .context(error::UnexpectedSnafu {
1479                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1480                })?
1481                .into_inner();
1482
1483            let op_name = "the updating table route";
1484            ensure_values!(remote_table_route, new_table_route_value, op_name);
1485        }
1486
1487        Ok(())
1488    }
1489
1490    /// Updates the leader status of the [RegionRoute].
1491    pub async fn update_leader_region_status<F>(
1492        &self,
1493        table_id: TableId,
1494        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1495        next_region_route_status: F,
1496    ) -> Result<()>
1497    where
1498        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1499    {
1500        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1501
1502        let mut updated = 0;
1503        for route in &mut new_region_routes {
1504            if let Some(state) = next_region_route_status(route)
1505                && route.set_leader_state(state)
1506            {
1507                updated += 1;
1508            }
1509        }
1510
1511        if updated == 0 {
1512            warn!("No leader status updated");
1513            return Ok(());
1514        }
1515
1516        // Updates the table_route.
1517        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1518
1519        let (update_table_route_txn, on_update_table_route_failure) = self
1520            .table_route_manager()
1521            .table_route_storage()
1522            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1523
1524        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1525
1526        // Checks whether metadata was already updated.
1527        if !r.succeeded {
1528            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1529            let remote_table_route = on_update_table_route_failure(&mut set)?
1530                .context(error::UnexpectedSnafu {
1531                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1532                })?
1533                .into_inner();
1534
1535            let op_name = "the updating leader region status";
1536            ensure_values!(remote_table_route, new_table_route_value, op_name);
1537        }
1538
1539        Ok(())
1540    }
1541}
1542
/// Implements `$crate::key::MetadataValue` for each listed type by encoding it as JSON
/// with `serde_json`.
///
/// The expansion uses `Result`, `SerdeJsonSnafu` and the `context` extension method
/// unqualified, so they must be in scope where the macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $value_type {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1559
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` pairs a `TxnOp::Get` on the key's raw bytes with a
/// filter that picks that key's value out of a [TxnOpGetResponseSet].
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_type: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_type {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
                ) {
                    let key_bytes = self.to_bytes();
                    let get_op = TxnOp::Get(key_bytes.clone());
                    (get_op, TxnOpGetResponseSet::filter(key_bytes))
                }
            }
        )*
    };
}
1584
1585impl_metadata_key_get_txn_op! {
1586    TableNameKey<'_>,
1587    TableInfoKey,
1588    ViewInfoKey,
1589    TableRouteKey,
1590    DatanodeTableKey
1591}
1592
/// Adds inherent JSON helpers to each listed type. Decoding yields `Option<Self>`,
/// so a JSON `null` payload decodes to `None`.
///
/// The expansion uses `Result`, `SerdeJsonSnafu` and the `context` extension method
/// unqualified, so they must be in scope where the macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $value_type {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1609
1610impl_metadata_value! {
1611    TableNameValue,
1612    TableInfoValue,
1613    ViewInfoValue,
1614    DatanodeTableValue,
1615    FlowInfoValue,
1616    FlowNameValue,
1617    FlowRouteValue,
1618    TableFlowValue,
1619    NodeAddressValue,
1620    SchemaNameValue,
1621    FlowStateValue,
1622    PoisonValue,
1623    TopicRegionValue
1624}
1625
1626impl_optional_metadata_value! {
1627    CatalogNameValue,
1628    SchemaNameValue
1629}
1630
1631#[cfg(test)]
1632mod tests {
1633    use std::collections::{BTreeMap, HashMap, HashSet};
1634    use std::sync::Arc;
1635
1636    use bytes::Bytes;
1637    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1638    use common_time::util::current_time_millis;
1639    use common_wal::options::{KafkaWalOptions, WalOptions};
1640    use futures::TryStreamExt;
1641    use store_api::storage::{RegionId, RegionNumber};
1642    use table::metadata::TableInfo;
1643    use table::table_name::TableName;
1644
1645    use super::datanode_table::DatanodeTableKey;
1646    use super::test_utils;
1647    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1648    use crate::ddl::test_util::create_table::test_create_table_task;
1649    use crate::ddl::utils::region_storage_path;
1650    use crate::error::Result;
1651    use crate::key::datanode_table::RegionInfo;
1652    use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1653    use crate::key::table_info::TableInfoValue;
1654    use crate::key::table_name::TableNameKey;
1655    use crate::key::table_route::TableRouteValue;
1656    use crate::key::topic_region::TopicRegionKey;
1657    use crate::key::{
1658        DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1659        TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1660    };
1661    use crate::kv_backend::KvBackend;
1662    use crate::kv_backend::memory::MemoryKvBackend;
1663    use crate::kv_backend::read_only::ReadOnlyKvBackend;
1664    use crate::peer::Peer;
1665    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1666    use crate::rpc::store::{PutRequest, RangeRequest};
1667    use crate::wal_provider::WalProvider;
1668
1669    #[test]
1670    fn test_deserialized_value_with_bytes() {
1671        let region_route = new_test_region_route();
1672        let region_routes = vec![region_route.clone()];
1673
1674        let expected_region_routes =
1675            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1676        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1677
1678        // Serialize behaviors:
1679        // The inner field will be ignored.
1680        let value = DeserializedValueWithBytes {
1681            // ignored
1682            inner: TableRouteValue::physical(region_routes.clone()),
1683            bytes: Bytes::from(expected.clone()),
1684        };
1685
1686        let encoded = serde_json::to_vec(&value).unwrap();
1687
1688        // Deserialize behaviors:
1689        // The inner field will be deserialized from the bytes field.
1690        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1691            serde_json::from_slice(&encoded).unwrap();
1692
1693        assert_eq!(decoded.inner, expected_region_routes);
1694        assert_eq!(decoded.bytes, expected);
1695    }
1696
1697    fn new_test_region_route() -> RegionRoute {
1698        new_region_route(1, 2)
1699    }
1700
1701    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1702        RegionRoute {
1703            region: Region {
1704                id: region_id.into(),
1705                name: "r1".to_string(),
1706                attrs: BTreeMap::new(),
1707                partition_expr: Default::default(),
1708            },
1709            leader_peer: Some(Peer::new(datanode, "a2")),
1710            follower_peers: vec![],
1711            leader_state: None,
1712            leader_down_since: None,
1713            write_route_policy: None,
1714        }
1715    }
1716
1717    fn new_test_table_info() -> TableInfo {
1718        test_utils::new_test_table_info(10)
1719    }
1720
1721    fn new_test_table_names() -> HashSet<TableName> {
1722        let mut set = HashSet::new();
1723        set.insert(TableName {
1724            catalog_name: "greptime".to_string(),
1725            schema_name: "public".to_string(),
1726            table_name: "a_table".to_string(),
1727        });
1728        set.insert(TableName {
1729            catalog_name: "greptime".to_string(),
1730            schema_name: "public".to_string(),
1731            table_name: "b_table".to_string(),
1732        });
1733        set
1734    }
1735
1736    async fn create_physical_table_metadata(
1737        table_metadata_manager: &TableMetadataManager,
1738        table_info: TableInfo,
1739        region_routes: Vec<RegionRoute>,
1740        region_wal_options: HashMap<RegionNumber, String>,
1741    ) -> Result<()> {
1742        table_metadata_manager
1743            .create_table_metadata(
1744                table_info,
1745                TableRouteValue::physical(region_routes),
1746                region_wal_options,
1747            )
1748            .await
1749    }
1750
1751    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1752        let topics = (0..2)
1753            .map(|i| format!("greptimedb_topic{}", i))
1754            .collect::<Vec<_>>();
1755        let wal_options = topics
1756            .iter()
1757            .map(|topic| {
1758                WalOptions::Kafka(KafkaWalOptions {
1759                    topic: topic.clone(),
1760                })
1761            })
1762            .collect::<Vec<_>>();
1763
1764        (0..16)
1765            .enumerate()
1766            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1767            .collect()
1768    }
1769
1770    fn create_mixed_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1771        HashMap::from([
1772            (
1773                0,
1774                WalOptions::Kafka(KafkaWalOptions {
1775                    topic: "greptimedb_topic0".to_string(),
1776                }),
1777            ),
1778            (1, WalOptions::RaftEngine),
1779            (2, WalOptions::Noop),
1780            (
1781                3,
1782                WalOptions::Kafka(KafkaWalOptions {
1783                    topic: "greptimedb_topic1".to_string(),
1784                }),
1785            ),
1786        ])
1787    }
1788
1789    #[tokio::test]
1790    async fn test_raft_engine_topic_region_map() {
1791        let mem_kv = Arc::new(MemoryKvBackend::default());
1792        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1793        let region_route = new_test_region_route();
1794        let region_routes = &vec![region_route.clone()];
1795        let table_info = new_test_table_info();
1796        let wal_provider = WalProvider::RaftEngine;
1797        let regions: Vec<_> = (0..16).collect();
1798        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1799        create_physical_table_metadata(
1800            &table_metadata_manager,
1801            table_info.clone(),
1802            region_routes.clone(),
1803            region_wal_options.clone(),
1804        )
1805        .await
1806        .unwrap();
1807
1808        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1809        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1810        let resp = mem_kv.range(range_req).await.unwrap();
1811        // Should be empty because the topic region map is empty for raft engine.
1812        assert!(resp.kvs.is_empty());
1813    }
1814
1815    #[tokio::test]
1816    async fn test_create_table_metadata() {
1817        let mem_kv = Arc::new(MemoryKvBackend::default());
1818        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1819        let region_route = new_test_region_route();
1820        let region_routes = &vec![region_route.clone()];
1821        let table_info = new_test_table_info();
1822        let region_wal_options = create_mock_region_wal_options()
1823            .into_iter()
1824            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1825            .collect::<HashMap<_, _>>();
1826
1827        // creates metadata.
1828        create_physical_table_metadata(
1829            &table_metadata_manager,
1830            table_info.clone(),
1831            region_routes.clone(),
1832            region_wal_options.clone(),
1833        )
1834        .await
1835        .unwrap();
1836
1837        // if metadata was already created, it should be ok.
1838        assert!(
1839            create_physical_table_metadata(
1840                &table_metadata_manager,
1841                table_info.clone(),
1842                region_routes.clone(),
1843                region_wal_options.clone(),
1844            )
1845            .await
1846            .is_ok()
1847        );
1848
1849        let mut modified_region_routes = region_routes.clone();
1850        modified_region_routes.push(region_route.clone());
1851        // if remote metadata was exists, it should return an error.
1852        assert!(
1853            create_physical_table_metadata(
1854                &table_metadata_manager,
1855                table_info.clone(),
1856                modified_region_routes,
1857                region_wal_options.clone(),
1858            )
1859            .await
1860            .is_err()
1861        );
1862
1863        let (remote_table_info, remote_table_route) = table_metadata_manager
1864            .get_full_table_info(10)
1865            .await
1866            .unwrap();
1867
1868        assert_eq!(
1869            remote_table_info.unwrap().into_inner().table_info,
1870            table_info
1871        );
1872        assert_eq!(
1873            remote_table_route
1874                .unwrap()
1875                .into_inner()
1876                .region_routes()
1877                .unwrap(),
1878            region_routes
1879        );
1880
1881        for i in 0..2 {
1882            let region_number = i as u32;
1883            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1884            let topic = format!("greptimedb_topic{}", i);
1885            let regions = table_metadata_manager
1886                .topic_region_manager
1887                .regions(&topic)
1888                .await
1889                .unwrap()
1890                .into_keys()
1891                .collect::<Vec<_>>();
1892            assert_eq!(regions.len(), 8);
1893            assert!(regions.contains(&region_id));
1894        }
1895    }
1896
1897    #[tokio::test]
1898    async fn test_get_full_table_info_remaps_route_address() {
1899        let mem_kv = Arc::new(MemoryKvBackend::default());
1900        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1901
1902        let mut region_route = new_test_region_route();
1903        region_route.follower_peers = vec![Peer::empty(3)];
1904        let region_routes = vec![region_route];
1905        let table_info = new_test_table_info();
1906        let table_id = table_info.ident.table_id;
1907
1908        create_physical_table_metadata(
1909            &table_metadata_manager,
1910            table_info,
1911            region_routes,
1912            HashMap::new(),
1913        )
1914        .await
1915        .unwrap();
1916
1917        mem_kv
1918            .put(PutRequest {
1919                key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1920                value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1921                    .try_as_raw_value()
1922                    .unwrap(),
1923                ..Default::default()
1924            })
1925            .await
1926            .unwrap();
1927        mem_kv
1928            .put(PutRequest {
1929                key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1930                value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1931                    .try_as_raw_value()
1932                    .unwrap(),
1933                ..Default::default()
1934            })
1935            .await
1936            .unwrap();
1937
1938        let (_, table_route) = table_metadata_manager
1939            .get_full_table_info(table_id)
1940            .await
1941            .unwrap();
1942        let table_route = table_route.unwrap().into_inner();
1943        let region_routes = table_route.region_routes().unwrap();
1944
1945        assert_eq!(
1946            region_routes[0].leader_peer.as_ref().unwrap().addr,
1947            "new-a2"
1948        );
1949        assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1950    }
1951
1952    #[tokio::test]
1953    async fn test_get_full_table_info_with_read_only_kv_backend() {
1954        let mem_kv = Arc::new(MemoryKvBackend::default());
1955        let writable_manager = TableMetadataManager::new(mem_kv.clone());
1956
1957        let region_routes = vec![new_test_region_route()];
1958        let table_info = new_test_table_info();
1959        let table_id = table_info.ident.table_id;
1960
1961        create_physical_table_metadata(
1962            &writable_manager,
1963            table_info.clone(),
1964            region_routes.clone(),
1965            HashMap::new(),
1966        )
1967        .await
1968        .unwrap();
1969
1970        let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
1971        let read_only_manager = TableMetadataManager::new(read_only_kv);
1972
1973        let (remote_table_info, remote_table_route) = read_only_manager
1974            .get_full_table_info(table_id)
1975            .await
1976            .unwrap();
1977
1978        assert_eq!(
1979            remote_table_info.unwrap().into_inner().table_info,
1980            table_info
1981        );
1982        assert_eq!(
1983            remote_table_route
1984                .unwrap()
1985                .into_inner()
1986                .region_routes()
1987                .unwrap(),
1988            &region_routes
1989        );
1990    }
1991
1992    #[tokio::test]
1993    async fn test_create_logic_tables_metadata() {
1994        let mem_kv = Arc::new(MemoryKvBackend::default());
1995        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1996        let region_route = new_test_region_route();
1997        let region_routes = vec![region_route.clone()];
1998        let table_info = new_test_table_info();
1999        let table_id = table_info.ident.table_id;
2000        let table_route_value = TableRouteValue::physical(region_routes.clone());
2001
2002        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
2003        // creates metadata.
2004        table_metadata_manager
2005            .create_logical_tables_metadata(tables_data.clone())
2006            .await
2007            .unwrap();
2008
2009        // if metadata was already created, it should be ok.
2010        assert!(
2011            table_metadata_manager
2012                .create_logical_tables_metadata(tables_data)
2013                .await
2014                .is_ok()
2015        );
2016
2017        let mut modified_region_routes = region_routes.clone();
2018        modified_region_routes.push(new_region_route(2, 3));
2019        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
2020        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
2021        // if remote metadata was exists, it should return an error.
2022        assert!(
2023            table_metadata_manager
2024                .create_logical_tables_metadata(modified_tables_data)
2025                .await
2026                .is_err()
2027        );
2028
2029        let (remote_table_info, remote_table_route) = table_metadata_manager
2030            .get_full_table_info(table_id)
2031            .await
2032            .unwrap();
2033
2034        assert_eq!(
2035            remote_table_info.unwrap().into_inner().table_info,
2036            table_info
2037        );
2038        assert_eq!(
2039            remote_table_route
2040                .unwrap()
2041                .into_inner()
2042                .region_routes()
2043                .unwrap(),
2044            &region_routes
2045        );
2046    }
2047
2048    #[tokio::test]
2049    async fn test_create_many_logical_tables_metadata() {
2050        let kv_backend = Arc::new(MemoryKvBackend::default());
2051        let table_metadata_manager = TableMetadataManager::new(kv_backend);
2052
2053        let mut tables_data = vec![];
2054        for i in 0..128 {
2055            let table_id = i + 1;
2056            let regin_number = table_id * 3;
2057            let region_id = RegionId::new(table_id, regin_number);
2058            let region_route = new_region_route(region_id.as_u64(), 2);
2059            let region_routes = vec![region_route.clone()];
2060            let table_info = test_utils::new_test_table_info_with_name(
2061                table_id,
2062                &format!("my_table_{}", table_id),
2063            );
2064            let table_route_value = TableRouteValue::physical(region_routes.clone());
2065
2066            tables_data.push((table_info, table_route_value));
2067        }
2068
2069        // creates metadata.
2070        table_metadata_manager
2071            .create_logical_tables_metadata(tables_data)
2072            .await
2073            .unwrap();
2074    }
2075
2076    #[tokio::test]
2077    async fn test_delete_table_metadata() {
2078        let mem_kv = Arc::new(MemoryKvBackend::default());
2079        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2080        let region_route = new_test_region_route();
2081        let region_routes = &vec![region_route.clone()];
2082        let table_info = new_test_table_info();
2083        let table_id = table_info.ident.table_id;
2084        let datanode_id = 2;
2085        let region_wal_options = create_mock_region_wal_options();
2086        let serialized_region_wal_options = region_wal_options
2087            .iter()
2088            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2089            .collect::<HashMap<_, _>>();
2090
2091        // creates metadata.
2092        create_physical_table_metadata(
2093            &table_metadata_manager,
2094            table_info.clone(),
2095            region_routes.clone(),
2096            serialized_region_wal_options,
2097        )
2098        .await
2099        .unwrap();
2100
2101        let table_name = TableName::new(
2102            table_info.catalog_name,
2103            table_info.schema_name,
2104            table_info.name,
2105        );
2106        let table_route_value = &TableRouteValue::physical(region_routes.clone());
2107        // deletes metadata.
2108        table_metadata_manager
2109            .delete_table_metadata(
2110                table_id,
2111                &table_name,
2112                table_route_value,
2113                &region_wal_options,
2114            )
2115            .await
2116            .unwrap();
2117        // Should be ignored.
2118        table_metadata_manager
2119            .delete_table_metadata(
2120                table_id,
2121                &table_name,
2122                table_route_value,
2123                &region_wal_options,
2124            )
2125            .await
2126            .unwrap();
2127        assert!(
2128            table_metadata_manager
2129                .table_info_manager()
2130                .get(table_id)
2131                .await
2132                .unwrap()
2133                .is_none()
2134        );
2135        assert!(
2136            table_metadata_manager
2137                .table_route_manager()
2138                .table_route_storage()
2139                .get(table_id)
2140                .await
2141                .unwrap()
2142                .is_none()
2143        );
2144        assert!(
2145            table_metadata_manager
2146                .datanode_table_manager()
2147                .tables(datanode_id)
2148                .try_collect::<Vec<_>>()
2149                .await
2150                .unwrap()
2151                .is_empty()
2152        );
2153        // Checks removed values
2154        let table_info = table_metadata_manager
2155            .table_info_manager()
2156            .get(table_id)
2157            .await
2158            .unwrap();
2159        assert!(table_info.is_none());
2160        let table_route = table_metadata_manager
2161            .table_route_manager()
2162            .table_route_storage()
2163            .get(table_id)
2164            .await
2165            .unwrap();
2166        assert!(table_route.is_none());
2167        // Logical delete removes the topic region mapping as well.
2168        let regions = table_metadata_manager
2169            .topic_region_manager
2170            .regions("greptimedb_topic0")
2171            .await
2172            .unwrap();
2173        assert_eq!(regions.len(), 0);
2174        let regions = table_metadata_manager
2175            .topic_region_manager
2176            .regions("greptimedb_topic1")
2177            .await
2178            .unwrap();
2179        assert_eq!(regions.len(), 0);
2180    }
2181
2182    #[tokio::test]
2183    async fn test_rename_table() {
2184        let mem_kv = Arc::new(MemoryKvBackend::default());
2185        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2186        let region_route = new_test_region_route();
2187        let region_routes = vec![region_route.clone()];
2188        let table_info = new_test_table_info();
2189        let table_id = table_info.ident.table_id;
2190        // creates metadata.
2191        create_physical_table_metadata(
2192            &table_metadata_manager,
2193            table_info.clone(),
2194            region_routes.clone(),
2195            HashMap::new(),
2196        )
2197        .await
2198        .unwrap();
2199
2200        let new_table_name = "another_name".to_string();
2201        let table_info_value =
2202            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2203
2204        table_metadata_manager
2205            .rename_table(&table_info_value, new_table_name.clone())
2206            .await
2207            .unwrap();
2208        // if remote metadata was updated, it should be ok.
2209        table_metadata_manager
2210            .rename_table(&table_info_value, new_table_name.clone())
2211            .await
2212            .unwrap();
2213        let mut modified_table_info = table_info.clone();
2214        modified_table_info.name = "hi".to_string();
2215        let modified_table_info_value =
2216            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
2217        // if the table_info_value is wrong, it should return an error.
2218        // The ABA problem.
2219        assert!(
2220            table_metadata_manager
2221                .rename_table(&modified_table_info_value, new_table_name.clone())
2222                .await
2223                .is_err()
2224        );
2225
2226        let old_table_name = TableNameKey::new(
2227            &table_info.catalog_name,
2228            &table_info.schema_name,
2229            &table_info.name,
2230        );
2231        let new_table_name = TableNameKey::new(
2232            &table_info.catalog_name,
2233            &table_info.schema_name,
2234            &new_table_name,
2235        );
2236
2237        assert!(
2238            table_metadata_manager
2239                .table_name_manager()
2240                .get(old_table_name)
2241                .await
2242                .unwrap()
2243                .is_none()
2244        );
2245
2246        assert_eq!(
2247            table_metadata_manager
2248                .table_name_manager()
2249                .get(new_table_name)
2250                .await
2251                .unwrap()
2252                .unwrap()
2253                .table_id(),
2254            table_id
2255        );
2256    }
2257
2258    #[tokio::test]
2259    async fn test_update_table_info() {
2260        let mem_kv = Arc::new(MemoryKvBackend::default());
2261        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2262        let region_route = new_test_region_route();
2263        let region_routes = vec![region_route.clone()];
2264        let table_info = new_test_table_info();
2265        let table_id = table_info.ident.table_id;
2266        // creates metadata.
2267        create_physical_table_metadata(
2268            &table_metadata_manager,
2269            table_info.clone(),
2270            region_routes.clone(),
2271            HashMap::new(),
2272        )
2273        .await
2274        .unwrap();
2275
2276        let mut new_table_info = table_info.clone();
2277        new_table_info.name = "hi".to_string();
2278        let current_table_info_value =
2279            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2280        // should be ok.
2281        table_metadata_manager
2282            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2283            .await
2284            .unwrap();
2285        // if table info was updated, it should be ok.
2286        table_metadata_manager
2287            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2288            .await
2289            .unwrap();
2290
2291        // updated table_info should equal the `new_table_info`
2292        let updated_table_info = table_metadata_manager
2293            .table_info_manager()
2294            .get(table_id)
2295            .await
2296            .unwrap()
2297            .unwrap()
2298            .into_inner();
2299        assert_eq!(updated_table_info.table_info, new_table_info);
2300
2301        let mut wrong_table_info = table_info.clone();
2302        wrong_table_info.name = "wrong".to_string();
2303        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2304            current_table_info_value.update(wrong_table_info),
2305        );
2306        // if the current_table_info_value is wrong, it should return an error.
2307        // The ABA problem.
2308        assert!(
2309            table_metadata_manager
2310                .update_table_info(&wrong_table_info_value, None, new_table_info)
2311                .await
2312                .is_err()
2313        )
2314    }
2315
2316    #[tokio::test]
2317    async fn test_update_table_leader_region_status() {
2318        let mem_kv = Arc::new(MemoryKvBackend::default());
2319        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2320        let datanode = 1;
2321        let region_routes = vec![
2322            RegionRoute {
2323                region: Region {
2324                    id: 1.into(),
2325                    name: "r1".to_string(),
2326                    attrs: BTreeMap::new(),
2327                    partition_expr: Default::default(),
2328                },
2329                leader_peer: Some(Peer::new(datanode, "a2")),
2330                leader_state: Some(LeaderState::Downgrading),
2331                follower_peers: vec![],
2332                leader_down_since: Some(current_time_millis()),
2333                write_route_policy: None,
2334            },
2335            RegionRoute {
2336                region: Region {
2337                    id: 2.into(),
2338                    name: "r2".to_string(),
2339                    attrs: BTreeMap::new(),
2340                    partition_expr: Default::default(),
2341                },
2342                leader_peer: Some(Peer::new(datanode, "a1")),
2343                leader_state: None,
2344                follower_peers: vec![],
2345                leader_down_since: None,
2346                write_route_policy: None,
2347            },
2348        ];
2349        let table_info = new_test_table_info();
2350        let table_id = table_info.ident.table_id;
2351        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2352            TableRouteValue::physical(region_routes.clone()),
2353        );
2354
2355        // creates metadata.
2356        create_physical_table_metadata(
2357            &table_metadata_manager,
2358            table_info.clone(),
2359            region_routes.clone(),
2360            HashMap::new(),
2361        )
2362        .await
2363        .unwrap();
2364
2365        table_metadata_manager
2366            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2367                if region_route.leader_state.is_some() {
2368                    None
2369                } else {
2370                    Some(Some(LeaderState::Downgrading))
2371                }
2372            })
2373            .await
2374            .unwrap();
2375
2376        let updated_route_value = table_metadata_manager
2377            .table_route_manager()
2378            .table_route_storage()
2379            .get(table_id)
2380            .await
2381            .unwrap()
2382            .unwrap();
2383
2384        assert_eq!(
2385            updated_route_value.region_routes().unwrap()[0].leader_state,
2386            Some(LeaderState::Downgrading)
2387        );
2388
2389        assert!(
2390            updated_route_value.region_routes().unwrap()[0]
2391                .leader_down_since
2392                .is_some()
2393        );
2394
2395        assert_eq!(
2396            updated_route_value.region_routes().unwrap()[1].leader_state,
2397            Some(LeaderState::Downgrading)
2398        );
2399        assert!(
2400            updated_route_value.region_routes().unwrap()[1]
2401                .leader_down_since
2402                .is_some()
2403        );
2404    }
2405
2406    async fn assert_datanode_table(
2407        table_metadata_manager: &TableMetadataManager,
2408        table_id: u32,
2409        region_routes: &[RegionRoute],
2410    ) {
2411        let region_distribution = region_distribution(region_routes);
2412        for (datanode, regions) in region_distribution {
2413            let got = table_metadata_manager
2414                .datanode_table_manager()
2415                .get(&DatanodeTableKey::new(datanode, table_id))
2416                .await
2417                .unwrap()
2418                .unwrap();
2419
2420            assert_eq!(got.regions, regions.leader_regions);
2421            assert_eq!(got.follower_regions, regions.follower_regions);
2422        }
2423    }
2424
2425    #[tokio::test]
2426    async fn test_update_table_route() {
2427        let mem_kv = Arc::new(MemoryKvBackend::default());
2428        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2429        let region_route = new_test_region_route();
2430        let region_routes = vec![region_route.clone()];
2431        let table_info = new_test_table_info();
2432        let table_id = table_info.ident.table_id;
2433        let engine = table_info.meta.engine.as_str();
2434        let region_storage_path =
2435            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2436        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2437            TableRouteValue::physical(region_routes.clone()),
2438        );
2439
2440        // creates metadata.
2441        create_physical_table_metadata(
2442            &table_metadata_manager,
2443            table_info.clone(),
2444            region_routes.clone(),
2445            HashMap::new(),
2446        )
2447        .await
2448        .unwrap();
2449
2450        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2451        let new_region_routes = vec![
2452            new_region_route(1, 1),
2453            new_region_route(2, 2),
2454            new_region_route(3, 3),
2455        ];
2456        // it should be ok.
2457        table_metadata_manager
2458            .update_table_route(
2459                table_id,
2460                RegionInfo {
2461                    engine: engine.to_string(),
2462                    region_storage_path: region_storage_path.clone(),
2463                    region_options: HashMap::new(),
2464                    region_wal_options: HashMap::new(),
2465                },
2466                &current_table_route_value,
2467                new_region_routes.clone(),
2468                &HashMap::new(),
2469                &HashMap::new(),
2470            )
2471            .await
2472            .unwrap();
2473        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2474
2475        // if the table route was updated. it should be ok.
2476        table_metadata_manager
2477            .update_table_route(
2478                table_id,
2479                RegionInfo {
2480                    engine: engine.to_string(),
2481                    region_storage_path: region_storage_path.clone(),
2482                    region_options: HashMap::new(),
2483                    region_wal_options: HashMap::new(),
2484                },
2485                &current_table_route_value,
2486                new_region_routes.clone(),
2487                &HashMap::new(),
2488                &HashMap::new(),
2489            )
2490            .await
2491            .unwrap();
2492
2493        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2494            current_table_route_value
2495                .inner
2496                .update(new_region_routes.clone())
2497                .unwrap(),
2498        );
2499        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2500        // it should be ok.
2501        table_metadata_manager
2502            .update_table_route(
2503                table_id,
2504                RegionInfo {
2505                    engine: engine.to_string(),
2506                    region_storage_path: region_storage_path.clone(),
2507                    region_options: HashMap::new(),
2508                    region_wal_options: HashMap::new(),
2509                },
2510                &current_table_route_value,
2511                new_region_routes.clone(),
2512                &HashMap::new(),
2513                &HashMap::new(),
2514            )
2515            .await
2516            .unwrap();
2517        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2518
2519        // if the current_table_route_value is wrong, it should return an error.
2520        // The ABA problem.
2521        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2522            current_table_route_value
2523                .update(vec![
2524                    new_region_route(1, 1),
2525                    new_region_route(2, 2),
2526                    new_region_route(3, 3),
2527                    new_region_route(4, 4),
2528                ])
2529                .unwrap(),
2530        );
2531        assert!(
2532            table_metadata_manager
2533                .update_table_route(
2534                    table_id,
2535                    RegionInfo {
2536                        engine: engine.to_string(),
2537                        region_storage_path: region_storage_path.clone(),
2538                        region_options: HashMap::new(),
2539                        region_wal_options: HashMap::new(),
2540                    },
2541                    &wrong_table_route_value,
2542                    new_region_routes,
2543                    &HashMap::new(),
2544                    &HashMap::new(),
2545                )
2546                .await
2547                .is_err()
2548        );
2549    }
2550
2551    #[tokio::test]
2552    async fn test_update_table_route_with_topic_region_mapping() {
2553        let mem_kv = Arc::new(MemoryKvBackend::default());
2554        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2555        let region_route = new_test_region_route();
2556        let region_routes = vec![region_route.clone()];
2557        let table_info = new_test_table_info();
2558        let table_id = table_info.ident.table_id;
2559        let engine = table_info.meta.engine.as_str();
2560        let region_storage_path =
2561            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2562
2563        // Create initial metadata with Kafka WAL options
2564        let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2565            (
2566                1,
2567                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2568                    topic: "topic_1".to_string(),
2569                }))
2570                .unwrap(),
2571            ),
2572            (
2573                2,
2574                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2575                    topic: "topic_2".to_string(),
2576                }))
2577                .unwrap(),
2578            ),
2579        ]
2580        .into_iter()
2581        .collect();
2582
2583        create_physical_table_metadata(
2584            &table_metadata_manager,
2585            table_info.clone(),
2586            region_routes.clone(),
2587            old_region_wal_options.clone(),
2588        )
2589        .await
2590        .unwrap();
2591
2592        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2593            TableRouteValue::physical(region_routes.clone()),
2594        );
2595
2596        // Verify initial topic region mappings exist
2597        let region_id_1 = RegionId::new(table_id, 1);
2598        let region_id_2 = RegionId::new(table_id, 2);
2599        let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2600        let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2601        assert!(
2602            table_metadata_manager
2603                .topic_region_manager
2604                .get(topic_1_key.clone())
2605                .await
2606                .unwrap()
2607                .is_some()
2608        );
2609        assert!(
2610            table_metadata_manager
2611                .topic_region_manager
2612                .get(topic_2_key.clone())
2613                .await
2614                .unwrap()
2615                .is_some()
2616        );
2617
2618        // Test 1: Add new region with new topic
2619        let new_region_routes = vec![
2620            new_region_route(1, 1),
2621            new_region_route(2, 2),
2622            new_region_route(3, 3), // New region
2623        ];
2624        let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2625            (
2626                1,
2627                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2628                    topic: "topic_1".to_string(), // Unchanged
2629                }))
2630                .unwrap(),
2631            ),
2632            (
2633                2,
2634                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2635                    topic: "topic_2".to_string(), // Unchanged
2636                }))
2637                .unwrap(),
2638            ),
2639            (
2640                3,
2641                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2642                    topic: "topic_3".to_string(), // New topic
2643                }))
2644                .unwrap(),
2645            ),
2646        ]
2647        .into_iter()
2648        .collect();
2649        let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2650            current_table_route_value
2651                .inner
2652                .update(new_region_routes.clone())
2653                .unwrap(),
2654        );
2655        table_metadata_manager
2656            .update_table_route(
2657                table_id,
2658                RegionInfo {
2659                    engine: engine.to_string(),
2660                    region_storage_path: region_storage_path.clone(),
2661                    region_options: HashMap::new(),
2662                    region_wal_options: old_region_wal_options.clone(),
2663                },
2664                &current_table_route_value,
2665                new_region_routes.clone(),
2666                &HashMap::new(),
2667                &new_region_wal_options,
2668            )
2669            .await
2670            .unwrap();
2671        // Verify new topic region mapping was created
2672        let region_id_3 = RegionId::new(table_id, 3);
2673        let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2674        assert!(
2675            table_metadata_manager
2676                .topic_region_manager
2677                .get(topic_3_key)
2678                .await
2679                .unwrap()
2680                .is_some()
2681        );
2682        // Test 2: Remove a region and change topic for another
2683        let newer_region_routes = vec![
2684            new_region_route(1, 1),
2685            // Region 2 removed
2686            // Region 3 now has different topic
2687        ];
2688        let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2689            (
2690                1,
2691                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2692                    topic: "topic_1".to_string(), // Unchanged
2693                }))
2694                .unwrap(),
2695            ),
2696            (
2697                3,
2698                serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2699                    topic: "topic_3_new".to_string(), // Changed topic
2700                }))
2701                .unwrap(),
2702            ),
2703        ]
2704        .into_iter()
2705        .collect();
2706        table_metadata_manager
2707            .update_table_route(
2708                table_id,
2709                RegionInfo {
2710                    engine: engine.to_string(),
2711                    region_storage_path: region_storage_path.clone(),
2712                    region_options: HashMap::new(),
2713                    region_wal_options: new_region_wal_options.clone(),
2714                },
2715                &current_table_route_value_updated,
2716                newer_region_routes.clone(),
2717                &HashMap::new(),
2718                &newer_region_wal_options,
2719            )
2720            .await
2721            .unwrap();
2722        // Verify region 2 mapping was deleted
2723        let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2724        assert!(
2725            table_metadata_manager
2726                .topic_region_manager
2727                .get(topic_2_key_new)
2728                .await
2729                .unwrap()
2730                .is_none()
2731        );
2732        // Verify region 3 old topic mapping was deleted
2733        let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2734        assert!(
2735            table_metadata_manager
2736                .topic_region_manager
2737                .get(topic_3_key_old)
2738                .await
2739                .unwrap()
2740                .is_none()
2741        );
2742        // Verify region 3 new topic mapping was created
2743        let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2744        assert!(
2745            table_metadata_manager
2746                .topic_region_manager
2747                .get(topic_3_key_new)
2748                .await
2749                .unwrap()
2750                .is_some()
2751        );
2752        // Verify region 1 mapping still exists (unchanged)
2753        assert!(
2754            table_metadata_manager
2755                .topic_region_manager
2756                .get(topic_1_key)
2757                .await
2758                .unwrap()
2759                .is_some()
2760        );
2761    }
2762
2763    #[tokio::test]
2764    async fn test_destroy_table_metadata() {
2765        let mem_kv = Arc::new(MemoryKvBackend::default());
2766        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2767        let table_id = 1025;
2768        let table_name = "foo";
2769        let task = test_create_table_task(table_name, table_id);
2770        let options = create_mixed_region_wal_options();
2771        let serialized_options = options
2772            .iter()
2773            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2774            .collect::<HashMap<_, _>>();
2775        table_metadata_manager
2776            .create_table_metadata(
2777                task.table_info,
2778                TableRouteValue::physical(vec![
2779                    RegionRoute {
2780                        region: Region::new_test(RegionId::new(table_id, 1)),
2781                        leader_peer: Some(Peer::empty(1)),
2782                        follower_peers: vec![Peer::empty(5)],
2783                        leader_state: None,
2784                        leader_down_since: None,
2785                        write_route_policy: None,
2786                    },
2787                    RegionRoute {
2788                        region: Region::new_test(RegionId::new(table_id, 2)),
2789                        leader_peer: Some(Peer::empty(2)),
2790                        follower_peers: vec![Peer::empty(4)],
2791                        leader_state: None,
2792                        leader_down_since: None,
2793                        write_route_policy: None,
2794                    },
2795                    RegionRoute {
2796                        region: Region::new_test(RegionId::new(table_id, 3)),
2797                        leader_peer: Some(Peer::empty(3)),
2798                        follower_peers: vec![],
2799                        leader_state: None,
2800                        leader_down_since: None,
2801                        write_route_policy: None,
2802                    },
2803                ]),
2804                serialized_options,
2805            )
2806            .await
2807            .unwrap();
2808        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2809        let table_route_value = table_metadata_manager
2810            .table_route_manager
2811            .table_route_storage()
2812            .get_with_raw_bytes(table_id)
2813            .await
2814            .unwrap()
2815            .unwrap();
2816        table_metadata_manager
2817            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2818            .await
2819            .unwrap();
2820        assert!(mem_kv.is_empty());
2821    }
2822
2823    #[tokio::test]
2824    async fn test_restore_table_metadata() {
2825        let mem_kv = Arc::new(MemoryKvBackend::default());
2826        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2827        let table_id = 1025;
2828        let table_name = "foo";
2829        let task = test_create_table_task(table_name, table_id);
2830        let options = create_mixed_region_wal_options();
2831        let serialized_options = options
2832            .iter()
2833            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2834            .collect::<HashMap<_, _>>();
2835        table_metadata_manager
2836            .create_table_metadata(
2837                task.table_info,
2838                TableRouteValue::physical(vec![
2839                    RegionRoute {
2840                        region: Region::new_test(RegionId::new(table_id, 1)),
2841                        leader_peer: Some(Peer::empty(1)),
2842                        follower_peers: vec![Peer::empty(5)],
2843                        leader_state: None,
2844                        leader_down_since: None,
2845                        write_route_policy: None,
2846                    },
2847                    RegionRoute {
2848                        region: Region::new_test(RegionId::new(table_id, 2)),
2849                        leader_peer: Some(Peer::empty(2)),
2850                        follower_peers: vec![Peer::empty(4)],
2851                        leader_state: None,
2852                        leader_down_since: None,
2853                        write_route_policy: None,
2854                    },
2855                    RegionRoute {
2856                        region: Region::new_test(RegionId::new(table_id, 3)),
2857                        leader_peer: Some(Peer::empty(3)),
2858                        follower_peers: vec![],
2859                        leader_state: None,
2860                        leader_down_since: None,
2861                        write_route_policy: None,
2862                    },
2863                ]),
2864                serialized_options,
2865            )
2866            .await
2867            .unwrap();
2868        let expected_result = mem_kv.dump();
2869        let table_route_value = table_metadata_manager
2870            .table_route_manager
2871            .table_route_storage()
2872            .get_with_raw_bytes(table_id)
2873            .await
2874            .unwrap()
2875            .unwrap();
2876        let region_routes = table_route_value.region_routes().unwrap();
2877        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2878        let table_route_value = TableRouteValue::physical(region_routes.clone());
2879        table_metadata_manager
2880            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2881            .await
2882            .unwrap();
2883        table_metadata_manager
2884            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2885            .await
2886            .unwrap();
2887        let kvs = mem_kv.dump();
2888        assert_eq!(kvs, expected_result);
2889        // Should be ignored.
2890        table_metadata_manager
2891            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2892            .await
2893            .unwrap();
2894        let kvs = mem_kv.dump();
2895        assert_eq!(kvs, expected_result);
2896    }
2897
2898    #[tokio::test]
2899    async fn test_dropped_table_metadata_enumeration_and_lookup() {
2900        let mem_kv = Arc::new(MemoryKvBackend::default());
2901        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2902        let table_id = 1025;
2903        let table_name = "foo";
2904        let task = test_create_table_task(table_name, table_id);
2905        let table_info = task.table_info.clone();
2906        let options = create_mixed_region_wal_options();
2907        let serialized_options = options
2908            .iter()
2909            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2910            .collect::<HashMap<_, _>>();
2911        table_metadata_manager
2912            .create_table_metadata(
2913                table_info.clone(),
2914                TableRouteValue::physical(vec![
2915                    RegionRoute {
2916                        region: Region::new_test(RegionId::new(table_id, 1)),
2917                        leader_peer: Some(Peer::empty(1)),
2918                        follower_peers: vec![Peer::empty(5)],
2919                        leader_state: None,
2920                        leader_down_since: None,
2921                        write_route_policy: None,
2922                    },
2923                    RegionRoute {
2924                        region: Region::new_test(RegionId::new(table_id, 2)),
2925                        leader_peer: Some(Peer::empty(2)),
2926                        follower_peers: vec![Peer::empty(4)],
2927                        leader_state: None,
2928                        leader_down_since: None,
2929                        write_route_policy: None,
2930                    },
2931                    RegionRoute {
2932                        region: Region::new_test(RegionId::new(table_id, 3)),
2933                        leader_peer: Some(Peer::empty(3)),
2934                        follower_peers: vec![],
2935                        leader_state: None,
2936                        leader_down_since: None,
2937                        write_route_policy: None,
2938                    },
2939                ]),
2940                serialized_options,
2941            )
2942            .await
2943            .unwrap();
2944        let table_route_value = table_metadata_manager
2945            .table_route_manager
2946            .table_route_storage()
2947            .get_with_raw_bytes(table_id)
2948            .await
2949            .unwrap()
2950            .unwrap();
2951        let region_routes = table_route_value.region_routes().unwrap();
2952        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2953        let table_route_value = TableRouteValue::physical(region_routes.clone());
2954
2955        table_metadata_manager
2956            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2957            .await
2958            .unwrap();
2959
2960        let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
2961        assert_eq!(dropped_tables.len(), 1);
2962        assert_eq!(dropped_tables[0].table_id, table_id);
2963        assert_eq!(dropped_tables[0].table_name, table_name);
2964
2965        let dropped_table = table_metadata_manager
2966            .get_dropped_table(&table_name)
2967            .await
2968            .unwrap()
2969            .unwrap();
2970        assert_eq!(dropped_table.table_id, table_id);
2971        assert_eq!(dropped_table.table_name, table_name);
2972        assert_eq!(dropped_table.table_info_value.table_info, table_info);
2973        assert_eq!(
2974            dropped_table.table_route_value.region_routes().unwrap(),
2975            region_routes
2976        );
2977        assert_eq!(dropped_table.region_wal_options, options);
2978
2979        let dropped_table_by_id = table_metadata_manager
2980            .get_dropped_table_by_id(table_id)
2981            .await
2982            .unwrap()
2983            .unwrap();
2984        assert_eq!(dropped_table_by_id.table_id, table_id);
2985        assert_eq!(dropped_table_by_id.table_name, table_name);
2986        assert_eq!(dropped_table_by_id.table_info_value.table_info, table_info);
2987        assert_eq!(
2988            dropped_table_by_id
2989                .table_route_value
2990                .region_routes()
2991                .unwrap(),
2992            region_routes
2993        );
2994        assert_eq!(dropped_table_by_id.region_wal_options, options);
2995    }
2996
2997    #[tokio::test]
2998    async fn test_dropped_table_lookup_survives_live_name_recreation() {
2999        let mem_kv = Arc::new(MemoryKvBackend::default());
3000        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3001        let dropped_table_id = 1025;
3002        let recreated_table_id = 1026;
3003        let table_name = "foo";
3004        let dropped_task = test_create_table_task(table_name, dropped_table_id);
3005        let dropped_table_info = dropped_task.table_info.clone();
3006        let options = create_mock_region_wal_options();
3007        let serialized_options = options
3008            .iter()
3009            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
3010            .collect::<HashMap<_, _>>();
3011        table_metadata_manager
3012            .create_table_metadata(
3013                dropped_table_info.clone(),
3014                TableRouteValue::physical(vec![
3015                    RegionRoute {
3016                        region: Region::new_test(RegionId::new(dropped_table_id, 1)),
3017                        leader_peer: Some(Peer::empty(1)),
3018                        follower_peers: vec![Peer::empty(5)],
3019                        leader_state: None,
3020                        leader_down_since: None,
3021                        write_route_policy: None,
3022                    },
3023                    RegionRoute {
3024                        region: Region::new_test(RegionId::new(dropped_table_id, 2)),
3025                        leader_peer: Some(Peer::empty(2)),
3026                        follower_peers: vec![Peer::empty(4)],
3027                        leader_state: None,
3028                        leader_down_since: None,
3029                        write_route_policy: None,
3030                    },
3031                ]),
3032                serialized_options.clone(),
3033            )
3034            .await
3035            .unwrap();
3036
3037        let dropped_table_name =
3038            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3039        let dropped_table_route = table_metadata_manager
3040            .table_route_manager
3041            .table_route_storage()
3042            .get_with_raw_bytes(dropped_table_id)
3043            .await
3044            .unwrap()
3045            .unwrap();
3046        let dropped_table_route =
3047            TableRouteValue::physical(dropped_table_route.region_routes().unwrap().clone());
3048        table_metadata_manager
3049            .delete_table_metadata(
3050                dropped_table_id,
3051                &dropped_table_name,
3052                &dropped_table_route,
3053                &options,
3054            )
3055            .await
3056            .unwrap();
3057
3058        let recreated_task = test_create_table_task(table_name, recreated_table_id);
3059        table_metadata_manager
3060            .create_table_metadata(
3061                recreated_task.table_info,
3062                TableRouteValue::physical(vec![RegionRoute {
3063                    region: Region::new_test(RegionId::new(recreated_table_id, 1)),
3064                    leader_peer: Some(Peer::empty(4)),
3065                    follower_peers: vec![],
3066                    leader_state: None,
3067                    leader_down_since: None,
3068                    write_route_policy: None,
3069                }]),
3070                serialized_options,
3071            )
3072            .await
3073            .unwrap();
3074
3075        assert_eq!(
3076            table_metadata_manager
3077                .table_name_manager()
3078                .get(TableNameKey::from(&dropped_table_name))
3079                .await
3080                .unwrap()
3081                .unwrap()
3082                .table_id(),
3083            recreated_table_id
3084        );
3085
3086        let dropped_table = table_metadata_manager
3087            .get_dropped_table(&dropped_table_name)
3088            .await
3089            .unwrap()
3090            .unwrap();
3091        assert_eq!(dropped_table.table_id, dropped_table_id);
3092        assert_eq!(dropped_table.table_name, dropped_table_name);
3093        assert_eq!(
3094            dropped_table.table_info_value.table_info,
3095            dropped_table_info
3096        );
3097
3098        let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
3099        assert_eq!(dropped_tables.len(), 1);
3100        assert_eq!(dropped_tables[0].table_id, dropped_table_id);
3101        assert_eq!(dropped_tables[0].table_name, dropped_table_name);
3102    }
3103
3104    #[tokio::test]
3105    async fn test_dropped_table_lookup_ignores_unrelated_malformed_datanode_tombstones() {
3106        let mem_kv = Arc::new(MemoryKvBackend::default());
3107        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3108        let table_id = 1025;
3109        let table_name = "foo";
3110        let task = test_create_table_task(table_name, table_id);
3111        let table_info = task.table_info.clone();
3112        let options = create_mixed_region_wal_options();
3113        let serialized_options = options
3114            .iter()
3115            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
3116            .collect::<HashMap<_, _>>();
3117        table_metadata_manager
3118            .create_table_metadata(
3119                table_info.clone(),
3120                TableRouteValue::physical(vec![
3121                    RegionRoute {
3122                        region: Region::new_test(RegionId::new(table_id, 1)),
3123                        leader_peer: Some(Peer::empty(1)),
3124                        follower_peers: vec![Peer::empty(5)],
3125                        leader_state: None,
3126                        leader_down_since: None,
3127                        write_route_policy: None,
3128                    },
3129                    RegionRoute {
3130                        region: Region::new_test(RegionId::new(table_id, 2)),
3131                        leader_peer: Some(Peer::empty(2)),
3132                        follower_peers: vec![Peer::empty(4)],
3133                        leader_state: None,
3134                        leader_down_since: None,
3135                        write_route_policy: None,
3136                    },
3137                ]),
3138                serialized_options,
3139            )
3140            .await
3141            .unwrap();
3142
3143        let table_route_value = table_metadata_manager
3144            .table_route_manager
3145            .table_route_storage()
3146            .get_with_raw_bytes(table_id)
3147            .await
3148            .unwrap()
3149            .unwrap();
3150        let region_routes = table_route_value.region_routes().unwrap();
3151        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3152        let table_route_value = TableRouteValue::physical(region_routes.clone());
3153
3154        table_metadata_manager
3155            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
3156            .await
3157            .unwrap();
3158
3159        mem_kv
3160            .put(
3161                PutRequest::new()
3162                    .with_key("__tombstone/__dn_table/not-a-datanode-table-key")
3163                    .with_value("malformed"),
3164            )
3165            .await
3166            .unwrap();
3167
3168        let dropped_table = table_metadata_manager
3169            .get_dropped_table(&table_name)
3170            .await
3171            .unwrap()
3172            .unwrap();
3173        assert_eq!(dropped_table.table_id, table_id);
3174        assert_eq!(dropped_table.table_name, table_name);
3175        assert_eq!(dropped_table.table_info_value.table_info, table_info);
3176        assert_eq!(dropped_table.region_wal_options, options);
3177    }
3178
3179    #[tokio::test]
3180    async fn test_create_update_view_info() {
3181        let mem_kv = Arc::new(MemoryKvBackend::default());
3182        let table_metadata_manager = TableMetadataManager::new(mem_kv);
3183
3184        let view_info = new_test_table_info();
3185
3186        let view_id = view_info.ident.table_id;
3187
3188        let logical_plan: Vec<u8> = vec![1, 2, 3];
3189        let columns = vec!["a".to_string()];
3190        let plan_columns = vec!["number".to_string()];
3191        let table_names = new_test_table_names();
3192        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
3193
3194        // Create metadata
3195        table_metadata_manager
3196            .create_view_metadata(
3197                view_info.clone(),
3198                logical_plan.clone(),
3199                table_names.clone(),
3200                columns.clone(),
3201                plan_columns.clone(),
3202                definition.to_string(),
3203            )
3204            .await
3205            .unwrap();
3206
3207        {
3208            // assert view info
3209            let current_view_info = table_metadata_manager
3210                .view_info_manager()
3211                .get(view_id)
3212                .await
3213                .unwrap()
3214                .unwrap()
3215                .into_inner();
3216            assert_eq!(current_view_info.view_info, logical_plan);
3217            assert_eq!(current_view_info.table_names, table_names);
3218            assert_eq!(current_view_info.definition, definition);
3219            assert_eq!(current_view_info.columns, columns);
3220            assert_eq!(current_view_info.plan_columns, plan_columns);
3221            // assert table info
3222            let current_table_info = table_metadata_manager
3223                .table_info_manager()
3224                .get(view_id)
3225                .await
3226                .unwrap()
3227                .unwrap()
3228                .into_inner();
3229            assert_eq!(current_table_info.table_info, view_info);
3230        }
3231
3232        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
3233        let new_table_names = {
3234            let mut set = HashSet::new();
3235            set.insert(TableName {
3236                catalog_name: "greptime".to_string(),
3237                schema_name: "public".to_string(),
3238                table_name: "b_table".to_string(),
3239            });
3240            set.insert(TableName {
3241                catalog_name: "greptime".to_string(),
3242                schema_name: "public".to_string(),
3243                table_name: "c_table".to_string(),
3244            });
3245            set
3246        };
3247        let new_columns = vec!["b".to_string()];
3248        let new_plan_columns = vec!["number2".to_string()];
3249        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
3250
3251        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
3252            logical_plan.clone().into(),
3253            table_names,
3254            columns,
3255            plan_columns,
3256            definition.to_string(),
3257        ));
3258        // should be ok.
3259        table_metadata_manager
3260            .update_view_info(
3261                view_id,
3262                &current_view_info_value,
3263                new_logical_plan.clone(),
3264                new_table_names.clone(),
3265                new_columns.clone(),
3266                new_plan_columns.clone(),
3267                new_definition.to_string(),
3268            )
3269            .await
3270            .unwrap();
3271        // if table info was updated, it should be ok.
3272        table_metadata_manager
3273            .update_view_info(
3274                view_id,
3275                &current_view_info_value,
3276                new_logical_plan.clone(),
3277                new_table_names.clone(),
3278                new_columns.clone(),
3279                new_plan_columns.clone(),
3280                new_definition.to_string(),
3281            )
3282            .await
3283            .unwrap();
3284
3285        // updated view_info should equal the `new_logical_plan`
3286        let updated_view_info = table_metadata_manager
3287            .view_info_manager()
3288            .get(view_id)
3289            .await
3290            .unwrap()
3291            .unwrap()
3292            .into_inner();
3293        assert_eq!(updated_view_info.view_info, new_logical_plan);
3294        assert_eq!(updated_view_info.table_names, new_table_names);
3295        assert_eq!(updated_view_info.definition, new_definition);
3296        assert_eq!(updated_view_info.columns, new_columns);
3297        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
3298
3299        let wrong_view_info = logical_plan.clone();
3300        let wrong_definition = "wrong_definition";
3301        let wrong_view_info_value =
3302            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
3303                wrong_view_info.into(),
3304                new_table_names.clone(),
3305                new_columns.clone(),
3306                new_plan_columns.clone(),
3307                wrong_definition.to_string(),
3308            ));
3309        // if the current_view_info_value is wrong, it should return an error.
3310        // The ABA problem.
3311        assert!(
3312            table_metadata_manager
3313                .update_view_info(
3314                    view_id,
3315                    &wrong_view_info_value,
3316                    new_logical_plan.clone(),
3317                    new_table_names.clone(),
3318                    vec!["c".to_string()],
3319                    vec!["number3".to_string()],
3320                    wrong_definition.to_string(),
3321                )
3322                .await
3323                .is_err()
3324        );
3325
3326        // The view_info is not changed.
3327        let current_view_info = table_metadata_manager
3328            .view_info_manager()
3329            .get(view_id)
3330            .await
3331            .unwrap()
3332            .unwrap()
3333            .into_inner();
3334        assert_eq!(current_view_info.view_info, new_logical_plan);
3335        assert_eq!(current_view_info.table_names, new_table_names);
3336        assert_eq!(current_view_info.definition, new_definition);
3337        assert_eq!(current_view_info.columns, new_columns);
3338        assert_eq!(current_view_info.plan_columns, new_plan_columns);
3339    }
3340
3341    #[test]
3342    fn test_region_role_set_deserialize() {
3343        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
3344        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3345        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3346        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
3347
3348        let s = r#"[1, 2, 3]"#;
3349        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3350        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3351        assert!(region_role_set.follower_regions.is_empty());
3352    }
3353
3354    #[test]
3355    fn test_region_distribution_deserialize() {
3356        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
3357        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
3358        assert_eq!(region_distribution.len(), 2);
3359        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
3360        assert!(region_distribution[&1].follower_regions.is_empty());
3361        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
3362        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
3363    }
3364}