Skip to main content

common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which tables
//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define "unified"
//! managers: the table metadata manager [TableMetadataManager] and the flow metadata manager
//! [FlowMetadataManager](crate::key::flow::FlowMetadataManager). Together they contain all the
//! managers defined above; it's recommended to use only these managers.
75//!
//! The whole picture of flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use futures_util::TryStreamExt;
135use lazy_static::lazy_static;
136use regex::Regex;
137pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
138use serde::de::DeserializeOwned;
139use serde::{Deserialize, Serialize};
140use snafu::{OptionExt, ResultExt, ensure};
141use store_api::storage::RegionNumber;
142use table::metadata::{TableId, TableInfo};
143use table::table_name::TableName;
144use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
145use table_name::{TableNameKey, TableNameManager, TableNameValue};
146use topic_name::TopicNameManager;
147use topic_region::{TopicRegionKey, TopicRegionManager};
148use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
149
150use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
151use self::datanode_table::RegionInfo;
152use self::flow::flow_info::FlowInfoValue;
153use self::flow::flow_name::FlowNameValue;
154use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
155use self::table_route::{TableRouteManager, TableRouteValue};
156use self::tombstone::TombstoneManager;
157use crate::DatanodeId;
158use crate::error::{self, Result, SerdeJsonSnafu};
159use crate::key::flow::flow_state::FlowStateValue;
160use crate::key::node_address::NodeAddressValue;
161use crate::key::table_repart::{TableRepartKey, TableRepartManager};
162use crate::key::table_route::TableRouteKey;
163use crate::key::topic_region::TopicRegionValue;
164use crate::key::txn_helper::TxnOpGetResponseSet;
165use crate::kv_backend::KvBackendRef;
166use crate::kv_backend::txn::{Txn, TxnOp};
167use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
168use crate::rpc::store::BatchDeleteRequest;
169use crate::state_store::PoisonValue;
170use crate::wal_provider::RegionWalOptions;
171
172pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
173pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
174pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
175pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
176pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";
177
178pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
179pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
180pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
181pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
182pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
183pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
184pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
185pub const TABLE_REPART_PREFIX: &str = "__table_repart";
186pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
187pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
188// The legacy topic key prefix is used to store the topic name in previous versions.
189pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
190pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
191
192/// The election key.
193pub const ELECTION_KEY: &str = "__metasrv_election";
194/// The root key of metasrv election candidates.
195pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
196
197/// The keys with these prefixes will be loaded into the cache when the leader starts.
198pub const CACHE_KEY_PREFIXES: [&str; 5] = [
199    TABLE_NAME_KEY_PREFIX,
200    CATALOG_NAME_KEY_PREFIX,
201    SCHEMA_NAME_KEY_PREFIX,
202    TABLE_ROUTE_PREFIX,
203    NODE_ADDRESS_PREFIX,
204];
205
206/// A set of regions with the same role.
207#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
208pub struct RegionRoleSet {
209    /// Leader regions.
210    pub leader_regions: Vec<RegionNumber>,
211    /// Follower regions.
212    pub follower_regions: Vec<RegionNumber>,
213}
214
215impl<'de> Deserialize<'de> for RegionRoleSet {
216    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
217    where
218        D: serde::Deserializer<'de>,
219    {
220        #[derive(Deserialize)]
221        #[serde(untagged)]
222        enum RegionRoleSetOrLeaderOnly {
223            Full {
224                leader_regions: Vec<RegionNumber>,
225                follower_regions: Vec<RegionNumber>,
226            },
227            LeaderOnly(Vec<RegionNumber>),
228        }
229        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
230            RegionRoleSetOrLeaderOnly::Full {
231                leader_regions,
232                follower_regions,
233            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
234            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
235                Ok(RegionRoleSet::new(leader_regions, vec![]))
236            }
237        }
238    }
239}
240
241impl RegionRoleSet {
242    /// Create a new region role set.
243    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
244        Self {
245            leader_regions,
246            follower_regions,
247        }
248    }
249
250    /// Add a leader region to the set.
251    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
252        self.leader_regions.push(region_number);
253    }
254
255    /// Add a follower region to the set.
256    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
257        self.follower_regions.push(region_number);
258    }
259
260    /// Sort the regions.
261    pub fn sort(&mut self) {
262        self.follower_regions.sort();
263        self.leader_regions.sort();
264    }
265}
266
267/// The distribution of regions.
268///
269/// The key is the datanode id, the value is the region role set.
270pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
271
272/// The id of flow.
273pub type FlowId = u32;
274/// The partition of flow.
275pub type FlowPartitionId = u32;
276
277lazy_static! {
278    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
279}
280
281lazy_static! {
282    static ref TABLE_INFO_KEY_PATTERN: Regex =
283        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287    static ref VIEW_INFO_KEY_PATTERN: Regex =
288        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
289}
290
291lazy_static! {
292    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
293        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
294}
295
296lazy_static! {
297    pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
298        Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
299}
300
301lazy_static! {
302    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
303        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
304}
305
306lazy_static! {
307    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
308        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
309    ))
310    .unwrap();
311}
312
313lazy_static! {
314    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
315    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
316        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
317    ))
318        .unwrap();
319}
320
321lazy_static! {
322    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
323    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
324        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
325    ))
326        .unwrap();
327}
328
329lazy_static! {
330    static ref NODE_ADDRESS_PATTERN: Regex =
331        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
332}
333
334lazy_static! {
335    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
336        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
337}
338
339lazy_static! {
340    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
341        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
342    ))
343    .unwrap();
344}
345
346/// The key of metadata.
347pub trait MetadataKey<'a, T> {
348    fn to_bytes(&self) -> Vec<u8>;
349
350    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
351}
352
353#[derive(Debug, Clone, PartialEq)]
354pub struct BytesAdapter(Vec<u8>);
355
356impl From<Vec<u8>> for BytesAdapter {
357    fn from(value: Vec<u8>) -> Self {
358        Self(value)
359    }
360}
361
362impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
363    fn to_bytes(&self) -> Vec<u8> {
364        self.0.clone()
365    }
366
367    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
368        Ok(BytesAdapter(bytes.to_vec()))
369    }
370}
371
372pub(crate) trait MetadataKeyGetTxnOp {
373    fn build_get_op(
374        &self,
375    ) -> (
376        TxnOp,
377        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
378    );
379}
380
381pub trait MetadataValue {
382    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
383    where
384        Self: Sized;
385
386    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
387}
388
389pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
390
391pub struct TableMetadataManager {
392    table_name_manager: TableNameManager,
393    table_info_manager: TableInfoManager,
394    view_info_manager: ViewInfoManager,
395    datanode_table_manager: DatanodeTableManager,
396    catalog_manager: CatalogManager,
397    schema_manager: SchemaManager,
398    table_route_manager: TableRouteManager,
399    table_repart_manager: TableRepartManager,
400    tombstone_manager: TombstoneManager,
401    topic_name_manager: TopicNameManager,
402    topic_region_manager: TopicRegionManager,
403    kv_backend: KvBackendRef,
404}
405
406#[macro_export]
407macro_rules! ensure_values {
408    ($got:expr, $expected_value:expr, $name:expr) => {
409        ensure!(
410            $got == $expected_value,
411            error::UnexpectedSnafu {
412                err_msg: format!(
413                    "Reads the different value: {:?} during {}, expected: {:?}",
414                    $got, $name, $expected_value
415                )
416            }
417        );
418    };
419}
420
421/// A struct containing a deserialized value(`inner`) and an original bytes.
422///
423/// - Serialize behaviors:
424///
425/// The `inner` field will be ignored.
426///
427/// - Deserialize behaviors:
428///
429/// The `inner` field will be deserialized from the `bytes` field.
430pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
431    // The original bytes of the inner.
432    bytes: Bytes,
433    // The value was deserialized from the original bytes.
434    inner: T,
435}
436
437#[derive(Debug, Clone, PartialEq, Eq)]
438pub struct DroppedTableName {
439    /// Table id stored in the tombstoned table-name mapping.
440    pub table_id: TableId,
441    /// Original fully qualified table name.
442    pub table_name: TableName,
443}
444
445#[derive(Debug, Clone)]
446pub struct DroppedTableMetadata {
447    /// Table id of the dropped table.
448    pub table_id: TableId,
449    /// Original fully qualified table name.
450    pub table_name: TableName,
451    /// Tombstoned table info value.
452    pub table_info_value: TableInfoValue,
453    /// Tombstoned table route value.
454    pub table_route_value: TableRouteValue,
455    /// Per-region WAL options recovered from tombstoned datanode metadata.
456    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
457}
458
459impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
460    type Target = T;
461
462    fn deref(&self) -> &Self::Target {
463        &self.inner
464    }
465}
466
467impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
468    fn deref_mut(&mut self) -> &mut Self::Target {
469        &mut self.inner
470    }
471}
472
473impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475        write!(
476            f,
477            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
478            self.inner, self.bytes
479        )
480    }
481}
482
483impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
484    /// - Serialize behaviors:
485    ///
486    /// The `inner` field will be ignored.
487    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
488    where
489        S: serde::Serializer,
490    {
491        // Safety: The original bytes are always JSON encoded.
492        // It's more efficiently than `serialize_bytes`.
493        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
494    }
495}
496
497impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
498    for DeserializedValueWithBytes<T>
499{
500    /// - Deserialize behaviors:
501    ///
502    /// The `inner` field will be deserialized from the `bytes` field.
503    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
504    where
505        D: serde::Deserializer<'de>,
506    {
507        let buf = String::deserialize(deserializer)?;
508        let bytes = Bytes::from(buf);
509
510        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
511            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
512
513        Ok(value)
514    }
515}
516
517impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
518    fn clone(&self) -> Self {
519        Self {
520            bytes: self.bytes.clone(),
521            inner: self.inner.clone(),
522        }
523    }
524}
525
526impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
527    /// Returns a struct containing a deserialized value and an original `bytes`.
528    /// It accepts original bytes of inner.
529    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
530        let inner = T::try_from_raw_value(&bytes)?;
531        Ok(Self { bytes, inner })
532    }
533
534    /// Returns a struct containing a deserialized value and an original `bytes`.
535    /// It accepts original bytes of inner.
536    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
537        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
538    }
539
540    pub fn into_inner(self) -> T {
541        self.inner
542    }
543
544    pub fn get_inner_ref(&self) -> &T {
545        &self.inner
546    }
547
548    /// Returns original `bytes`
549    pub fn get_raw_bytes(&self) -> Vec<u8> {
550        self.bytes.to_vec()
551    }
552
553    #[cfg(any(test, feature = "testing"))]
554    pub fn from_inner(inner: T) -> Self {
555        let bytes = serde_json::to_vec(&inner).unwrap();
556
557        Self {
558            bytes: Bytes::from(bytes),
559            inner,
560        }
561    }
562}
563
564impl TableMetadataManager {
565    pub fn new(kv_backend: KvBackendRef) -> Self {
566        TableMetadataManager {
567            table_name_manager: TableNameManager::new(kv_backend.clone()),
568            table_info_manager: TableInfoManager::new(kv_backend.clone()),
569            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
570            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
571            catalog_manager: CatalogManager::new(kv_backend.clone()),
572            schema_manager: SchemaManager::new(kv_backend.clone()),
573            table_route_manager: TableRouteManager::new(kv_backend.clone()),
574            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
575            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
576            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578            kv_backend,
579        }
580    }
581
582    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
583    pub fn new_with_custom_tombstone_prefix(
584        kv_backend: KvBackendRef,
585        tombstone_prefix: &str,
586    ) -> Self {
587        Self {
588            table_name_manager: TableNameManager::new(kv_backend.clone()),
589            table_info_manager: TableInfoManager::new(kv_backend.clone()),
590            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
591            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
592            catalog_manager: CatalogManager::new(kv_backend.clone()),
593            schema_manager: SchemaManager::new(kv_backend.clone()),
594            table_route_manager: TableRouteManager::new(kv_backend.clone()),
595            table_repart_manager: TableRepartManager::new(kv_backend.clone()),
596            tombstone_manager: TombstoneManager::new_with_prefix(
597                kv_backend.clone(),
598                tombstone_prefix,
599            ),
600            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
601            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
602            kv_backend,
603        }
604    }
605
606    pub async fn init(&self) -> Result<()> {
607        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
608
609        self.catalog_manager().create(catalog_name, true).await?;
610
611        let internal_schemas = [
612            DEFAULT_SCHEMA_NAME,
613            INFORMATION_SCHEMA_NAME,
614            DEFAULT_PRIVATE_SCHEMA_NAME,
615        ];
616
617        for schema_name in internal_schemas {
618            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
619
620            self.schema_manager().create(schema_key, None, true).await?;
621        }
622
623        Ok(())
624    }
625
626    pub fn table_name_manager(&self) -> &TableNameManager {
627        &self.table_name_manager
628    }
629
630    pub fn table_info_manager(&self) -> &TableInfoManager {
631        &self.table_info_manager
632    }
633
634    pub fn view_info_manager(&self) -> &ViewInfoManager {
635        &self.view_info_manager
636    }
637
638    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
639        &self.datanode_table_manager
640    }
641
642    pub fn catalog_manager(&self) -> &CatalogManager {
643        &self.catalog_manager
644    }
645
646    pub fn schema_manager(&self) -> &SchemaManager {
647        &self.schema_manager
648    }
649
650    pub fn table_route_manager(&self) -> &TableRouteManager {
651        &self.table_route_manager
652    }
653
654    pub fn table_repart_manager(&self) -> &TableRepartManager {
655        &self.table_repart_manager
656    }
657
658    pub fn topic_name_manager(&self) -> &TopicNameManager {
659        &self.topic_name_manager
660    }
661
662    pub fn topic_region_manager(&self) -> &TopicRegionManager {
663        &self.topic_region_manager
664    }
665
666    pub fn kv_backend(&self) -> &KvBackendRef {
667        &self.kv_backend
668    }
669
670    pub async fn get_full_table_info(
671        &self,
672        table_id: TableId,
673    ) -> Result<(
674        Option<DeserializedValueWithBytes<TableInfoValue>>,
675        Option<DeserializedValueWithBytes<TableRouteValue>>,
676    )> {
677        let table_info_key = TableInfoKey::new(table_id);
678        let table_route_key = TableRouteKey::new(table_id);
679        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
680        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
681
682        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
683        let mut res = self.kv_backend.txn(txn).await?;
684        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
685        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
686        let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
687        if let Some(table_route_value) = &mut table_route_value {
688            self.table_route_manager()
689                .table_route_storage()
690                .remap_table_route(table_route_value)
691                .await?;
692        }
693        Ok((table_info_value, table_route_value))
694    }
695
696    /// Creates metadata for view and returns an error if different metadata exists.
697    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
698    /// Parameters include:
699    /// - `view_info`: the encoded logical plan
700    /// - `table_names`: the resolved fully table names in logical plan
701    /// - `columns`: the view columns
702    /// - `plan_columns`: the original plan columns
703    /// - `definition`: The SQL to create the view
704    ///
705    pub async fn create_view_metadata(
706        &self,
707        view_info: TableInfo,
708        raw_logical_plan: Vec<u8>,
709        table_names: HashSet<TableName>,
710        columns: Vec<String>,
711        plan_columns: Vec<String>,
712        definition: String,
713    ) -> Result<()> {
714        let view_id = view_info.ident.table_id;
715
716        // Creates view name.
717        let view_name = TableNameKey::new(
718            &view_info.catalog_name,
719            &view_info.schema_name,
720            &view_info.name,
721        );
722        let create_table_name_txn = self
723            .table_name_manager()
724            .build_create_txn(&view_name, view_id)?;
725
726        // Creates table info.
727        let table_info_value = TableInfoValue::new(view_info);
728
729        let (create_table_info_txn, on_create_table_info_failure) = self
730            .table_info_manager()
731            .build_create_txn(view_id, &table_info_value)?;
732
733        // Creates view info
734        let view_info_value = ViewInfoValue::new(
735            raw_logical_plan.into(),
736            table_names,
737            columns,
738            plan_columns,
739            definition,
740        );
741        let (create_view_info_txn, on_create_view_info_failure) = self
742            .view_info_manager()
743            .build_create_txn(view_id, &view_info_value)?;
744
745        let txn = Txn::merge_all(vec![
746            create_table_name_txn,
747            create_table_info_txn,
748            create_view_info_txn,
749        ]);
750
751        let mut r = self.kv_backend.txn(txn).await?;
752
753        // Checks whether metadata was already created.
754        if !r.succeeded {
755            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
756            let remote_table_info = on_create_table_info_failure(&mut set)?
757                .context(error::UnexpectedSnafu {
758                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
759                })?
760                .into_inner();
761
762            let remote_view_info = on_create_view_info_failure(&mut set)?
763                .context(error::UnexpectedSnafu {
764                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
765                })?
766                .into_inner();
767
768            let op_name = "the creating view metadata";
769            ensure_values!(remote_table_info, table_info_value, op_name);
770            ensure_values!(remote_view_info, view_info_value, op_name);
771        }
772
773        Ok(())
774    }
775
776    /// Creates metadata for table and returns an error if different metadata exists.
777    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
778    pub async fn create_table_metadata(
779        &self,
780        table_info: TableInfo,
781        table_route_value: TableRouteValue,
782        region_wal_options: RegionWalOptions,
783    ) -> Result<()> {
784        let table_id = table_info.ident.table_id;
785        let engine = table_info.meta.engine.clone();
786
787        // Creates table name.
788        let table_name = TableNameKey::new(
789            &table_info.catalog_name,
790            &table_info.schema_name,
791            &table_info.name,
792        );
793        let create_table_name_txn = self
794            .table_name_manager()
795            .build_create_txn(&table_name, table_id)?;
796
797        let region_options = table_info.to_region_options();
798        // Creates table info.
799        let table_info_value = TableInfoValue::new(table_info);
800        let (create_table_info_txn, on_create_table_info_failure) = self
801            .table_info_manager()
802            .build_create_txn(table_id, &table_info_value)?;
803
804        let (create_table_route_txn, on_create_table_route_failure) = self
805            .table_route_manager()
806            .table_route_storage()
807            .build_create_txn(table_id, &table_route_value)?;
808
809        let create_topic_region_txn = self
810            .topic_region_manager
811            .build_create_txn(table_id, &region_wal_options)?;
812
813        let mut txn = Txn::merge_all(vec![
814            create_table_name_txn,
815            create_table_info_txn,
816            create_table_route_txn,
817            create_topic_region_txn,
818        ]);
819
820        if let TableRouteValue::Physical(x) = &table_route_value {
821            let region_storage_path = table_info_value.region_storage_path();
822            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
823                table_id,
824                &engine,
825                &region_storage_path,
826                region_options,
827                region_wal_options,
828                region_distribution(&x.region_routes),
829            )?;
830            txn = txn.merge(create_datanode_table_txn);
831        }
832
833        let mut r = self.kv_backend.txn(txn).await?;
834
835        // Checks whether metadata was already created.
836        if !r.succeeded {
837            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
838            let remote_table_info = on_create_table_info_failure(&mut set)?
839                .context(error::UnexpectedSnafu {
840                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
841                })?
842                .into_inner();
843
844            let remote_table_route = on_create_table_route_failure(&mut set)?
845                .context(error::UnexpectedSnafu {
846                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
847                })?
848                .into_inner();
849
850            let op_name = "the creating table metadata";
851            ensure_values!(remote_table_info, table_info_value, op_name);
852            ensure_values!(remote_table_route, table_route_value, op_name);
853        }
854
855        Ok(())
856    }
857
858    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
859        // The batch size is max_txn_size / 3 because the size of the `tables_data`
860        // is 3 times the size of the `tables_data`.
861        self.kv_backend.max_txn_ops() / 3
862    }
863
864    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
865    pub async fn create_logical_tables_metadata(
866        &self,
867        tables_data: Vec<(TableInfo, TableRouteValue)>,
868    ) -> Result<()> {
869        let len = tables_data.len();
870        let mut txns = Vec::with_capacity(3 * len);
871        struct OnFailure<F1, R1, F2, R2>
872        where
873            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
874            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
875        {
876            table_info_value: TableInfoValue,
877            on_create_table_info_failure: F1,
878            table_route_value: TableRouteValue,
879            on_create_table_route_failure: F2,
880        }
881        let mut on_failures = Vec::with_capacity(len);
882        for (table_info, table_route_value) in tables_data {
883            let table_id = table_info.ident.table_id;
884
885            // Creates table name.
886            let table_name = TableNameKey::new(
887                &table_info.catalog_name,
888                &table_info.schema_name,
889                &table_info.name,
890            );
891            let create_table_name_txn = self
892                .table_name_manager()
893                .build_create_txn(&table_name, table_id)?;
894            txns.push(create_table_name_txn);
895
896            // Creates table info.
897            let table_info_value = TableInfoValue::new(table_info);
898            let (create_table_info_txn, on_create_table_info_failure) =
899                self.table_info_manager()
900                    .build_create_txn(table_id, &table_info_value)?;
901            txns.push(create_table_info_txn);
902
903            let (create_table_route_txn, on_create_table_route_failure) = self
904                .table_route_manager()
905                .table_route_storage()
906                .build_create_txn(table_id, &table_route_value)?;
907            txns.push(create_table_route_txn);
908
909            on_failures.push(OnFailure {
910                table_info_value,
911                on_create_table_info_failure,
912                table_route_value,
913                on_create_table_route_failure,
914            });
915        }
916
917        let txn = Txn::merge_all(txns);
918        let mut r = self.kv_backend.txn(txn).await?;
919
920        // Checks whether metadata was already created.
921        if !r.succeeded {
922            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
923            for on_failure in on_failures {
924                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
925                    .context(error::UnexpectedSnafu {
926                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
927                    })?
928                    .into_inner();
929
930                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
931                    .context(error::UnexpectedSnafu {
932                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
933                    })?
934                    .into_inner();
935
936                let op_name = "the creating logical tables metadata";
937                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
938                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
939            }
940        }
941
942        Ok(())
943    }
944
945    fn table_metadata_keys(
946        &self,
947        table_id: TableId,
948        table_name: &TableName,
949        table_route_value: &TableRouteValue,
950        region_wal_options: &HashMap<RegionNumber, WalOptions>,
951    ) -> Result<Vec<Vec<u8>>> {
952        // Builds keys
953        let datanode_ids = if table_route_value.is_physical() {
954            region_distribution(table_route_value.region_routes()?)
955                .into_keys()
956                .collect()
957        } else {
958            vec![]
959        };
960        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
961        let table_name = TableNameKey::new(
962            &table_name.catalog_name,
963            &table_name.schema_name,
964            &table_name.table_name,
965        );
966        let table_info_key = TableInfoKey::new(table_id);
967        let table_route_key = TableRouteKey::new(table_id);
968        let table_repart_key = TableRepartKey::new(table_id);
969        let datanode_table_keys = datanode_ids
970            .into_iter()
971            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
972            .collect::<HashSet<_>>();
973        let topic_region_map = self
974            .topic_region_manager
975            .get_topic_region_mapping(table_id, region_wal_options);
976        let topic_region_keys = topic_region_map
977            .iter()
978            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
979            .collect::<Vec<_>>();
980        keys.push(table_name.to_bytes());
981        keys.push(table_info_key.to_bytes());
982        keys.push(table_route_key.to_bytes());
983        keys.push(table_repart_key.to_bytes());
984        for key in &datanode_table_keys {
985            keys.push(key.to_bytes());
986        }
987        for key in topic_region_keys {
988            keys.push(key.to_bytes());
989        }
990        Ok(keys)
991    }
992
993    /// Deletes metadata for table **logically**.
994    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
995    pub async fn delete_table_metadata(
996        &self,
997        table_id: TableId,
998        table_name: &TableName,
999        table_route_value: &TableRouteValue,
1000        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1001    ) -> Result<()> {
1002        let keys =
1003            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1004        self.tombstone_manager.create(keys).await.map(|_| ())
1005    }
1006
1007    /// Lists dropped tables from tombstoned table-name entries.
1008    pub async fn list_dropped_tables(&self) -> Result<Vec<DroppedTableName>> {
1009        let mut stream = self.tombstone_manager.tombstoned_table_names();
1010        let mut dropped_tables = Vec::new();
1011
1012        while let Some(kv) = stream.try_next().await? {
1013            let raw_key = self.tombstone_manager.strip_tombstone_prefix(&kv.key)?;
1014            let table_name = TableNameKey::from_bytes(raw_key)?.into();
1015            let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1016            dropped_tables.push(DroppedTableName {
1017                table_id,
1018                table_name,
1019            });
1020        }
1021
1022        Ok(dropped_tables)
1023    }
1024
1025    /// Gets dropped table metadata by its original full table name.
1026    pub async fn get_dropped_table(
1027        &self,
1028        table_name: &TableName,
1029    ) -> Result<Option<DroppedTableMetadata>> {
1030        let table_name_key = TableNameKey::from(table_name);
1031        let Some(kv) = self
1032            .tombstone_manager
1033            .get(&table_name_key.to_bytes())
1034            .await?
1035        else {
1036            return Ok(None);
1037        };
1038
1039        let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1040        self.get_dropped_table_metadata(table_id, table_name.clone())
1041            .await
1042    }
1043
1044    /// Gets dropped table metadata by table id.
1045    pub async fn get_dropped_table_by_id(
1046        &self,
1047        table_id: TableId,
1048    ) -> Result<Option<DroppedTableMetadata>> {
1049        self.get_dropped_table_metadata(table_id, None).await
1050    }
1051
1052    /// Deletes metadata tombstone for table **permanently**.
1053    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1054    pub async fn delete_table_metadata_tombstone(
1055        &self,
1056        table_id: TableId,
1057        table_name: &TableName,
1058        table_route_value: &TableRouteValue,
1059        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1060    ) -> Result<()> {
1061        let table_metadata_keys =
1062            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1063        self.tombstone_manager
1064            .delete(table_metadata_keys)
1065            .await
1066            .map(|_| ())
1067    }
1068
1069    /// Restores metadata for table.
1070    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1071    pub async fn restore_table_metadata(
1072        &self,
1073        table_id: TableId,
1074        table_name: &TableName,
1075        table_route_value: &TableRouteValue,
1076        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1077    ) -> Result<()> {
1078        let keys =
1079            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1080        self.tombstone_manager.restore(keys).await.map(|_| ())
1081    }
1082
1083    /// Deletes metadata for table **permanently**.
1084    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
1085    pub async fn destroy_table_metadata(
1086        &self,
1087        table_id: TableId,
1088        table_name: &TableName,
1089        table_route_value: &TableRouteValue,
1090        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1091    ) -> Result<()> {
1092        let keys =
1093            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1094        let _ = self
1095            .kv_backend
1096            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1097            .await?;
1098        Ok(())
1099    }
1100
1101    /// Rebuilds dropped table metadata from tombstoned keys.
1102    async fn get_dropped_table_metadata<T>(
1103        &self,
1104        table_id: TableId,
1105        table_name: T,
1106    ) -> Result<Option<DroppedTableMetadata>>
1107    where
1108        T: Into<Option<TableName>>,
1109    {
1110        let table_info_key = TableInfoKey::new(table_id);
1111        let Some(table_info_kv) = self
1112            .tombstone_manager
1113            .get(&table_info_key.to_bytes())
1114            .await?
1115        else {
1116            return Ok(None);
1117        };
1118
1119        let table_info_value = TableInfoValue::try_from_raw_value(&table_info_kv.value)?;
1120        let table_name = table_name
1121            .into()
1122            .unwrap_or_else(|| table_info_value.table_name());
1123
1124        let table_route_key = TableRouteKey::new(table_id);
1125        let table_route_kv = self
1126            .tombstone_manager
1127            .get(&table_route_key.to_bytes())
1128            .await?
1129            .with_context(|| error::UnexpectedSnafu {
1130                err_msg: format!("Missing tombstoned table route metadata for table id {table_id}"),
1131            })?;
1132        let mut table_route_value = TableRouteValue::try_from_raw_value(&table_route_kv.value)?;
1133        self.table_route_manager
1134            .table_route_storage()
1135            .remap_table_route(&mut table_route_value)
1136            .await?;
1137
1138        let region_wal_options = self
1139            .dropped_region_wal_options(table_id, &table_route_value)
1140            .await?;
1141
1142        Ok(Some(DroppedTableMetadata {
1143            table_id,
1144            table_name,
1145            table_info_value,
1146            table_route_value,
1147            region_wal_options,
1148        }))
1149    }
1150
1151    /// Rebuilds region WAL options from tombstoned datanode-table entries.
1152    async fn dropped_region_wal_options(
1153        &self,
1154        table_id: TableId,
1155        table_route_value: &TableRouteValue,
1156    ) -> Result<HashMap<RegionNumber, WalOptions>> {
1157        let mut region_wal_options = HashMap::new();
1158        let datanode_table_keys = region_distribution(table_route_value.region_routes()?)
1159            .into_keys()
1160            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
1161            .collect::<Vec<_>>();
1162        let datanode_table_key_bytes = datanode_table_keys
1163            .iter()
1164            .map(|key| key.to_bytes())
1165            .collect::<Vec<_>>();
1166        let datanode_table_values = self
1167            .tombstone_manager
1168            .batch_get(&datanode_table_key_bytes)
1169            .await?;
1170
1171        for datanode_table_key in datanode_table_keys {
1172            let Some(kv) = datanode_table_values.get(&datanode_table_key.to_bytes()) else {
1173                continue;
1174            };
1175
1176            let datanode_table_value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
1177            for (region_number, wal_options) in &datanode_table_value.region_info.region_wal_options
1178            {
1179                region_wal_options.insert(*region_number, wal_options.clone());
1180            }
1181        }
1182
1183        Ok(region_wal_options)
1184    }
1185
1186    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1187        let mut keys = Vec::with_capacity(3);
1188        let view_name = TableNameKey::new(
1189            &view_name.catalog_name,
1190            &view_name.schema_name,
1191            &view_name.table_name,
1192        );
1193        let table_info_key = TableInfoKey::new(view_id);
1194        let view_info_key = ViewInfoKey::new(view_id);
1195        keys.push(view_name.to_bytes());
1196        keys.push(table_info_key.to_bytes());
1197        keys.push(view_info_key.to_bytes());
1198
1199        Ok(keys)
1200    }
1201
1202    /// Deletes metadata for view **permanently**.
1203    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1204    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1205        let keys = self.view_info_keys(view_id, view_name)?;
1206        let _ = self
1207            .kv_backend
1208            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1209            .await?;
1210        Ok(())
1211    }
1212
1213    /// Renames the table name and returns an error if different metadata exists.
1214    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1215    /// and the new `TableNameKey` MUST be empty.
1216    pub async fn rename_table(
1217        &self,
1218        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1219        new_table_name: String,
1220    ) -> Result<()> {
1221        let current_table_info = &current_table_info_value.table_info;
1222        let table_id = current_table_info.ident.table_id;
1223
1224        let table_name_key = TableNameKey::new(
1225            &current_table_info.catalog_name,
1226            &current_table_info.schema_name,
1227            &current_table_info.name,
1228        );
1229
1230        let new_table_name_key = TableNameKey::new(
1231            &current_table_info.catalog_name,
1232            &current_table_info.schema_name,
1233            &new_table_name,
1234        );
1235
1236        // Updates table name.
1237        let update_table_name_txn = self.table_name_manager().build_update_txn(
1238            &table_name_key,
1239            &new_table_name_key,
1240            table_id,
1241        )?;
1242
1243        let new_table_info_value = current_table_info_value
1244            .inner
1245            .with_update(move |table_info| {
1246                table_info.name = new_table_name;
1247            });
1248
1249        // Updates table info.
1250        let (update_table_info_txn, on_update_table_info_failure) = self
1251            .table_info_manager()
1252            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1253
1254        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1255
1256        let mut r = self.kv_backend.txn(txn).await?;
1257
1258        // Checks whether metadata was already updated.
1259        if !r.succeeded {
1260            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1261            let remote_table_info = on_update_table_info_failure(&mut set)?
1262                .context(error::UnexpectedSnafu {
1263                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1264                })?
1265                .into_inner();
1266
1267            let op_name = "the renaming table metadata";
1268            ensure_values!(remote_table_info, new_table_info_value, op_name);
1269        }
1270
1271        Ok(())
1272    }
1273
1274    /// Updates table info and returns an error if different metadata exists.
1275    /// And cascade-ly update all redundant table options for each region
1276    /// if region_distribution is present.
1277    pub async fn update_table_info(
1278        &self,
1279        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1280        region_distribution: Option<RegionDistribution>,
1281        new_table_info: TableInfo,
1282    ) -> Result<()> {
1283        let table_id = current_table_info_value.table_info.ident.table_id;
1284        let new_table_info_value = current_table_info_value.update(new_table_info);
1285
1286        // Updates table info.
1287        let (update_table_info_txn, on_update_table_info_failure) = self
1288            .table_info_manager()
1289            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1290
1291        let txn = if let Some(region_distribution) = region_distribution {
1292            // region options induced from table info.
1293            let new_region_options = new_table_info_value.table_info.to_region_options();
1294            let update_datanode_table_options_txn = self
1295                .datanode_table_manager
1296                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1297                .await?;
1298            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1299        } else {
1300            update_table_info_txn
1301        };
1302
1303        let mut r = self.kv_backend.txn(txn).await?;
1304        // Checks whether metadata was already updated.
1305        if !r.succeeded {
1306            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1307            let remote_table_info = on_update_table_info_failure(&mut set)?
1308                .context(error::UnexpectedSnafu {
1309                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1310                })?
1311                .into_inner();
1312
1313            let op_name = "the updating table info";
1314            ensure_values!(remote_table_info, new_table_info_value, op_name);
1315        }
1316        Ok(())
1317    }
1318
1319    /// Updates view info and returns an error if different metadata exists.
1320    /// Parameters include:
1321    /// - `view_id`: the view id
1322    /// - `current_view_info_value`: the current view info for CAS checking
1323    /// - `new_view_info`: the encoded logical plan
1324    /// - `table_names`: the resolved fully table names in logical plan
1325    /// - `columns`: the view columns
1326    /// - `plan_columns`: the original plan columns
1327    /// - `definition`: The SQL to create the view
1328    ///
1329    #[allow(clippy::too_many_arguments)]
1330    pub async fn update_view_info(
1331        &self,
1332        view_id: TableId,
1333        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1334        new_view_info: Vec<u8>,
1335        table_names: HashSet<TableName>,
1336        columns: Vec<String>,
1337        plan_columns: Vec<String>,
1338        definition: String,
1339    ) -> Result<()> {
1340        let new_view_info_value = current_view_info_value.update(
1341            new_view_info.into(),
1342            table_names,
1343            columns,
1344            plan_columns,
1345            definition,
1346        );
1347
1348        // Updates view info.
1349        let (update_view_info_txn, on_update_view_info_failure) = self
1350            .view_info_manager()
1351            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1352
1353        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1354
1355        // Checks whether metadata was already updated.
1356        if !r.succeeded {
1357            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1358            let remote_view_info = on_update_view_info_failure(&mut set)?
1359                .context(error::UnexpectedSnafu {
1360                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1361                })?
1362                .into_inner();
1363
1364            let op_name = "the updating view info";
1365            ensure_values!(remote_view_info, new_view_info_value, op_name);
1366        }
1367        Ok(())
1368    }
1369
1370    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1371        self.kv_backend.max_txn_ops()
1372    }
1373
1374    pub async fn batch_update_table_info_values(
1375        &self,
1376        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1377    ) -> Result<()> {
1378        let len = table_info_value_pairs.len();
1379        let mut txns = Vec::with_capacity(len);
1380        struct OnFailure<F, R>
1381        where
1382            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1383        {
1384            table_info_value: TableInfoValue,
1385            on_update_table_info_failure: F,
1386        }
1387        let mut on_failures = Vec::with_capacity(len);
1388
1389        for (table_info_value, new_table_info) in table_info_value_pairs {
1390            let table_id = table_info_value.table_info.ident.table_id;
1391
1392            let new_table_info_value = table_info_value.update(new_table_info);
1393
1394            let (update_table_info_txn, on_update_table_info_failure) =
1395                self.table_info_manager().build_update_txn(
1396                    table_id,
1397                    &table_info_value,
1398                    &new_table_info_value,
1399                )?;
1400
1401            txns.push(update_table_info_txn);
1402
1403            on_failures.push(OnFailure {
1404                table_info_value: new_table_info_value,
1405                on_update_table_info_failure,
1406            });
1407        }
1408
1409        let txn = Txn::merge_all(txns);
1410        let mut r = self.kv_backend.txn(txn).await?;
1411
1412        if !r.succeeded {
1413            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1414            for on_failure in on_failures {
1415                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1416                    .context(error::UnexpectedSnafu {
1417                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1418                    })?
1419                    .into_inner();
1420
1421                let op_name = "the batch updating table info";
1422                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1423            }
1424        }
1425
1426        Ok(())
1427    }
1428
1429    pub async fn update_table_route(
1430        &self,
1431        table_id: TableId,
1432        region_info: RegionInfo,
1433        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1434        new_region_routes: Vec<RegionRoute>,
1435        new_region_options: &HashMap<String, String>,
1436        new_region_wal_options: &RegionWalOptions,
1437    ) -> Result<()> {
1438        // Updates the datanode table key value pairs.
1439        let current_region_distribution =
1440            region_distribution(current_table_route_value.region_routes()?);
1441        let new_region_distribution = region_distribution(&new_region_routes);
1442
1443        let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1444            table_id,
1445            &region_info.region_wal_options,
1446            new_region_wal_options,
1447        )?;
1448        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1449            table_id,
1450            region_info,
1451            current_region_distribution,
1452            new_region_distribution,
1453            new_region_options,
1454            new_region_wal_options,
1455        )?;
1456
1457        // Updates the table_route.
1458        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1459        let (update_table_route_txn, on_update_table_route_failure) = self
1460            .table_route_manager()
1461            .table_route_storage()
1462            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1463
1464        let txn = Txn::merge_all(vec![
1465            update_datanode_table_txn,
1466            update_table_route_txn,
1467            update_topic_region_txn,
1468        ]);
1469
1470        let mut r = self.kv_backend.txn(txn).await?;
1471
1472        // Checks whether metadata was already updated.
1473        if !r.succeeded {
1474            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1475            let remote_table_route = on_update_table_route_failure(&mut set)?
1476                .context(error::UnexpectedSnafu {
1477                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1478                })?
1479                .into_inner();
1480
1481            let op_name = "the updating table route";
1482            ensure_values!(remote_table_route, new_table_route_value, op_name);
1483        }
1484
1485        Ok(())
1486    }
1487
1488    /// Updates the leader status of the [RegionRoute].
1489    pub async fn update_leader_region_status<F>(
1490        &self,
1491        table_id: TableId,
1492        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1493        next_region_route_status: F,
1494    ) -> Result<()>
1495    where
1496        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1497    {
1498        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1499
1500        let mut updated = 0;
1501        for route in &mut new_region_routes {
1502            if let Some(state) = next_region_route_status(route)
1503                && route.set_leader_state(state)
1504            {
1505                updated += 1;
1506            }
1507        }
1508
1509        if updated == 0 {
1510            warn!("No leader status updated");
1511            return Ok(());
1512        }
1513
1514        // Updates the table_route.
1515        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1516
1517        let (update_table_route_txn, on_update_table_route_failure) = self
1518            .table_route_manager()
1519            .table_route_storage()
1520            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1521
1522        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1523
1524        // Checks whether metadata was already updated.
1525        if !r.succeeded {
1526            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1527            let remote_table_route = on_update_table_route_failure(&mut set)?
1528                .context(error::UnexpectedSnafu {
1529                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1530                })?
1531                .into_inner();
1532
1533            let op_name = "the updating leader region status";
1534            ensure_values!(remote_table_route, new_table_route_value, op_name);
1535        }
1536
1537        Ok(())
1538    }
1539}
1540
/// Implements `$crate::key::MetadataValue` for each listed type by (de)serializing the
/// value as JSON with `serde_json` (`from_slice` / `to_vec`).
///
/// The expansion names `Result`, `SerdeJsonSnafu` and the `.context` extension method
/// unqualified, so they must be in scope where the macro is invoked. `.context` presumably
/// comes from snafu's `ResultExt` — confirm at the call site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1557
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` serializes the key once with `to_bytes`. It returns a
/// `TxnOp::Get` for those bytes together with `TxnOpGetResponseSet::filter` for the same
/// bytes, which retrieves the matching value from a transaction's get responses.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1582
// Key types that can build a transactional `Get` op (plus response filter) for their value.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1590
/// Adds inherent `try_from_raw_value` / `try_as_raw_value` methods to each listed type,
/// (de)serializing the value as JSON with `serde_json`.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` returns
/// `Result<Option<Self>>`, so a raw value containing JSON `null` decodes to `None`.
/// As with `impl_metadata_value!`, `Result`, `SerdeJsonSnafu` and `.context` must be in
/// scope where the macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1607
// Value types stored as JSON that implement the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}

// Value types that additionally get inherent, `Option`-returning JSON codecs.
// NOTE(review): `SchemaNameValue` is listed in both invocations. A `Type::try_from_raw_value`
// path prefers the inherent (Option-returning) item over the trait one, so the trait
// version is only reached through `MetadataValue` — confirm this is intended.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1628
1629#[cfg(test)]
1630mod tests {
1631    use std::collections::{BTreeMap, HashMap, HashSet};
1632    use std::sync::Arc;
1633
1634    use bytes::Bytes;
1635    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1636    use common_time::util::current_time_millis;
1637    use common_wal::options::{KafkaWalOptions, WalOptions};
1638    use futures::TryStreamExt;
1639    use store_api::storage::{RegionId, RegionNumber};
1640    use table::metadata::TableInfo;
1641    use table::table_name::TableName;
1642
1643    use super::datanode_table::DatanodeTableKey;
1644    use super::test_utils;
1645    use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1646    use crate::ddl::test_util::create_table::test_create_table_task;
1647    use crate::ddl::utils::region_storage_path;
1648    use crate::error::Result;
1649    use crate::key::datanode_table::RegionInfo;
1650    use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1651    use crate::key::table_info::TableInfoValue;
1652    use crate::key::table_name::TableNameKey;
1653    use crate::key::table_route::TableRouteValue;
1654    use crate::key::topic_region::TopicRegionKey;
1655    use crate::key::{
1656        DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1657        TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1658    };
1659    use crate::kv_backend::KvBackend;
1660    use crate::kv_backend::memory::MemoryKvBackend;
1661    use crate::kv_backend::read_only::ReadOnlyKvBackend;
1662    use crate::peer::Peer;
1663    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1664    use crate::rpc::store::{PutRequest, RangeRequest};
1665    use crate::wal_provider::{RegionWalOptions, WalProvider};
1666
1667    #[test]
1668    fn test_deserialized_value_with_bytes() {
1669        let region_route = new_test_region_route();
1670        let region_routes = vec![region_route.clone()];
1671
1672        let expected_region_routes =
1673            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1674        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1675
1676        // Serialize behaviors:
1677        // The inner field will be ignored.
1678        let value = DeserializedValueWithBytes {
1679            // ignored
1680            inner: TableRouteValue::physical(region_routes.clone()),
1681            bytes: Bytes::from(expected.clone()),
1682        };
1683
1684        let encoded = serde_json::to_vec(&value).unwrap();
1685
1686        // Deserialize behaviors:
1687        // The inner field will be deserialized from the bytes field.
1688        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1689            serde_json::from_slice(&encoded).unwrap();
1690
1691        assert_eq!(decoded.inner, expected_region_routes);
1692        assert_eq!(decoded.bytes, expected);
1693    }
1694
1695    fn new_test_region_route() -> RegionRoute {
1696        new_region_route(1, 2)
1697    }
1698
1699    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1700        RegionRoute {
1701            region: Region {
1702                id: region_id.into(),
1703                name: "r1".to_string(),
1704                attrs: BTreeMap::new(),
1705                partition_expr: Default::default(),
1706            },
1707            leader_peer: Some(Peer::new(datanode, "a2")),
1708            follower_peers: vec![],
1709            leader_state: None,
1710            leader_down_since: None,
1711            write_route_policy: None,
1712        }
1713    }
1714
1715    fn new_test_table_info() -> TableInfo {
1716        test_utils::new_test_table_info(10)
1717    }
1718
1719    fn new_test_table_names() -> HashSet<TableName> {
1720        let mut set = HashSet::new();
1721        set.insert(TableName {
1722            catalog_name: "greptime".to_string(),
1723            schema_name: "public".to_string(),
1724            table_name: "a_table".to_string(),
1725        });
1726        set.insert(TableName {
1727            catalog_name: "greptime".to_string(),
1728            schema_name: "public".to_string(),
1729            table_name: "b_table".to_string(),
1730        });
1731        set
1732    }
1733
1734    async fn create_physical_table_metadata(
1735        table_metadata_manager: &TableMetadataManager,
1736        table_info: TableInfo,
1737        region_routes: Vec<RegionRoute>,
1738        region_wal_options: RegionWalOptions,
1739    ) -> Result<()> {
1740        table_metadata_manager
1741            .create_table_metadata(
1742                table_info,
1743                TableRouteValue::physical(region_routes),
1744                region_wal_options,
1745            )
1746            .await
1747    }
1748
1749    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1750        let topics = (0..2)
1751            .map(|i| format!("greptimedb_topic{}", i))
1752            .collect::<Vec<_>>();
1753        let wal_options = topics
1754            .iter()
1755            .map(|topic| WalOptions::Kafka(KafkaWalOptions::new(topic.clone())))
1756            .collect::<Vec<_>>();
1757
1758        (0..16)
1759            .enumerate()
1760            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1761            .collect()
1762    }
1763
1764    fn create_mixed_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1765        HashMap::from([
1766            (
1767                0,
1768                WalOptions::Kafka(KafkaWalOptions::new("greptimedb_topic0".to_string())),
1769            ),
1770            (1, WalOptions::RaftEngine),
1771            (2, WalOptions::Noop),
1772            (
1773                3,
1774                WalOptions::Kafka(KafkaWalOptions::new("greptimedb_topic1".to_string())),
1775            ),
1776        ])
1777    }
1778
1779    #[tokio::test]
1780    async fn test_raft_engine_topic_region_map() {
1781        let mem_kv = Arc::new(MemoryKvBackend::default());
1782        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1783        let region_route = new_test_region_route();
1784        let region_routes = &vec![region_route.clone()];
1785        let table_info = new_test_table_info();
1786        let wal_provider = WalProvider::RaftEngine;
1787        let regions: Vec<_> = (0..16).collect();
1788        let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
1789        create_physical_table_metadata(
1790            &table_metadata_manager,
1791            table_info.clone(),
1792            region_routes.clone(),
1793            region_wal_options.clone(),
1794        )
1795        .await
1796        .unwrap();
1797
1798        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1799        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1800        let resp = mem_kv.range(range_req).await.unwrap();
1801        // Should be empty because the topic region map is empty for raft engine.
1802        assert!(resp.kvs.is_empty());
1803    }
1804
1805    #[tokio::test]
1806    async fn test_create_table_metadata() {
1807        let mem_kv = Arc::new(MemoryKvBackend::default());
1808        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1809        let region_route = new_test_region_route();
1810        let region_routes = &vec![region_route.clone()];
1811        let table_info = new_test_table_info();
1812        let region_wal_options = create_mock_region_wal_options();
1813
1814        // creates metadata.
1815        create_physical_table_metadata(
1816            &table_metadata_manager,
1817            table_info.clone(),
1818            region_routes.clone(),
1819            region_wal_options.clone(),
1820        )
1821        .await
1822        .unwrap();
1823
1824        // if metadata was already created, it should be ok.
1825        assert!(
1826            create_physical_table_metadata(
1827                &table_metadata_manager,
1828                table_info.clone(),
1829                region_routes.clone(),
1830                region_wal_options.clone(),
1831            )
1832            .await
1833            .is_ok()
1834        );
1835
1836        let mut modified_region_routes = region_routes.clone();
1837        modified_region_routes.push(region_route.clone());
1838        // if remote metadata was exists, it should return an error.
1839        assert!(
1840            create_physical_table_metadata(
1841                &table_metadata_manager,
1842                table_info.clone(),
1843                modified_region_routes,
1844                region_wal_options.clone(),
1845            )
1846            .await
1847            .is_err()
1848        );
1849
1850        let (remote_table_info, remote_table_route) = table_metadata_manager
1851            .get_full_table_info(10)
1852            .await
1853            .unwrap();
1854
1855        assert_eq!(
1856            remote_table_info.unwrap().into_inner().table_info,
1857            table_info
1858        );
1859        assert_eq!(
1860            remote_table_route
1861                .unwrap()
1862                .into_inner()
1863                .region_routes()
1864                .unwrap(),
1865            region_routes
1866        );
1867
1868        for i in 0..2 {
1869            let region_number = i as u32;
1870            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1871            let topic = format!("greptimedb_topic{}", i);
1872            let regions = table_metadata_manager
1873                .topic_region_manager
1874                .regions(&topic)
1875                .await
1876                .unwrap()
1877                .into_keys()
1878                .collect::<Vec<_>>();
1879            assert_eq!(regions.len(), 8);
1880            assert!(regions.contains(&region_id));
1881        }
1882    }
1883
1884    #[tokio::test]
1885    async fn test_get_full_table_info_remaps_route_address() {
1886        let mem_kv = Arc::new(MemoryKvBackend::default());
1887        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1888
1889        let mut region_route = new_test_region_route();
1890        region_route.follower_peers = vec![Peer::empty(3)];
1891        let region_routes = vec![region_route];
1892        let table_info = new_test_table_info();
1893        let table_id = table_info.ident.table_id;
1894
1895        create_physical_table_metadata(
1896            &table_metadata_manager,
1897            table_info,
1898            region_routes,
1899            HashMap::new(),
1900        )
1901        .await
1902        .unwrap();
1903
1904        mem_kv
1905            .put(PutRequest {
1906                key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1907                value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1908                    .try_as_raw_value()
1909                    .unwrap(),
1910                ..Default::default()
1911            })
1912            .await
1913            .unwrap();
1914        mem_kv
1915            .put(PutRequest {
1916                key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1917                value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1918                    .try_as_raw_value()
1919                    .unwrap(),
1920                ..Default::default()
1921            })
1922            .await
1923            .unwrap();
1924
1925        let (_, table_route) = table_metadata_manager
1926            .get_full_table_info(table_id)
1927            .await
1928            .unwrap();
1929        let table_route = table_route.unwrap().into_inner();
1930        let region_routes = table_route.region_routes().unwrap();
1931
1932        assert_eq!(
1933            region_routes[0].leader_peer.as_ref().unwrap().addr,
1934            "new-a2"
1935        );
1936        assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1937    }
1938
1939    #[tokio::test]
1940    async fn test_get_full_table_info_with_read_only_kv_backend() {
1941        let mem_kv = Arc::new(MemoryKvBackend::default());
1942        let writable_manager = TableMetadataManager::new(mem_kv.clone());
1943
1944        let region_routes = vec![new_test_region_route()];
1945        let table_info = new_test_table_info();
1946        let table_id = table_info.ident.table_id;
1947
1948        create_physical_table_metadata(
1949            &writable_manager,
1950            table_info.clone(),
1951            region_routes.clone(),
1952            HashMap::new(),
1953        )
1954        .await
1955        .unwrap();
1956
1957        let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
1958        let read_only_manager = TableMetadataManager::new(read_only_kv);
1959
1960        let (remote_table_info, remote_table_route) = read_only_manager
1961            .get_full_table_info(table_id)
1962            .await
1963            .unwrap();
1964
1965        assert_eq!(
1966            remote_table_info.unwrap().into_inner().table_info,
1967            table_info
1968        );
1969        assert_eq!(
1970            remote_table_route
1971                .unwrap()
1972                .into_inner()
1973                .region_routes()
1974                .unwrap(),
1975            &region_routes
1976        );
1977    }
1978
1979    #[tokio::test]
1980    async fn test_create_logic_tables_metadata() {
1981        let mem_kv = Arc::new(MemoryKvBackend::default());
1982        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1983        let region_route = new_test_region_route();
1984        let region_routes = vec![region_route.clone()];
1985        let table_info = new_test_table_info();
1986        let table_id = table_info.ident.table_id;
1987        let table_route_value = TableRouteValue::physical(region_routes.clone());
1988
1989        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1990        // creates metadata.
1991        table_metadata_manager
1992            .create_logical_tables_metadata(tables_data.clone())
1993            .await
1994            .unwrap();
1995
1996        // if metadata was already created, it should be ok.
1997        assert!(
1998            table_metadata_manager
1999                .create_logical_tables_metadata(tables_data)
2000                .await
2001                .is_ok()
2002        );
2003
2004        let mut modified_region_routes = region_routes.clone();
2005        modified_region_routes.push(new_region_route(2, 3));
2006        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
2007        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
2008        // if remote metadata was exists, it should return an error.
2009        assert!(
2010            table_metadata_manager
2011                .create_logical_tables_metadata(modified_tables_data)
2012                .await
2013                .is_err()
2014        );
2015
2016        let (remote_table_info, remote_table_route) = table_metadata_manager
2017            .get_full_table_info(table_id)
2018            .await
2019            .unwrap();
2020
2021        assert_eq!(
2022            remote_table_info.unwrap().into_inner().table_info,
2023            table_info
2024        );
2025        assert_eq!(
2026            remote_table_route
2027                .unwrap()
2028                .into_inner()
2029                .region_routes()
2030                .unwrap(),
2031            &region_routes
2032        );
2033    }
2034
2035    #[tokio::test]
2036    async fn test_create_many_logical_tables_metadata() {
2037        let kv_backend = Arc::new(MemoryKvBackend::default());
2038        let table_metadata_manager = TableMetadataManager::new(kv_backend);
2039
2040        let mut tables_data = vec![];
2041        for i in 0..128 {
2042            let table_id = i + 1;
2043            let regin_number = table_id * 3;
2044            let region_id = RegionId::new(table_id, regin_number);
2045            let region_route = new_region_route(region_id.as_u64(), 2);
2046            let region_routes = vec![region_route.clone()];
2047            let table_info = test_utils::new_test_table_info_with_name(
2048                table_id,
2049                &format!("my_table_{}", table_id),
2050            );
2051            let table_route_value = TableRouteValue::physical(region_routes.clone());
2052
2053            tables_data.push((table_info, table_route_value));
2054        }
2055
2056        // creates metadata.
2057        table_metadata_manager
2058            .create_logical_tables_metadata(tables_data)
2059            .await
2060            .unwrap();
2061    }
2062
2063    #[tokio::test]
2064    async fn test_delete_table_metadata() {
2065        let mem_kv = Arc::new(MemoryKvBackend::default());
2066        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2067        let region_route = new_test_region_route();
2068        let region_routes = &vec![region_route.clone()];
2069        let table_info = new_test_table_info();
2070        let table_id = table_info.ident.table_id;
2071        let datanode_id = 2;
2072        let region_wal_options = create_mock_region_wal_options();
2073        let serialized_region_wal_options = region_wal_options.clone();
2074
2075        // creates metadata.
2076        create_physical_table_metadata(
2077            &table_metadata_manager,
2078            table_info.clone(),
2079            region_routes.clone(),
2080            serialized_region_wal_options,
2081        )
2082        .await
2083        .unwrap();
2084
2085        let table_name = TableName::new(
2086            table_info.catalog_name,
2087            table_info.schema_name,
2088            table_info.name,
2089        );
2090        let table_route_value = &TableRouteValue::physical(region_routes.clone());
2091        // deletes metadata.
2092        table_metadata_manager
2093            .delete_table_metadata(
2094                table_id,
2095                &table_name,
2096                table_route_value,
2097                &region_wal_options,
2098            )
2099            .await
2100            .unwrap();
2101        // Should be ignored.
2102        table_metadata_manager
2103            .delete_table_metadata(
2104                table_id,
2105                &table_name,
2106                table_route_value,
2107                &region_wal_options,
2108            )
2109            .await
2110            .unwrap();
2111        assert!(
2112            table_metadata_manager
2113                .table_info_manager()
2114                .get(table_id)
2115                .await
2116                .unwrap()
2117                .is_none()
2118        );
2119        assert!(
2120            table_metadata_manager
2121                .table_route_manager()
2122                .table_route_storage()
2123                .get(table_id)
2124                .await
2125                .unwrap()
2126                .is_none()
2127        );
2128        assert!(
2129            table_metadata_manager
2130                .datanode_table_manager()
2131                .tables(datanode_id)
2132                .try_collect::<Vec<_>>()
2133                .await
2134                .unwrap()
2135                .is_empty()
2136        );
2137        // Checks removed values
2138        let table_info = table_metadata_manager
2139            .table_info_manager()
2140            .get(table_id)
2141            .await
2142            .unwrap();
2143        assert!(table_info.is_none());
2144        let table_route = table_metadata_manager
2145            .table_route_manager()
2146            .table_route_storage()
2147            .get(table_id)
2148            .await
2149            .unwrap();
2150        assert!(table_route.is_none());
2151        // Logical delete removes the topic region mapping as well.
2152        let regions = table_metadata_manager
2153            .topic_region_manager
2154            .regions("greptimedb_topic0")
2155            .await
2156            .unwrap();
2157        assert_eq!(regions.len(), 0);
2158        let regions = table_metadata_manager
2159            .topic_region_manager
2160            .regions("greptimedb_topic1")
2161            .await
2162            .unwrap();
2163        assert_eq!(regions.len(), 0);
2164    }
2165
2166    #[tokio::test]
2167    async fn test_rename_table() {
2168        let mem_kv = Arc::new(MemoryKvBackend::default());
2169        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2170        let region_route = new_test_region_route();
2171        let region_routes = vec![region_route.clone()];
2172        let table_info = new_test_table_info();
2173        let table_id = table_info.ident.table_id;
2174        // creates metadata.
2175        create_physical_table_metadata(
2176            &table_metadata_manager,
2177            table_info.clone(),
2178            region_routes.clone(),
2179            HashMap::new(),
2180        )
2181        .await
2182        .unwrap();
2183
2184        let new_table_name = "another_name".to_string();
2185        let table_info_value =
2186            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2187
2188        table_metadata_manager
2189            .rename_table(&table_info_value, new_table_name.clone())
2190            .await
2191            .unwrap();
2192        // if remote metadata was updated, it should be ok.
2193        table_metadata_manager
2194            .rename_table(&table_info_value, new_table_name.clone())
2195            .await
2196            .unwrap();
2197        let mut modified_table_info = table_info.clone();
2198        modified_table_info.name = "hi".to_string();
2199        let modified_table_info_value =
2200            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
2201        // if the table_info_value is wrong, it should return an error.
2202        // The ABA problem.
2203        assert!(
2204            table_metadata_manager
2205                .rename_table(&modified_table_info_value, new_table_name.clone())
2206                .await
2207                .is_err()
2208        );
2209
2210        let old_table_name = TableNameKey::new(
2211            &table_info.catalog_name,
2212            &table_info.schema_name,
2213            &table_info.name,
2214        );
2215        let new_table_name = TableNameKey::new(
2216            &table_info.catalog_name,
2217            &table_info.schema_name,
2218            &new_table_name,
2219        );
2220
2221        assert!(
2222            table_metadata_manager
2223                .table_name_manager()
2224                .get(old_table_name)
2225                .await
2226                .unwrap()
2227                .is_none()
2228        );
2229
2230        assert_eq!(
2231            table_metadata_manager
2232                .table_name_manager()
2233                .get(new_table_name)
2234                .await
2235                .unwrap()
2236                .unwrap()
2237                .table_id(),
2238            table_id
2239        );
2240    }
2241
2242    #[tokio::test]
2243    async fn test_update_table_info() {
2244        let mem_kv = Arc::new(MemoryKvBackend::default());
2245        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2246        let region_route = new_test_region_route();
2247        let region_routes = vec![region_route.clone()];
2248        let table_info = new_test_table_info();
2249        let table_id = table_info.ident.table_id;
2250        // creates metadata.
2251        create_physical_table_metadata(
2252            &table_metadata_manager,
2253            table_info.clone(),
2254            region_routes.clone(),
2255            HashMap::new(),
2256        )
2257        .await
2258        .unwrap();
2259
2260        let mut new_table_info = table_info.clone();
2261        new_table_info.name = "hi".to_string();
2262        let current_table_info_value =
2263            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2264        // should be ok.
2265        table_metadata_manager
2266            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2267            .await
2268            .unwrap();
2269        // if table info was updated, it should be ok.
2270        table_metadata_manager
2271            .update_table_info(&current_table_info_value, None, new_table_info.clone())
2272            .await
2273            .unwrap();
2274
2275        // updated table_info should equal the `new_table_info`
2276        let updated_table_info = table_metadata_manager
2277            .table_info_manager()
2278            .get(table_id)
2279            .await
2280            .unwrap()
2281            .unwrap()
2282            .into_inner();
2283        assert_eq!(updated_table_info.table_info, new_table_info);
2284
2285        let mut wrong_table_info = table_info.clone();
2286        wrong_table_info.name = "wrong".to_string();
2287        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2288            current_table_info_value.update(wrong_table_info),
2289        );
2290        // if the current_table_info_value is wrong, it should return an error.
2291        // The ABA problem.
2292        assert!(
2293            table_metadata_manager
2294                .update_table_info(&wrong_table_info_value, None, new_table_info)
2295                .await
2296                .is_err()
2297        )
2298    }
2299
2300    #[tokio::test]
2301    async fn test_update_table_leader_region_status() {
2302        let mem_kv = Arc::new(MemoryKvBackend::default());
2303        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2304        let datanode = 1;
2305        let region_routes = vec![
2306            RegionRoute {
2307                region: Region {
2308                    id: 1.into(),
2309                    name: "r1".to_string(),
2310                    attrs: BTreeMap::new(),
2311                    partition_expr: Default::default(),
2312                },
2313                leader_peer: Some(Peer::new(datanode, "a2")),
2314                leader_state: Some(LeaderState::Downgrading),
2315                follower_peers: vec![],
2316                leader_down_since: Some(current_time_millis()),
2317                write_route_policy: None,
2318            },
2319            RegionRoute {
2320                region: Region {
2321                    id: 2.into(),
2322                    name: "r2".to_string(),
2323                    attrs: BTreeMap::new(),
2324                    partition_expr: Default::default(),
2325                },
2326                leader_peer: Some(Peer::new(datanode, "a1")),
2327                leader_state: None,
2328                follower_peers: vec![],
2329                leader_down_since: None,
2330                write_route_policy: None,
2331            },
2332        ];
2333        let table_info = new_test_table_info();
2334        let table_id = table_info.ident.table_id;
2335        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2336            TableRouteValue::physical(region_routes.clone()),
2337        );
2338
2339        // creates metadata.
2340        create_physical_table_metadata(
2341            &table_metadata_manager,
2342            table_info.clone(),
2343            region_routes.clone(),
2344            HashMap::new(),
2345        )
2346        .await
2347        .unwrap();
2348
2349        table_metadata_manager
2350            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2351                if region_route.leader_state.is_some() {
2352                    None
2353                } else {
2354                    Some(Some(LeaderState::Downgrading))
2355                }
2356            })
2357            .await
2358            .unwrap();
2359
2360        let updated_route_value = table_metadata_manager
2361            .table_route_manager()
2362            .table_route_storage()
2363            .get(table_id)
2364            .await
2365            .unwrap()
2366            .unwrap();
2367
2368        assert_eq!(
2369            updated_route_value.region_routes().unwrap()[0].leader_state,
2370            Some(LeaderState::Downgrading)
2371        );
2372
2373        assert!(
2374            updated_route_value.region_routes().unwrap()[0]
2375                .leader_down_since
2376                .is_some()
2377        );
2378
2379        assert_eq!(
2380            updated_route_value.region_routes().unwrap()[1].leader_state,
2381            Some(LeaderState::Downgrading)
2382        );
2383        assert!(
2384            updated_route_value.region_routes().unwrap()[1]
2385                .leader_down_since
2386                .is_some()
2387        );
2388    }
2389
2390    async fn assert_datanode_table(
2391        table_metadata_manager: &TableMetadataManager,
2392        table_id: u32,
2393        region_routes: &[RegionRoute],
2394    ) {
2395        let region_distribution = region_distribution(region_routes);
2396        for (datanode, regions) in region_distribution {
2397            let got = table_metadata_manager
2398                .datanode_table_manager()
2399                .get(&DatanodeTableKey::new(datanode, table_id))
2400                .await
2401                .unwrap()
2402                .unwrap();
2403
2404            assert_eq!(got.regions, regions.leader_regions);
2405            assert_eq!(got.follower_regions, regions.follower_regions);
2406        }
2407    }
2408
2409    #[tokio::test]
2410    async fn test_update_table_route() {
2411        let mem_kv = Arc::new(MemoryKvBackend::default());
2412        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2413        let region_route = new_test_region_route();
2414        let region_routes = vec![region_route.clone()];
2415        let table_info = new_test_table_info();
2416        let table_id = table_info.ident.table_id;
2417        let engine = table_info.meta.engine.as_str();
2418        let region_storage_path =
2419            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2420        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2421            TableRouteValue::physical(region_routes.clone()),
2422        );
2423
2424        // creates metadata.
2425        create_physical_table_metadata(
2426            &table_metadata_manager,
2427            table_info.clone(),
2428            region_routes.clone(),
2429            HashMap::new(),
2430        )
2431        .await
2432        .unwrap();
2433
2434        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2435        let new_region_routes = vec![
2436            new_region_route(1, 1),
2437            new_region_route(2, 2),
2438            new_region_route(3, 3),
2439        ];
2440        // it should be ok.
2441        table_metadata_manager
2442            .update_table_route(
2443                table_id,
2444                RegionInfo {
2445                    engine: engine.to_string(),
2446                    region_storage_path: region_storage_path.clone(),
2447                    region_options: HashMap::new(),
2448                    region_wal_options: HashMap::new(),
2449                },
2450                &current_table_route_value,
2451                new_region_routes.clone(),
2452                &HashMap::new(),
2453                &HashMap::new(),
2454            )
2455            .await
2456            .unwrap();
2457        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2458
2459        // if the table route was updated. it should be ok.
2460        table_metadata_manager
2461            .update_table_route(
2462                table_id,
2463                RegionInfo {
2464                    engine: engine.to_string(),
2465                    region_storage_path: region_storage_path.clone(),
2466                    region_options: HashMap::new(),
2467                    region_wal_options: HashMap::new(),
2468                },
2469                &current_table_route_value,
2470                new_region_routes.clone(),
2471                &HashMap::new(),
2472                &HashMap::new(),
2473            )
2474            .await
2475            .unwrap();
2476
2477        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2478            current_table_route_value
2479                .inner
2480                .update(new_region_routes.clone())
2481                .unwrap(),
2482        );
2483        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2484        // it should be ok.
2485        table_metadata_manager
2486            .update_table_route(
2487                table_id,
2488                RegionInfo {
2489                    engine: engine.to_string(),
2490                    region_storage_path: region_storage_path.clone(),
2491                    region_options: HashMap::new(),
2492                    region_wal_options: HashMap::new(),
2493                },
2494                &current_table_route_value,
2495                new_region_routes.clone(),
2496                &HashMap::new(),
2497                &HashMap::new(),
2498            )
2499            .await
2500            .unwrap();
2501        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2502
2503        // if the current_table_route_value is wrong, it should return an error.
2504        // The ABA problem.
2505        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2506            current_table_route_value
2507                .update(vec![
2508                    new_region_route(1, 1),
2509                    new_region_route(2, 2),
2510                    new_region_route(3, 3),
2511                    new_region_route(4, 4),
2512                ])
2513                .unwrap(),
2514        );
2515        assert!(
2516            table_metadata_manager
2517                .update_table_route(
2518                    table_id,
2519                    RegionInfo {
2520                        engine: engine.to_string(),
2521                        region_storage_path: region_storage_path.clone(),
2522                        region_options: HashMap::new(),
2523                        region_wal_options: HashMap::new(),
2524                    },
2525                    &wrong_table_route_value,
2526                    new_region_routes,
2527                    &HashMap::new(),
2528                    &HashMap::new(),
2529                )
2530                .await
2531                .is_err()
2532        );
2533    }
2534
2535    #[tokio::test]
2536    async fn test_update_table_route_with_topic_region_mapping() {
2537        let mem_kv = Arc::new(MemoryKvBackend::default());
2538        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2539        let region_route = new_test_region_route();
2540        let region_routes = vec![region_route.clone()];
2541        let table_info = new_test_table_info();
2542        let table_id = table_info.ident.table_id;
2543        let engine = table_info.meta.engine.as_str();
2544        let region_storage_path =
2545            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2546
2547        // Create initial metadata with Kafka WAL options
2548        let old_region_wal_options: RegionWalOptions = vec![
2549            (
2550                1,
2551                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
2552            ),
2553            (
2554                2,
2555                WalOptions::Kafka(KafkaWalOptions::new("topic_2".to_string())),
2556            ),
2557        ]
2558        .into_iter()
2559        .collect();
2560
2561        create_physical_table_metadata(
2562            &table_metadata_manager,
2563            table_info.clone(),
2564            region_routes.clone(),
2565            old_region_wal_options.clone(),
2566        )
2567        .await
2568        .unwrap();
2569
2570        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2571            TableRouteValue::physical(region_routes.clone()),
2572        );
2573
2574        // Verify initial topic region mappings exist
2575        let region_id_1 = RegionId::new(table_id, 1);
2576        let region_id_2 = RegionId::new(table_id, 2);
2577        let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2578        let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2579        assert!(
2580            table_metadata_manager
2581                .topic_region_manager
2582                .get(topic_1_key.clone())
2583                .await
2584                .unwrap()
2585                .is_some()
2586        );
2587        assert!(
2588            table_metadata_manager
2589                .topic_region_manager
2590                .get(topic_2_key.clone())
2591                .await
2592                .unwrap()
2593                .is_some()
2594        );
2595
2596        // Test 1: Add new region with new topic
2597        let new_region_routes = vec![
2598            new_region_route(1, 1),
2599            new_region_route(2, 2),
2600            new_region_route(3, 3), // New region
2601        ];
2602        let new_region_wal_options: RegionWalOptions = vec![
2603            (
2604                1,
2605                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())), // Unchanged
2606            ),
2607            (
2608                2,
2609                WalOptions::Kafka(KafkaWalOptions::new("topic_2".to_string())), // Unchanged
2610            ),
2611            (
2612                3,
2613                WalOptions::Kafka(KafkaWalOptions::new("topic_3".to_string())), // New topic
2614            ),
2615        ]
2616        .into_iter()
2617        .collect();
2618        let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2619            current_table_route_value
2620                .inner
2621                .update(new_region_routes.clone())
2622                .unwrap(),
2623        );
2624        table_metadata_manager
2625            .update_table_route(
2626                table_id,
2627                RegionInfo {
2628                    engine: engine.to_string(),
2629                    region_storage_path: region_storage_path.clone(),
2630                    region_options: HashMap::new(),
2631                    region_wal_options: old_region_wal_options.clone(),
2632                },
2633                &current_table_route_value,
2634                new_region_routes.clone(),
2635                &HashMap::new(),
2636                &new_region_wal_options,
2637            )
2638            .await
2639            .unwrap();
2640        // Verify new topic region mapping was created
2641        let region_id_3 = RegionId::new(table_id, 3);
2642        let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2643        assert!(
2644            table_metadata_manager
2645                .topic_region_manager
2646                .get(topic_3_key)
2647                .await
2648                .unwrap()
2649                .is_some()
2650        );
2651        // Test 2: Remove a region and change topic for another
2652        let newer_region_routes = vec![
2653            new_region_route(1, 1),
2654            // Region 2 removed
2655            // Region 3 now has different topic
2656        ];
2657        let newer_region_wal_options: RegionWalOptions = vec![
2658            (
2659                1,
2660                WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())), // Unchanged
2661            ),
2662            (
2663                3,
2664                WalOptions::Kafka(KafkaWalOptions::new("topic_3_new".to_string())), // Changed topic
2665            ),
2666        ]
2667        .into_iter()
2668        .collect();
2669        table_metadata_manager
2670            .update_table_route(
2671                table_id,
2672                RegionInfo {
2673                    engine: engine.to_string(),
2674                    region_storage_path: region_storage_path.clone(),
2675                    region_options: HashMap::new(),
2676                    region_wal_options: new_region_wal_options.clone(),
2677                },
2678                &current_table_route_value_updated,
2679                newer_region_routes.clone(),
2680                &HashMap::new(),
2681                &newer_region_wal_options,
2682            )
2683            .await
2684            .unwrap();
2685        // Verify region 2 mapping was deleted
2686        let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2687        assert!(
2688            table_metadata_manager
2689                .topic_region_manager
2690                .get(topic_2_key_new)
2691                .await
2692                .unwrap()
2693                .is_none()
2694        );
2695        // Verify region 3 old topic mapping was deleted
2696        let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2697        assert!(
2698            table_metadata_manager
2699                .topic_region_manager
2700                .get(topic_3_key_old)
2701                .await
2702                .unwrap()
2703                .is_none()
2704        );
2705        // Verify region 3 new topic mapping was created
2706        let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2707        assert!(
2708            table_metadata_manager
2709                .topic_region_manager
2710                .get(topic_3_key_new)
2711                .await
2712                .unwrap()
2713                .is_some()
2714        );
2715        // Verify region 1 mapping still exists (unchanged)
2716        assert!(
2717            table_metadata_manager
2718                .topic_region_manager
2719                .get(topic_1_key)
2720                .await
2721                .unwrap()
2722                .is_some()
2723        );
2724    }
2725
2726    #[tokio::test]
2727    async fn test_destroy_table_metadata() {
2728        let mem_kv = Arc::new(MemoryKvBackend::default());
2729        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2730        let table_id = 1025;
2731        let table_name = "foo";
2732        let task = test_create_table_task(table_name, table_id);
2733        let options = create_mixed_region_wal_options();
2734        let serialized_options = options.clone();
2735        table_metadata_manager
2736            .create_table_metadata(
2737                task.table_info,
2738                TableRouteValue::physical(vec![
2739                    RegionRoute {
2740                        region: Region::new_test(RegionId::new(table_id, 1)),
2741                        leader_peer: Some(Peer::empty(1)),
2742                        follower_peers: vec![Peer::empty(5)],
2743                        leader_state: None,
2744                        leader_down_since: None,
2745                        write_route_policy: None,
2746                    },
2747                    RegionRoute {
2748                        region: Region::new_test(RegionId::new(table_id, 2)),
2749                        leader_peer: Some(Peer::empty(2)),
2750                        follower_peers: vec![Peer::empty(4)],
2751                        leader_state: None,
2752                        leader_down_since: None,
2753                        write_route_policy: None,
2754                    },
2755                    RegionRoute {
2756                        region: Region::new_test(RegionId::new(table_id, 3)),
2757                        leader_peer: Some(Peer::empty(3)),
2758                        follower_peers: vec![],
2759                        leader_state: None,
2760                        leader_down_since: None,
2761                        write_route_policy: None,
2762                    },
2763                ]),
2764                serialized_options,
2765            )
2766            .await
2767            .unwrap();
2768        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2769        let table_route_value = table_metadata_manager
2770            .table_route_manager
2771            .table_route_storage()
2772            .get_with_raw_bytes(table_id)
2773            .await
2774            .unwrap()
2775            .unwrap();
2776        table_metadata_manager
2777            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2778            .await
2779            .unwrap();
2780        assert!(mem_kv.is_empty());
2781    }
2782
2783    #[tokio::test]
2784    async fn test_restore_table_metadata() {
2785        let mem_kv = Arc::new(MemoryKvBackend::default());
2786        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2787        let table_id = 1025;
2788        let table_name = "foo";
2789        let task = test_create_table_task(table_name, table_id);
2790        let options = create_mixed_region_wal_options();
2791        let serialized_options = options.clone();
2792        table_metadata_manager
2793            .create_table_metadata(
2794                task.table_info,
2795                TableRouteValue::physical(vec![
2796                    RegionRoute {
2797                        region: Region::new_test(RegionId::new(table_id, 1)),
2798                        leader_peer: Some(Peer::empty(1)),
2799                        follower_peers: vec![Peer::empty(5)],
2800                        leader_state: None,
2801                        leader_down_since: None,
2802                        write_route_policy: None,
2803                    },
2804                    RegionRoute {
2805                        region: Region::new_test(RegionId::new(table_id, 2)),
2806                        leader_peer: Some(Peer::empty(2)),
2807                        follower_peers: vec![Peer::empty(4)],
2808                        leader_state: None,
2809                        leader_down_since: None,
2810                        write_route_policy: None,
2811                    },
2812                    RegionRoute {
2813                        region: Region::new_test(RegionId::new(table_id, 3)),
2814                        leader_peer: Some(Peer::empty(3)),
2815                        follower_peers: vec![],
2816                        leader_state: None,
2817                        leader_down_since: None,
2818                        write_route_policy: None,
2819                    },
2820                ]),
2821                serialized_options,
2822            )
2823            .await
2824            .unwrap();
2825        let expected_result = mem_kv.dump();
2826        let table_route_value = table_metadata_manager
2827            .table_route_manager
2828            .table_route_storage()
2829            .get_with_raw_bytes(table_id)
2830            .await
2831            .unwrap()
2832            .unwrap();
2833        let region_routes = table_route_value.region_routes().unwrap();
2834        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2835        let table_route_value = TableRouteValue::physical(region_routes.clone());
2836        table_metadata_manager
2837            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2838            .await
2839            .unwrap();
2840        table_metadata_manager
2841            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2842            .await
2843            .unwrap();
2844        let kvs = mem_kv.dump();
2845        assert_eq!(kvs, expected_result);
2846        // Should be ignored.
2847        table_metadata_manager
2848            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2849            .await
2850            .unwrap();
2851        let kvs = mem_kv.dump();
2852        assert_eq!(kvs, expected_result);
2853    }
2854
2855    #[tokio::test]
2856    async fn test_dropped_table_metadata_enumeration_and_lookup() {
2857        let mem_kv = Arc::new(MemoryKvBackend::default());
2858        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2859        let table_id = 1025;
2860        let table_name = "foo";
2861        let task = test_create_table_task(table_name, table_id);
2862        let table_info = task.table_info.clone();
2863        let options = create_mixed_region_wal_options();
2864        let serialized_options = options.clone();
2865        table_metadata_manager
2866            .create_table_metadata(
2867                table_info.clone(),
2868                TableRouteValue::physical(vec![
2869                    RegionRoute {
2870                        region: Region::new_test(RegionId::new(table_id, 1)),
2871                        leader_peer: Some(Peer::empty(1)),
2872                        follower_peers: vec![Peer::empty(5)],
2873                        leader_state: None,
2874                        leader_down_since: None,
2875                        write_route_policy: None,
2876                    },
2877                    RegionRoute {
2878                        region: Region::new_test(RegionId::new(table_id, 2)),
2879                        leader_peer: Some(Peer::empty(2)),
2880                        follower_peers: vec![Peer::empty(4)],
2881                        leader_state: None,
2882                        leader_down_since: None,
2883                        write_route_policy: None,
2884                    },
2885                    RegionRoute {
2886                        region: Region::new_test(RegionId::new(table_id, 3)),
2887                        leader_peer: Some(Peer::empty(3)),
2888                        follower_peers: vec![],
2889                        leader_state: None,
2890                        leader_down_since: None,
2891                        write_route_policy: None,
2892                    },
2893                ]),
2894                serialized_options,
2895            )
2896            .await
2897            .unwrap();
2898        let table_route_value = table_metadata_manager
2899            .table_route_manager
2900            .table_route_storage()
2901            .get_with_raw_bytes(table_id)
2902            .await
2903            .unwrap()
2904            .unwrap();
2905        let region_routes = table_route_value.region_routes().unwrap();
2906        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2907        let table_route_value = TableRouteValue::physical(region_routes.clone());
2908
2909        table_metadata_manager
2910            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2911            .await
2912            .unwrap();
2913
2914        let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
2915        assert_eq!(dropped_tables.len(), 1);
2916        assert_eq!(dropped_tables[0].table_id, table_id);
2917        assert_eq!(dropped_tables[0].table_name, table_name);
2918
2919        let dropped_table = table_metadata_manager
2920            .get_dropped_table(&table_name)
2921            .await
2922            .unwrap()
2923            .unwrap();
2924        assert_eq!(dropped_table.table_id, table_id);
2925        assert_eq!(dropped_table.table_name, table_name);
2926        assert_eq!(dropped_table.table_info_value.table_info, table_info);
2927        assert_eq!(
2928            dropped_table.table_route_value.region_routes().unwrap(),
2929            region_routes
2930        );
2931        assert_eq!(dropped_table.region_wal_options, options);
2932
2933        let dropped_table_by_id = table_metadata_manager
2934            .get_dropped_table_by_id(table_id)
2935            .await
2936            .unwrap()
2937            .unwrap();
2938        assert_eq!(dropped_table_by_id.table_id, table_id);
2939        assert_eq!(dropped_table_by_id.table_name, table_name);
2940        assert_eq!(dropped_table_by_id.table_info_value.table_info, table_info);
2941        assert_eq!(
2942            dropped_table_by_id
2943                .table_route_value
2944                .region_routes()
2945                .unwrap(),
2946            region_routes
2947        );
2948        assert_eq!(dropped_table_by_id.region_wal_options, options);
2949    }
2950
2951    #[tokio::test]
2952    async fn test_dropped_table_lookup_survives_live_name_recreation() {
2953        let mem_kv = Arc::new(MemoryKvBackend::default());
2954        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2955        let dropped_table_id = 1025;
2956        let recreated_table_id = 1026;
2957        let table_name = "foo";
2958        let dropped_task = test_create_table_task(table_name, dropped_table_id);
2959        let dropped_table_info = dropped_task.table_info.clone();
2960        let options = create_mock_region_wal_options();
2961        let serialized_options = options.clone();
2962        table_metadata_manager
2963            .create_table_metadata(
2964                dropped_table_info.clone(),
2965                TableRouteValue::physical(vec![
2966                    RegionRoute {
2967                        region: Region::new_test(RegionId::new(dropped_table_id, 1)),
2968                        leader_peer: Some(Peer::empty(1)),
2969                        follower_peers: vec![Peer::empty(5)],
2970                        leader_state: None,
2971                        leader_down_since: None,
2972                        write_route_policy: None,
2973                    },
2974                    RegionRoute {
2975                        region: Region::new_test(RegionId::new(dropped_table_id, 2)),
2976                        leader_peer: Some(Peer::empty(2)),
2977                        follower_peers: vec![Peer::empty(4)],
2978                        leader_state: None,
2979                        leader_down_since: None,
2980                        write_route_policy: None,
2981                    },
2982                ]),
2983                serialized_options.clone(),
2984            )
2985            .await
2986            .unwrap();
2987
2988        let dropped_table_name =
2989            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2990        let dropped_table_route = table_metadata_manager
2991            .table_route_manager
2992            .table_route_storage()
2993            .get_with_raw_bytes(dropped_table_id)
2994            .await
2995            .unwrap()
2996            .unwrap();
2997        let dropped_table_route =
2998            TableRouteValue::physical(dropped_table_route.region_routes().unwrap().clone());
2999        table_metadata_manager
3000            .delete_table_metadata(
3001                dropped_table_id,
3002                &dropped_table_name,
3003                &dropped_table_route,
3004                &options,
3005            )
3006            .await
3007            .unwrap();
3008
3009        let recreated_task = test_create_table_task(table_name, recreated_table_id);
3010        table_metadata_manager
3011            .create_table_metadata(
3012                recreated_task.table_info,
3013                TableRouteValue::physical(vec![RegionRoute {
3014                    region: Region::new_test(RegionId::new(recreated_table_id, 1)),
3015                    leader_peer: Some(Peer::empty(4)),
3016                    follower_peers: vec![],
3017                    leader_state: None,
3018                    leader_down_since: None,
3019                    write_route_policy: None,
3020                }]),
3021                serialized_options,
3022            )
3023            .await
3024            .unwrap();
3025
3026        assert_eq!(
3027            table_metadata_manager
3028                .table_name_manager()
3029                .get(TableNameKey::from(&dropped_table_name))
3030                .await
3031                .unwrap()
3032                .unwrap()
3033                .table_id(),
3034            recreated_table_id
3035        );
3036
3037        let dropped_table = table_metadata_manager
3038            .get_dropped_table(&dropped_table_name)
3039            .await
3040            .unwrap()
3041            .unwrap();
3042        assert_eq!(dropped_table.table_id, dropped_table_id);
3043        assert_eq!(dropped_table.table_name, dropped_table_name);
3044        assert_eq!(
3045            dropped_table.table_info_value.table_info,
3046            dropped_table_info
3047        );
3048
3049        let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
3050        assert_eq!(dropped_tables.len(), 1);
3051        assert_eq!(dropped_tables[0].table_id, dropped_table_id);
3052        assert_eq!(dropped_tables[0].table_name, dropped_table_name);
3053    }
3054
3055    #[tokio::test]
3056    async fn test_dropped_table_lookup_ignores_unrelated_malformed_datanode_tombstones() {
3057        let mem_kv = Arc::new(MemoryKvBackend::default());
3058        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3059        let table_id = 1025;
3060        let table_name = "foo";
3061        let task = test_create_table_task(table_name, table_id);
3062        let table_info = task.table_info.clone();
3063        let options = create_mixed_region_wal_options();
3064        let serialized_options = options.clone();
3065        table_metadata_manager
3066            .create_table_metadata(
3067                table_info.clone(),
3068                TableRouteValue::physical(vec![
3069                    RegionRoute {
3070                        region: Region::new_test(RegionId::new(table_id, 1)),
3071                        leader_peer: Some(Peer::empty(1)),
3072                        follower_peers: vec![Peer::empty(5)],
3073                        leader_state: None,
3074                        leader_down_since: None,
3075                        write_route_policy: None,
3076                    },
3077                    RegionRoute {
3078                        region: Region::new_test(RegionId::new(table_id, 2)),
3079                        leader_peer: Some(Peer::empty(2)),
3080                        follower_peers: vec![Peer::empty(4)],
3081                        leader_state: None,
3082                        leader_down_since: None,
3083                        write_route_policy: None,
3084                    },
3085                ]),
3086                serialized_options,
3087            )
3088            .await
3089            .unwrap();
3090
3091        let table_route_value = table_metadata_manager
3092            .table_route_manager
3093            .table_route_storage()
3094            .get_with_raw_bytes(table_id)
3095            .await
3096            .unwrap()
3097            .unwrap();
3098        let region_routes = table_route_value.region_routes().unwrap();
3099        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3100        let table_route_value = TableRouteValue::physical(region_routes.clone());
3101
3102        table_metadata_manager
3103            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
3104            .await
3105            .unwrap();
3106
3107        mem_kv
3108            .put(
3109                PutRequest::new()
3110                    .with_key("__tombstone/__dn_table/not-a-datanode-table-key")
3111                    .with_value("malformed"),
3112            )
3113            .await
3114            .unwrap();
3115
3116        let dropped_table = table_metadata_manager
3117            .get_dropped_table(&table_name)
3118            .await
3119            .unwrap()
3120            .unwrap();
3121        assert_eq!(dropped_table.table_id, table_id);
3122        assert_eq!(dropped_table.table_name, table_name);
3123        assert_eq!(dropped_table.table_info_value.table_info, table_info);
3124        assert_eq!(dropped_table.region_wal_options, options);
3125    }
3126
3127    #[tokio::test]
3128    async fn test_create_update_view_info() {
3129        let mem_kv = Arc::new(MemoryKvBackend::default());
3130        let table_metadata_manager = TableMetadataManager::new(mem_kv);
3131
3132        let view_info = new_test_table_info();
3133
3134        let view_id = view_info.ident.table_id;
3135
3136        let logical_plan: Vec<u8> = vec![1, 2, 3];
3137        let columns = vec!["a".to_string()];
3138        let plan_columns = vec!["number".to_string()];
3139        let table_names = new_test_table_names();
3140        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
3141
3142        // Create metadata
3143        table_metadata_manager
3144            .create_view_metadata(
3145                view_info.clone(),
3146                logical_plan.clone(),
3147                table_names.clone(),
3148                columns.clone(),
3149                plan_columns.clone(),
3150                definition.to_string(),
3151            )
3152            .await
3153            .unwrap();
3154
3155        {
3156            // assert view info
3157            let current_view_info = table_metadata_manager
3158                .view_info_manager()
3159                .get(view_id)
3160                .await
3161                .unwrap()
3162                .unwrap()
3163                .into_inner();
3164            assert_eq!(current_view_info.view_info, logical_plan);
3165            assert_eq!(current_view_info.table_names, table_names);
3166            assert_eq!(current_view_info.definition, definition);
3167            assert_eq!(current_view_info.columns, columns);
3168            assert_eq!(current_view_info.plan_columns, plan_columns);
3169            // assert table info
3170            let current_table_info = table_metadata_manager
3171                .table_info_manager()
3172                .get(view_id)
3173                .await
3174                .unwrap()
3175                .unwrap()
3176                .into_inner();
3177            assert_eq!(current_table_info.table_info, view_info);
3178        }
3179
3180        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
3181        let new_table_names = {
3182            let mut set = HashSet::new();
3183            set.insert(TableName {
3184                catalog_name: "greptime".to_string(),
3185                schema_name: "public".to_string(),
3186                table_name: "b_table".to_string(),
3187            });
3188            set.insert(TableName {
3189                catalog_name: "greptime".to_string(),
3190                schema_name: "public".to_string(),
3191                table_name: "c_table".to_string(),
3192            });
3193            set
3194        };
3195        let new_columns = vec!["b".to_string()];
3196        let new_plan_columns = vec!["number2".to_string()];
3197        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
3198
3199        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
3200            logical_plan.clone().into(),
3201            table_names,
3202            columns,
3203            plan_columns,
3204            definition.to_string(),
3205        ));
3206        // should be ok.
3207        table_metadata_manager
3208            .update_view_info(
3209                view_id,
3210                &current_view_info_value,
3211                new_logical_plan.clone(),
3212                new_table_names.clone(),
3213                new_columns.clone(),
3214                new_plan_columns.clone(),
3215                new_definition.to_string(),
3216            )
3217            .await
3218            .unwrap();
3219        // if table info was updated, it should be ok.
3220        table_metadata_manager
3221            .update_view_info(
3222                view_id,
3223                &current_view_info_value,
3224                new_logical_plan.clone(),
3225                new_table_names.clone(),
3226                new_columns.clone(),
3227                new_plan_columns.clone(),
3228                new_definition.to_string(),
3229            )
3230            .await
3231            .unwrap();
3232
3233        // updated view_info should equal the `new_logical_plan`
3234        let updated_view_info = table_metadata_manager
3235            .view_info_manager()
3236            .get(view_id)
3237            .await
3238            .unwrap()
3239            .unwrap()
3240            .into_inner();
3241        assert_eq!(updated_view_info.view_info, new_logical_plan);
3242        assert_eq!(updated_view_info.table_names, new_table_names);
3243        assert_eq!(updated_view_info.definition, new_definition);
3244        assert_eq!(updated_view_info.columns, new_columns);
3245        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
3246
3247        let wrong_view_info = logical_plan.clone();
3248        let wrong_definition = "wrong_definition";
3249        let wrong_view_info_value =
3250            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
3251                wrong_view_info.into(),
3252                new_table_names.clone(),
3253                new_columns.clone(),
3254                new_plan_columns.clone(),
3255                wrong_definition.to_string(),
3256            ));
3257        // if the current_view_info_value is wrong, it should return an error.
3258        // The ABA problem.
3259        assert!(
3260            table_metadata_manager
3261                .update_view_info(
3262                    view_id,
3263                    &wrong_view_info_value,
3264                    new_logical_plan.clone(),
3265                    new_table_names.clone(),
3266                    vec!["c".to_string()],
3267                    vec!["number3".to_string()],
3268                    wrong_definition.to_string(),
3269                )
3270                .await
3271                .is_err()
3272        );
3273
3274        // The view_info is not changed.
3275        let current_view_info = table_metadata_manager
3276            .view_info_manager()
3277            .get(view_id)
3278            .await
3279            .unwrap()
3280            .unwrap()
3281            .into_inner();
3282        assert_eq!(current_view_info.view_info, new_logical_plan);
3283        assert_eq!(current_view_info.table_names, new_table_names);
3284        assert_eq!(current_view_info.definition, new_definition);
3285        assert_eq!(current_view_info.columns, new_columns);
3286        assert_eq!(current_view_info.plan_columns, new_plan_columns);
3287    }
3288
3289    #[test]
3290    fn test_region_role_set_deserialize() {
3291        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
3292        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3293        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3294        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
3295
3296        let s = r#"[1, 2, 3]"#;
3297        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3298        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3299        assert!(region_role_set.follower_regions.is_empty());
3300    }
3301
3302    #[test]
3303    fn test_region_distribution_deserialize() {
3304        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
3305        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
3306        assert_eq!(region_distribution.len(), 2);
3307        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
3308        assert!(region_distribution[&1].follower_regions.is_empty());
3309        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
3310        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
3311    }
3312}