common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which
//!       tables and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names that belong to `{catalog_name}`.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define the "unified"
//! table metadata manager, [TableMetadataManager],
//! and the flow metadata manager, [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they bundle the managers defined above; it's recommended to use only these unified
//! managers.
75//!
//! The whole picture of flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
/// Regex fragment matching a single name segment (e.g. a catalog, schema or table name):
/// a letter, `_`, `:` or `-`, followed by letters, digits, `_`, `:`, `-`, `.`, `@` or `#`.
///
/// The fragment itself is unanchored; several key patterns in this module embed it between
/// `^`/`$` anchors.
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy key of the maintenance switch (NOTE(review): presumably superseded by
/// [MAINTENANCE_KEY] — confirm before relying on it).
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the `maintenance` runtime switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the `pause_procedure` runtime switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the `recovery` runtime switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys; the matching regex expects two numeric segments after it
/// (NOTE(review): their meaning is defined in the `node_address` module — confirm there).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
198
/// The regions hosted on one datanode, grouped by role (leader / follower).
///
/// Serializes as an object with both lists. Deserialization also accepts a bare list of region
/// numbers, which is treated as leader regions only (see the `Deserialize` impl).
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
207
208impl<'de> Deserialize<'de> for RegionRoleSet {
209    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
210    where
211        D: serde::Deserializer<'de>,
212    {
213        #[derive(Deserialize)]
214        #[serde(untagged)]
215        enum RegionRoleSetOrLeaderOnly {
216            Full {
217                leader_regions: Vec<RegionNumber>,
218                follower_regions: Vec<RegionNumber>,
219            },
220            LeaderOnly(Vec<RegionNumber>),
221        }
222        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
223            RegionRoleSetOrLeaderOnly::Full {
224                leader_regions,
225                follower_regions,
226            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
227            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
228                Ok(RegionRoleSet::new(leader_regions, vec![]))
229            }
230        }
231    }
232}
233
234impl RegionRoleSet {
235    /// Create a new region role set.
236    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
237        Self {
238            leader_regions,
239            follower_regions,
240        }
241    }
242
243    /// Add a leader region to the set.
244    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
245        self.leader_regions.push(region_number);
246    }
247
248    /// Add a follower region to the set.
249    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
250        self.follower_regions.push(region_number);
251    }
252
253    /// Sort the regions.
254    pub fn sort(&mut self) {
255        self.follower_regions.sort();
256        self.leader_regions.sort();
257    }
258}
259
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set (leaders and followers) hosted on
/// that datanode. Being a `BTreeMap`, iteration is ordered by datanode id.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of flow.
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
269
270lazy_static! {
271    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
272}
273
274lazy_static! {
275    static ref TABLE_INFO_KEY_PATTERN: Regex =
276        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
277}
278
279lazy_static! {
280    static ref VIEW_INFO_KEY_PATTERN: Regex =
281        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
286        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
291        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
296        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
297    ))
298    .unwrap();
299}
300
301lazy_static! {
302    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
303    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
304        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
305    ))
306        .unwrap();
307}
308
309lazy_static! {
310    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
311    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
312        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
313    ))
314        .unwrap();
315}
316
317lazy_static! {
318    static ref NODE_ADDRESS_PATTERN: Regex =
319        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
320}
321
322lazy_static! {
323    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
324        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
325}
326
327lazy_static! {
328    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
329        "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
330    ))
331    .unwrap();
332}
333
/// The key of metadata.
///
/// Implementors define how a key is encoded into, and decoded from, raw bytes. `T` is the type
/// produced by decoding; `'a` is the lifetime of the input bytes, which allows `T` to borrow
/// from them.
pub trait MetadataKey<'a, T> {
    /// Encodes the key into its raw byte representation.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from its raw byte representation.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
340
341#[derive(Debug, Clone, PartialEq)]
342pub struct BytesAdapter(Vec<u8>);
343
344impl From<Vec<u8>> for BytesAdapter {
345    fn from(value: Vec<u8>) -> Self {
346        Self(value)
347    }
348}
349
350impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
351    fn to_bytes(&self) -> Vec<u8> {
352        self.0.clone()
353    }
354
355    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
356        Ok(BytesAdapter(bytes.to_vec()))
357    }
358}
359
/// Builds a transactional "get" for a metadata key.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the [TxnOp] that reads this key, paired with a closure that extracts this key's
    /// raw value, if any, from the executed transaction's [TxnOpGetResponseSet].
    ///
    /// Callers in this file (e.g. `get_full_table_info`) pass the closure to
    /// `TxnOpGetResponseSet::decode_with`.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
368
/// A metadata value that can be decoded from, and encoded into, raw bytes.
pub trait MetadataValue {
    /// Decodes a value from its raw byte representation.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into its raw byte representation.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
376
/// Shared, reference-counted handle to a [TableMetadataManager].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Unified entry point for table-related metadata.
///
/// It bundles the per-key managers (table name/info/route, view info, datanode table, catalog,
/// schema, tombstone, topic name and topic region), all built over the same KV backend. It also
/// provides operations that write several kinds of keys in a single transaction.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // The backend every manager above was built from; used directly for cross-manager txns.
    kv_backend: KvBackendRef,
}
392
/// Returns early with an `error::UnexpectedSnafu` error unless `$got == $expected_value`.
///
/// `$name` describes the operation and is only used in the error message. `$got` and
/// `$expected_value` are each evaluated exactly once, so the value reported in the error is the
/// same value that was compared, and side-effecting arguments are not re-run.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` unqualified, so both must be
/// in scope at the call site.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Bind both operands once (as `assert_eq!` does) instead of re-evaluating them
        // when the error message is formatted.
        let got = &$got;
        let expected_value = &$expected_value;
        ensure!(
            got == expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected_value
                )
            }
        );
    }};
}
407
/// A struct containing a deserialized value (`inner`) and the original bytes it was decoded from.
///
/// - Serialize behaviors:
///
/// The `inner` field will be ignored; the original `bytes` are written out as a string.
///
/// - Deserialize behaviors:
///
/// The input is read as a string that becomes `bytes`, and the `inner` field is decoded from
/// those bytes with [MetadataValue::try_from_raw_value].
///
/// NOTE: mutating `inner` through `DerefMut` does not update `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value was deserialized from the original bytes.
    inner: T,
}
423
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Borrows the deserialized value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Mutably borrows the deserialized value.
    ///
    /// NOTE: this does not touch `bytes`; after mutating `inner`, `get_raw_bytes` and the
    /// `Serialize` impl still return the bytes the value was originally built from.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
437
438impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
439    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
440        write!(
441            f,
442            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
443            self.inner, self.bytes
444        )
445    }
446}
447
448impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
449    /// - Serialize behaviors:
450    ///
451    /// The `inner` field will be ignored.
452    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
453    where
454        S: serde::Serializer,
455    {
456        // Safety: The original bytes are always JSON encoded.
457        // It's more efficiently than `serialize_bytes`.
458        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
459    }
460}
461
462impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
463    for DeserializedValueWithBytes<T>
464{
465    /// - Deserialize behaviors:
466    ///
467    /// The `inner` field will be deserialized from the `bytes` field.
468    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
469    where
470        D: serde::Deserializer<'de>,
471    {
472        let buf = String::deserialize(deserializer)?;
473        let bytes = Bytes::from(buf);
474
475        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
476            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477
478        Ok(value)
479    }
480}
481
482impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
483    fn clone(&self) -> Self {
484        Self {
485            bytes: self.bytes.clone(),
486            inner: self.inner.clone(),
487        }
488    }
489}
490
491impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
492    /// Returns a struct containing a deserialized value and an original `bytes`.
493    /// It accepts original bytes of inner.
494    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
495        let inner = T::try_from_raw_value(&bytes)?;
496        Ok(Self { bytes, inner })
497    }
498
499    /// Returns a struct containing a deserialized value and an original `bytes`.
500    /// It accepts original bytes of inner.
501    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
502        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
503    }
504
505    pub fn into_inner(self) -> T {
506        self.inner
507    }
508
509    pub fn get_inner_ref(&self) -> &T {
510        &self.inner
511    }
512
513    /// Returns original `bytes`
514    pub fn get_raw_bytes(&self) -> Vec<u8> {
515        self.bytes.to_vec()
516    }
517
518    #[cfg(any(test, feature = "testing"))]
519    pub fn from_inner(inner: T) -> Self {
520        let bytes = serde_json::to_vec(&inner).unwrap();
521
522        Self {
523            bytes: Bytes::from(bytes),
524            inner,
525        }
526    }
527}
528
529impl TableMetadataManager {
530    pub fn new(kv_backend: KvBackendRef) -> Self {
531        TableMetadataManager {
532            table_name_manager: TableNameManager::new(kv_backend.clone()),
533            table_info_manager: TableInfoManager::new(kv_backend.clone()),
534            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
535            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
536            catalog_manager: CatalogManager::new(kv_backend.clone()),
537            schema_manager: SchemaManager::new(kv_backend.clone()),
538            table_route_manager: TableRouteManager::new(kv_backend.clone()),
539            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
540            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
541            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
542            kv_backend,
543        }
544    }
545
546    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
547    pub fn new_with_custom_tombstone_prefix(
548        kv_backend: KvBackendRef,
549        tombstone_prefix: &str,
550    ) -> Self {
551        Self {
552            table_name_manager: TableNameManager::new(kv_backend.clone()),
553            table_info_manager: TableInfoManager::new(kv_backend.clone()),
554            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
555            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
556            catalog_manager: CatalogManager::new(kv_backend.clone()),
557            schema_manager: SchemaManager::new(kv_backend.clone()),
558            table_route_manager: TableRouteManager::new(kv_backend.clone()),
559            tombstone_manager: TombstoneManager::new_with_prefix(
560                kv_backend.clone(),
561                tombstone_prefix,
562            ),
563            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
564            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
565            kv_backend,
566        }
567    }
568
569    pub async fn init(&self) -> Result<()> {
570        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
571
572        self.catalog_manager().create(catalog_name, true).await?;
573
574        let internal_schemas = [
575            DEFAULT_SCHEMA_NAME,
576            INFORMATION_SCHEMA_NAME,
577            DEFAULT_PRIVATE_SCHEMA_NAME,
578        ];
579
580        for schema_name in internal_schemas {
581            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
582
583            self.schema_manager().create(schema_key, None, true).await?;
584        }
585
586        Ok(())
587    }
588
    /// Returns the table name manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the table info manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the view info manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the datanode table manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the catalog name manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the schema name manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the table route manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the topic name manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the topic-to-region manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the KV backend that every sub-manager was built from.
    ///
    /// NOTE(review): gated on `feature = "testing"` alone, whereas other test-only items in this
    /// file use `any(test, feature = "testing")`; confirm the narrower gate is intended.
    #[cfg(feature = "testing")]
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
629
630    pub async fn get_full_table_info(
631        &self,
632        table_id: TableId,
633    ) -> Result<(
634        Option<DeserializedValueWithBytes<TableInfoValue>>,
635        Option<DeserializedValueWithBytes<TableRouteValue>>,
636    )> {
637        let table_info_key = TableInfoKey::new(table_id);
638        let table_route_key = TableRouteKey::new(table_id);
639        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
640        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
641
642        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
643        let mut res = self.kv_backend.txn(txn).await?;
644        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
645        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
646        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
647        Ok((table_info_value, table_route_value))
648    }
649
    /// Creates metadata for view and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// The view's table name key, table info and view info are created in one transaction.
    /// If that transaction does not succeed, the stored table info and view info are read back
    /// and compared with the values being written. An identical pre-existing view is treated
    /// as success; a missing or different value yields an `Unexpected` error.
    ///
    /// Parameters include:
    /// - `view_info`: the view's table info; its id, catalog, schema and name key the metadata
    /// - `raw_logical_plan`: the encoded logical plan
    /// - `table_names`: the resolved fully table names in logical plan
    /// - `columns`: the view columns
    /// - `plan_columns`: the original plan columns
    /// - `definition`: The SQL to create the view
    ///
    pub async fn create_view_metadata(
        &self,
        view_info: RawTableInfo,
        raw_logical_plan: Vec<u8>,
        table_names: HashSet<TableName>,
        columns: Vec<String>,
        plan_columns: Vec<String>,
        definition: String,
    ) -> Result<()> {
        let view_id = view_info.ident.table_id;

        // Creates view name.
        let view_name = TableNameKey::new(
            &view_info.catalog_name,
            &view_info.schema_name,
            &view_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&view_name, view_id)?;

        // Creates table info.
        let table_info_value = TableInfoValue::new(view_info);

        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(view_id, &table_info_value)?;

        // Creates view info
        let view_info_value = ViewInfoValue::new(
            raw_logical_plan,
            table_names,
            columns,
            plan_columns,
            definition,
        );
        let (create_view_info_txn, on_create_view_info_failure) = self
            .view_info_manager()
            .build_create_txn(view_id, &view_info_value)?;

        let txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_view_info_txn,
        ]);

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            // Read back what is stored and require it to equal what we tried to write.
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_view_info = on_create_view_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
                })?
                .into_inner();

            let op_name = "the creating view metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_view_info, view_info_value, op_name);
        }

        Ok(())
    }
729
    /// Creates metadata for table and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// One transaction creates the table name key, table info, table route and topic-region
    /// entries. For physical tables it also creates the datanode table entries derived from
    /// the route's region distribution. If the transaction does not succeed, the stored table
    /// info and table route are read back and must equal the values being written. An
    /// identical pre-existing table is treated as success.
    ///
    /// `table_info.meta.region_numbers` is overwritten with the region numbers taken from
    /// `table_route_value` before anything is written.
    pub async fn create_table_metadata(
        &self,
        mut table_info: RawTableInfo,
        table_route_value: TableRouteValue,
        region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        // The route is the source of truth for which regions the table has.
        let region_numbers = table_route_value.region_numbers();
        table_info.meta.region_numbers = region_numbers;
        let table_id = table_info.ident.table_id;
        let engine = table_info.meta.engine.clone();

        // Creates table name.
        let table_name = TableNameKey::new(
            &table_info.catalog_name,
            &table_info.schema_name,
            &table_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&table_name, table_id)?;

        // Taken before `table_info` is moved into the `TableInfoValue` below.
        let region_options = table_info.to_region_options();
        // Creates table info.
        let table_info_value = TableInfoValue::new(table_info);
        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(table_id, &table_info_value)?;

        let (create_table_route_txn, on_create_table_route_failure) = self
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(table_id, &table_route_value)?;

        let create_topic_region_txn = self
            .topic_region_manager
            .build_create_txn(table_id, &region_wal_options)?;

        let mut txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_table_route_txn,
            create_topic_region_txn,
        ]);

        // Only physical routes carry region routes, so only they get datanode table entries.
        if let TableRouteValue::Physical(x) = &table_route_value {
            let region_storage_path = table_info_value.region_storage_path();
            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
                table_id,
                &engine,
                &region_storage_path,
                region_options,
                region_wal_options,
                region_distribution(&x.region_routes),
            )?;
            txn = txn.merge(create_datanode_table_txn);
        }

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            // Read back what is stored and require it to equal what we tried to write.
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_table_route = on_create_table_route_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                })?
                .into_inner();

            let op_name = "the creating table metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_table_route, table_route_value, op_name);
        }

        Ok(())
    }
813
814    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
815        // The batch size is max_txn_size / 3 because the size of the `tables_data`
816        // is 3 times the size of the `tables_data`.
817        self.kv_backend.max_txn_ops() / 3
818    }
819
820    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
821    pub async fn create_logical_tables_metadata(
822        &self,
823        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
824    ) -> Result<()> {
825        let len = tables_data.len();
826        let mut txns = Vec::with_capacity(3 * len);
827        struct OnFailure<F1, R1, F2, R2>
828        where
829            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
830            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
831        {
832            table_info_value: TableInfoValue,
833            on_create_table_info_failure: F1,
834            table_route_value: TableRouteValue,
835            on_create_table_route_failure: F2,
836        }
837        let mut on_failures = Vec::with_capacity(len);
838        for (mut table_info, table_route_value) in tables_data {
839            table_info.meta.region_numbers = table_route_value.region_numbers();
840            let table_id = table_info.ident.table_id;
841
842            // Creates table name.
843            let table_name = TableNameKey::new(
844                &table_info.catalog_name,
845                &table_info.schema_name,
846                &table_info.name,
847            );
848            let create_table_name_txn = self
849                .table_name_manager()
850                .build_create_txn(&table_name, table_id)?;
851            txns.push(create_table_name_txn);
852
853            // Creates table info.
854            let table_info_value = TableInfoValue::new(table_info);
855            let (create_table_info_txn, on_create_table_info_failure) =
856                self.table_info_manager()
857                    .build_create_txn(table_id, &table_info_value)?;
858            txns.push(create_table_info_txn);
859
860            let (create_table_route_txn, on_create_table_route_failure) = self
861                .table_route_manager()
862                .table_route_storage()
863                .build_create_txn(table_id, &table_route_value)?;
864            txns.push(create_table_route_txn);
865
866            on_failures.push(OnFailure {
867                table_info_value,
868                on_create_table_info_failure,
869                table_route_value,
870                on_create_table_route_failure,
871            });
872        }
873
874        let txn = Txn::merge_all(txns);
875        let mut r = self.kv_backend.txn(txn).await?;
876
877        // Checks whether metadata was already created.
878        if !r.succeeded {
879            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
880            for on_failure in on_failures {
881                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
882                    .context(error::UnexpectedSnafu {
883                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
884                    })?
885                    .into_inner();
886
887                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
888                    .context(error::UnexpectedSnafu {
889                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
890                    })?
891                    .into_inner();
892
893                let op_name = "the creating logical tables metadata";
894                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
895                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
896            }
897        }
898
899        Ok(())
900    }
901
902    fn table_metadata_keys(
903        &self,
904        table_id: TableId,
905        table_name: &TableName,
906        table_route_value: &TableRouteValue,
907        region_wal_options: &HashMap<RegionNumber, WalOptions>,
908    ) -> Result<Vec<Vec<u8>>> {
909        // Builds keys
910        let datanode_ids = if table_route_value.is_physical() {
911            region_distribution(table_route_value.region_routes()?)
912                .into_keys()
913                .collect()
914        } else {
915            vec![]
916        };
917        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
918        let table_name = TableNameKey::new(
919            &table_name.catalog_name,
920            &table_name.schema_name,
921            &table_name.table_name,
922        );
923        let table_info_key = TableInfoKey::new(table_id);
924        let table_route_key = TableRouteKey::new(table_id);
925        let datanode_table_keys = datanode_ids
926            .into_iter()
927            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
928            .collect::<HashSet<_>>();
929        let topic_region_map = self
930            .topic_region_manager
931            .get_topic_region_mapping(table_id, region_wal_options);
932        let topic_region_keys = topic_region_map
933            .iter()
934            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
935            .collect::<Vec<_>>();
936        keys.push(table_name.to_bytes());
937        keys.push(table_info_key.to_bytes());
938        keys.push(table_route_key.to_bytes());
939        for key in &datanode_table_keys {
940            keys.push(key.to_bytes());
941        }
942        for key in topic_region_keys {
943            keys.push(key.to_bytes());
944        }
945        Ok(keys)
946    }
947
948    /// Deletes metadata for table **logically**.
949    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
950    pub async fn delete_table_metadata(
951        &self,
952        table_id: TableId,
953        table_name: &TableName,
954        table_route_value: &TableRouteValue,
955        region_wal_options: &HashMap<RegionNumber, WalOptions>,
956    ) -> Result<()> {
957        let keys =
958            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
959        self.tombstone_manager.create(keys).await.map(|_| ())
960    }
961
962    /// Deletes metadata tombstone for table **permanently**.
963    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
964    pub async fn delete_table_metadata_tombstone(
965        &self,
966        table_id: TableId,
967        table_name: &TableName,
968        table_route_value: &TableRouteValue,
969        region_wal_options: &HashMap<RegionNumber, WalOptions>,
970    ) -> Result<()> {
971        let table_metadata_keys =
972            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
973        self.tombstone_manager
974            .delete(table_metadata_keys)
975            .await
976            .map(|_| ())
977    }
978
979    /// Restores metadata for table.
980    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
981    pub async fn restore_table_metadata(
982        &self,
983        table_id: TableId,
984        table_name: &TableName,
985        table_route_value: &TableRouteValue,
986        region_wal_options: &HashMap<RegionNumber, WalOptions>,
987    ) -> Result<()> {
988        let keys =
989            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
990        self.tombstone_manager.restore(keys).await.map(|_| ())
991    }
992
993    /// Deletes metadata for table **permanently**.
994    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
995    pub async fn destroy_table_metadata(
996        &self,
997        table_id: TableId,
998        table_name: &TableName,
999        table_route_value: &TableRouteValue,
1000        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1001    ) -> Result<()> {
1002        let keys =
1003            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1004        let _ = self
1005            .kv_backend
1006            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1007            .await?;
1008        Ok(())
1009    }
1010
1011    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1012        let mut keys = Vec::with_capacity(3);
1013        let view_name = TableNameKey::new(
1014            &view_name.catalog_name,
1015            &view_name.schema_name,
1016            &view_name.table_name,
1017        );
1018        let table_info_key = TableInfoKey::new(view_id);
1019        let view_info_key = ViewInfoKey::new(view_id);
1020        keys.push(view_name.to_bytes());
1021        keys.push(table_info_key.to_bytes());
1022        keys.push(view_info_key.to_bytes());
1023
1024        Ok(keys)
1025    }
1026
1027    /// Deletes metadata for view **permanently**.
1028    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1029    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1030        let keys = self.view_info_keys(view_id, view_name)?;
1031        let _ = self
1032            .kv_backend
1033            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1034            .await?;
1035        Ok(())
1036    }
1037
1038    /// Renames the table name and returns an error if different metadata exists.
1039    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1040    /// and the new `TableNameKey` MUST be empty.
1041    pub async fn rename_table(
1042        &self,
1043        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1044        new_table_name: String,
1045    ) -> Result<()> {
1046        let current_table_info = &current_table_info_value.table_info;
1047        let table_id = current_table_info.ident.table_id;
1048
1049        let table_name_key = TableNameKey::new(
1050            &current_table_info.catalog_name,
1051            &current_table_info.schema_name,
1052            &current_table_info.name,
1053        );
1054
1055        let new_table_name_key = TableNameKey::new(
1056            &current_table_info.catalog_name,
1057            &current_table_info.schema_name,
1058            &new_table_name,
1059        );
1060
1061        // Updates table name.
1062        let update_table_name_txn = self.table_name_manager().build_update_txn(
1063            &table_name_key,
1064            &new_table_name_key,
1065            table_id,
1066        )?;
1067
1068        let new_table_info_value = current_table_info_value
1069            .inner
1070            .with_update(move |table_info| {
1071                table_info.name = new_table_name;
1072            });
1073
1074        // Updates table info.
1075        let (update_table_info_txn, on_update_table_info_failure) = self
1076            .table_info_manager()
1077            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1078
1079        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1080
1081        let mut r = self.kv_backend.txn(txn).await?;
1082
1083        // Checks whether metadata was already updated.
1084        if !r.succeeded {
1085            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1086            let remote_table_info = on_update_table_info_failure(&mut set)?
1087                .context(error::UnexpectedSnafu {
1088                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1089                })?
1090                .into_inner();
1091
1092            let op_name = "the renaming table metadata";
1093            ensure_values!(remote_table_info, new_table_info_value, op_name);
1094        }
1095
1096        Ok(())
1097    }
1098
1099    /// Updates table info and returns an error if different metadata exists.
1100    /// And cascade-ly update all redundant table options for each region
1101    /// if region_distribution is present.
1102    pub async fn update_table_info(
1103        &self,
1104        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1105        region_distribution: Option<RegionDistribution>,
1106        new_table_info: RawTableInfo,
1107    ) -> Result<()> {
1108        let table_id = current_table_info_value.table_info.ident.table_id;
1109        let new_table_info_value = current_table_info_value.update(new_table_info);
1110
1111        // Updates table info.
1112        let (update_table_info_txn, on_update_table_info_failure) = self
1113            .table_info_manager()
1114            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1115
1116        let txn = if let Some(region_distribution) = region_distribution {
1117            // region options induced from table info.
1118            let new_region_options = new_table_info_value.table_info.to_region_options();
1119            let update_datanode_table_options_txn = self
1120                .datanode_table_manager
1121                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1122                .await?;
1123            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1124        } else {
1125            update_table_info_txn
1126        };
1127
1128        let mut r = self.kv_backend.txn(txn).await?;
1129        // Checks whether metadata was already updated.
1130        if !r.succeeded {
1131            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1132            let remote_table_info = on_update_table_info_failure(&mut set)?
1133                .context(error::UnexpectedSnafu {
1134                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1135                })?
1136                .into_inner();
1137
1138            let op_name = "the updating table info";
1139            ensure_values!(remote_table_info, new_table_info_value, op_name);
1140        }
1141        Ok(())
1142    }
1143
1144    /// Updates view info and returns an error if different metadata exists.
1145    /// Parameters include:
1146    /// - `view_id`: the view id
1147    /// - `current_view_info_value`: the current view info for CAS checking
1148    /// - `new_view_info`: the encoded logical plan
1149    /// - `table_names`: the resolved fully table names in logical plan
1150    /// - `columns`: the view columns
1151    /// - `plan_columns`: the original plan columns
1152    /// - `definition`: The SQL to create the view
1153    ///
1154    #[allow(clippy::too_many_arguments)]
1155    pub async fn update_view_info(
1156        &self,
1157        view_id: TableId,
1158        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1159        new_view_info: Vec<u8>,
1160        table_names: HashSet<TableName>,
1161        columns: Vec<String>,
1162        plan_columns: Vec<String>,
1163        definition: String,
1164    ) -> Result<()> {
1165        let new_view_info_value = current_view_info_value.update(
1166            new_view_info,
1167            table_names,
1168            columns,
1169            plan_columns,
1170            definition,
1171        );
1172
1173        // Updates view info.
1174        let (update_view_info_txn, on_update_view_info_failure) = self
1175            .view_info_manager()
1176            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1177
1178        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1179
1180        // Checks whether metadata was already updated.
1181        if !r.succeeded {
1182            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1183            let remote_view_info = on_update_view_info_failure(&mut set)?
1184                .context(error::UnexpectedSnafu {
1185                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1186                })?
1187                .into_inner();
1188
1189            let op_name = "the updating view info";
1190            ensure_values!(remote_view_info, new_view_info_value, op_name);
1191        }
1192        Ok(())
1193    }
1194
1195    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1196        self.kv_backend.max_txn_ops()
1197    }
1198
1199    pub async fn batch_update_table_info_values(
1200        &self,
1201        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1202    ) -> Result<()> {
1203        let len = table_info_value_pairs.len();
1204        let mut txns = Vec::with_capacity(len);
1205        struct OnFailure<F, R>
1206        where
1207            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1208        {
1209            table_info_value: TableInfoValue,
1210            on_update_table_info_failure: F,
1211        }
1212        let mut on_failures = Vec::with_capacity(len);
1213
1214        for (table_info_value, new_table_info) in table_info_value_pairs {
1215            let table_id = table_info_value.table_info.ident.table_id;
1216
1217            let new_table_info_value = table_info_value.update(new_table_info);
1218
1219            let (update_table_info_txn, on_update_table_info_failure) =
1220                self.table_info_manager().build_update_txn(
1221                    table_id,
1222                    &table_info_value,
1223                    &new_table_info_value,
1224                )?;
1225
1226            txns.push(update_table_info_txn);
1227
1228            on_failures.push(OnFailure {
1229                table_info_value: new_table_info_value,
1230                on_update_table_info_failure,
1231            });
1232        }
1233
1234        let txn = Txn::merge_all(txns);
1235        let mut r = self.kv_backend.txn(txn).await?;
1236
1237        if !r.succeeded {
1238            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1239            for on_failure in on_failures {
1240                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1241                    .context(error::UnexpectedSnafu {
1242                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1243                    })?
1244                    .into_inner();
1245
1246                let op_name = "the batch updating table info";
1247                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1248            }
1249        }
1250
1251        Ok(())
1252    }
1253
1254    pub async fn update_table_route(
1255        &self,
1256        table_id: TableId,
1257        region_info: RegionInfo,
1258        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1259        new_region_routes: Vec<RegionRoute>,
1260        new_region_options: &HashMap<String, String>,
1261        new_region_wal_options: &HashMap<RegionNumber, String>,
1262    ) -> Result<()> {
1263        // Updates the datanode table key value pairs.
1264        let current_region_distribution =
1265            region_distribution(current_table_route_value.region_routes()?);
1266        let new_region_distribution = region_distribution(&new_region_routes);
1267
1268        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1269            table_id,
1270            region_info,
1271            current_region_distribution,
1272            new_region_distribution,
1273            new_region_options,
1274            new_region_wal_options,
1275        )?;
1276
1277        // Updates the table_route.
1278        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1279
1280        let (update_table_route_txn, on_update_table_route_failure) = self
1281            .table_route_manager()
1282            .table_route_storage()
1283            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1284
1285        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1286
1287        let mut r = self.kv_backend.txn(txn).await?;
1288
1289        // Checks whether metadata was already updated.
1290        if !r.succeeded {
1291            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1292            let remote_table_route = on_update_table_route_failure(&mut set)?
1293                .context(error::UnexpectedSnafu {
1294                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1295                })?
1296                .into_inner();
1297
1298            let op_name = "the updating table route";
1299            ensure_values!(remote_table_route, new_table_route_value, op_name);
1300        }
1301
1302        Ok(())
1303    }
1304
1305    /// Updates the leader status of the [RegionRoute].
1306    pub async fn update_leader_region_status<F>(
1307        &self,
1308        table_id: TableId,
1309        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1310        next_region_route_status: F,
1311    ) -> Result<()>
1312    where
1313        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1314    {
1315        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1316
1317        let mut updated = 0;
1318        for route in &mut new_region_routes {
1319            if let Some(state) = next_region_route_status(route) {
1320                if route.set_leader_state(state) {
1321                    updated += 1;
1322                }
1323            }
1324        }
1325
1326        if updated == 0 {
1327            warn!("No leader status updated");
1328            return Ok(());
1329        }
1330
1331        // Updates the table_route.
1332        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1333
1334        let (update_table_route_txn, on_update_table_route_failure) = self
1335            .table_route_manager()
1336            .table_route_storage()
1337            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1338
1339        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1340
1341        // Checks whether metadata was already updated.
1342        if !r.succeeded {
1343            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1344            let remote_table_route = on_update_table_route_failure(&mut set)?
1345                .context(error::UnexpectedSnafu {
1346                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1347                })?
1348                .into_inner();
1349
1350            let op_name = "the updating leader region status";
1351            ensure_values!(remote_table_route, new_table_route_value, op_name);
1352        }
1353
1354        Ok(())
1355    }
1356}
1357
/// Implements `MetadataValue` for each listed type by (de)serializing it as JSON
/// with `serde_json`.
///
/// The expansion names `Result`, `SerdeJsonSnafu` and the `.context(...)` method
/// (presumably snafu's `ResultExt`) without qualification. These must therefore be in
/// scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1374
/// Implements `MetadataKeyGetTxnOp` for each listed key type.
///
/// `build_get_op` encodes the key once with `to_bytes()`. It returns a `TxnOp::Get` for
/// those bytes and the matching `TxnOpGetResponseSet::filter`, which later extracts the
/// raw value for the same key from a txn's responses.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1399
// Key types whose values can be fetched inside a txn via `MetadataKeyGetTxnOp`.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1407
/// Adds inherent JSON (de)serialization methods to each listed type, where decoding
/// yields `Option<Self>`. Through serde's `Option` support, a JSON `null` payload
/// decodes to `None`.
///
/// Like `impl_metadata_value`, the expansion names `Result`, `SerdeJsonSnafu` and
/// `.context(...)` unqualified, so they must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1424
// Value types stored as JSON that implement the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue
}
1439
// Value types that get the `Option<Self>`-decoding inherent methods.
// `SchemaNameValue` also implements `MetadataValue` above. For path calls such as
// `SchemaNameValue::try_from_raw_value(..)`, Rust resolves the inherent (optional)
// method before the trait method.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1444
1445#[cfg(test)]
1446mod tests {
1447    use std::collections::{BTreeMap, HashMap, HashSet};
1448    use std::sync::Arc;
1449
1450    use bytes::Bytes;
1451    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1452    use common_time::util::current_time_millis;
1453    use common_wal::options::{KafkaWalOptions, WalOptions};
1454    use futures::TryStreamExt;
1455    use store_api::storage::{RegionId, RegionNumber};
1456    use table::metadata::{RawTableInfo, TableInfo};
1457    use table::table_name::TableName;
1458
1459    use super::datanode_table::DatanodeTableKey;
1460    use super::test_utils;
1461    use crate::ddl::test_util::create_table::test_create_table_task;
1462    use crate::ddl::utils::region_storage_path;
1463    use crate::error::Result;
1464    use crate::key::datanode_table::RegionInfo;
1465    use crate::key::table_info::TableInfoValue;
1466    use crate::key::table_name::TableNameKey;
1467    use crate::key::table_route::TableRouteValue;
1468    use crate::key::{
1469        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1470        ViewInfoValue, TOPIC_REGION_PREFIX,
1471    };
1472    use crate::kv_backend::memory::MemoryKvBackend;
1473    use crate::kv_backend::KvBackend;
1474    use crate::peer::Peer;
1475    use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1476    use crate::rpc::store::RangeRequest;
1477    use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1478
1479    #[test]
1480    fn test_deserialized_value_with_bytes() {
1481        let region_route = new_test_region_route();
1482        let region_routes = vec![region_route.clone()];
1483
1484        let expected_region_routes =
1485            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1486        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1487
1488        // Serialize behaviors:
1489        // The inner field will be ignored.
1490        let value = DeserializedValueWithBytes {
1491            // ignored
1492            inner: TableRouteValue::physical(region_routes.clone()),
1493            bytes: Bytes::from(expected.clone()),
1494        };
1495
1496        let encoded = serde_json::to_vec(&value).unwrap();
1497
1498        // Deserialize behaviors:
1499        // The inner field will be deserialized from the bytes field.
1500        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1501            serde_json::from_slice(&encoded).unwrap();
1502
1503        assert_eq!(decoded.inner, expected_region_routes);
1504        assert_eq!(decoded.bytes, expected);
1505    }
1506
1507    fn new_test_region_route() -> RegionRoute {
1508        new_region_route(1, 2)
1509    }
1510
1511    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1512        RegionRoute {
1513            region: Region {
1514                id: region_id.into(),
1515                name: "r1".to_string(),
1516                partition: None,
1517                attrs: BTreeMap::new(),
1518                partition_expr: Default::default(),
1519            },
1520            leader_peer: Some(Peer::new(datanode, "a2")),
1521            follower_peers: vec![],
1522            leader_state: None,
1523            leader_down_since: None,
1524        }
1525    }
1526
1527    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1528        test_utils::new_test_table_info(10, region_numbers)
1529    }
1530
1531    fn new_test_table_names() -> HashSet<TableName> {
1532        let mut set = HashSet::new();
1533        set.insert(TableName {
1534            catalog_name: "greptime".to_string(),
1535            schema_name: "public".to_string(),
1536            table_name: "a_table".to_string(),
1537        });
1538        set.insert(TableName {
1539            catalog_name: "greptime".to_string(),
1540            schema_name: "public".to_string(),
1541            table_name: "b_table".to_string(),
1542        });
1543        set
1544    }
1545
1546    async fn create_physical_table_metadata(
1547        table_metadata_manager: &TableMetadataManager,
1548        table_info: RawTableInfo,
1549        region_routes: Vec<RegionRoute>,
1550        region_wal_options: HashMap<RegionNumber, String>,
1551    ) -> Result<()> {
1552        table_metadata_manager
1553            .create_table_metadata(
1554                table_info,
1555                TableRouteValue::physical(region_routes),
1556                region_wal_options,
1557            )
1558            .await
1559    }
1560
1561    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1562        let topics = (0..2)
1563            .map(|i| format!("greptimedb_topic{}", i))
1564            .collect::<Vec<_>>();
1565        let wal_options = topics
1566            .iter()
1567            .map(|topic| {
1568                WalOptions::Kafka(KafkaWalOptions {
1569                    topic: topic.clone(),
1570                })
1571            })
1572            .collect::<Vec<_>>();
1573
1574        (0..16)
1575            .enumerate()
1576            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1577            .collect()
1578    }
1579
1580    #[tokio::test]
1581    async fn test_raft_engine_topic_region_map() {
1582        let mem_kv = Arc::new(MemoryKvBackend::default());
1583        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1584        let region_route = new_test_region_route();
1585        let region_routes = &vec![region_route.clone()];
1586        let table_info: RawTableInfo =
1587            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1588        let wal_allocator = WalOptionsAllocator::RaftEngine;
1589        let regions = (0..16).collect();
1590        let region_wal_options =
1591            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1592        create_physical_table_metadata(
1593            &table_metadata_manager,
1594            table_info.clone(),
1595            region_routes.clone(),
1596            region_wal_options.clone(),
1597        )
1598        .await
1599        .unwrap();
1600
1601        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1602        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1603        let resp = mem_kv.range(range_req).await.unwrap();
1604        // Should be empty because the topic region map is empty for raft engine.
1605        assert!(resp.kvs.is_empty());
1606    }
1607
1608    #[tokio::test]
1609    async fn test_create_table_metadata() {
1610        let mem_kv = Arc::new(MemoryKvBackend::default());
1611        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1612        let region_route = new_test_region_route();
1613        let region_routes = &vec![region_route.clone()];
1614        let table_info: RawTableInfo =
1615            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1616        let region_wal_options = create_mock_region_wal_options()
1617            .into_iter()
1618            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1619            .collect::<HashMap<_, _>>();
1620
1621        // creates metadata.
1622        create_physical_table_metadata(
1623            &table_metadata_manager,
1624            table_info.clone(),
1625            region_routes.clone(),
1626            region_wal_options.clone(),
1627        )
1628        .await
1629        .unwrap();
1630
1631        // if metadata was already created, it should be ok.
1632        assert!(create_physical_table_metadata(
1633            &table_metadata_manager,
1634            table_info.clone(),
1635            region_routes.clone(),
1636            region_wal_options.clone(),
1637        )
1638        .await
1639        .is_ok());
1640
1641        let mut modified_region_routes = region_routes.clone();
1642        modified_region_routes.push(region_route.clone());
1643        // if remote metadata was exists, it should return an error.
1644        assert!(create_physical_table_metadata(
1645            &table_metadata_manager,
1646            table_info.clone(),
1647            modified_region_routes,
1648            region_wal_options.clone(),
1649        )
1650        .await
1651        .is_err());
1652
1653        let (remote_table_info, remote_table_route) = table_metadata_manager
1654            .get_full_table_info(10)
1655            .await
1656            .unwrap();
1657
1658        assert_eq!(
1659            remote_table_info.unwrap().into_inner().table_info,
1660            table_info
1661        );
1662        assert_eq!(
1663            remote_table_route
1664                .unwrap()
1665                .into_inner()
1666                .region_routes()
1667                .unwrap(),
1668            region_routes
1669        );
1670
1671        for i in 0..2 {
1672            let region_number = i as u32;
1673            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1674            let topic = format!("greptimedb_topic{}", i);
1675            let regions = table_metadata_manager
1676                .topic_region_manager
1677                .regions(&topic)
1678                .await
1679                .unwrap();
1680            assert_eq!(regions.len(), 8);
1681            assert_eq!(regions[0], region_id);
1682        }
1683    }
1684
1685    #[tokio::test]
1686    async fn test_create_logic_tables_metadata() {
1687        let mem_kv = Arc::new(MemoryKvBackend::default());
1688        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1689        let region_route = new_test_region_route();
1690        let region_routes = vec![region_route.clone()];
1691        let table_info: RawTableInfo =
1692            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1693        let table_id = table_info.ident.table_id;
1694        let table_route_value = TableRouteValue::physical(region_routes.clone());
1695
1696        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1697        // creates metadata.
1698        table_metadata_manager
1699            .create_logical_tables_metadata(tables_data.clone())
1700            .await
1701            .unwrap();
1702
1703        // if metadata was already created, it should be ok.
1704        assert!(table_metadata_manager
1705            .create_logical_tables_metadata(tables_data)
1706            .await
1707            .is_ok());
1708
1709        let mut modified_region_routes = region_routes.clone();
1710        modified_region_routes.push(new_region_route(2, 3));
1711        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1712        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1713        // if remote metadata was exists, it should return an error.
1714        assert!(table_metadata_manager
1715            .create_logical_tables_metadata(modified_tables_data)
1716            .await
1717            .is_err());
1718
1719        let (remote_table_info, remote_table_route) = table_metadata_manager
1720            .get_full_table_info(table_id)
1721            .await
1722            .unwrap();
1723
1724        assert_eq!(
1725            remote_table_info.unwrap().into_inner().table_info,
1726            table_info
1727        );
1728        assert_eq!(
1729            remote_table_route
1730                .unwrap()
1731                .into_inner()
1732                .region_routes()
1733                .unwrap(),
1734            &region_routes
1735        );
1736    }
1737
1738    #[tokio::test]
1739    async fn test_create_many_logical_tables_metadata() {
1740        let kv_backend = Arc::new(MemoryKvBackend::default());
1741        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1742
1743        let mut tables_data = vec![];
1744        for i in 0..128 {
1745            let table_id = i + 1;
1746            let regin_number = table_id * 3;
1747            let region_id = RegionId::new(table_id, regin_number);
1748            let region_route = new_region_route(region_id.as_u64(), 2);
1749            let region_routes = vec![region_route.clone()];
1750            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1751                table_id,
1752                &format!("my_table_{}", table_id),
1753                region_routes.iter().map(|r| r.region.id.region_number()),
1754            )
1755            .into();
1756            let table_route_value = TableRouteValue::physical(region_routes.clone());
1757
1758            tables_data.push((table_info, table_route_value));
1759        }
1760
1761        // creates metadata.
1762        table_metadata_manager
1763            .create_logical_tables_metadata(tables_data)
1764            .await
1765            .unwrap();
1766    }
1767
1768    #[tokio::test]
1769    async fn test_delete_table_metadata() {
1770        let mem_kv = Arc::new(MemoryKvBackend::default());
1771        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1772        let region_route = new_test_region_route();
1773        let region_routes = &vec![region_route.clone()];
1774        let table_info: RawTableInfo =
1775            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1776        let table_id = table_info.ident.table_id;
1777        let datanode_id = 2;
1778        let region_wal_options = create_mock_region_wal_options();
1779        let serialized_region_wal_options = region_wal_options
1780            .iter()
1781            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1782            .collect::<HashMap<_, _>>();
1783
1784        // creates metadata.
1785        create_physical_table_metadata(
1786            &table_metadata_manager,
1787            table_info.clone(),
1788            region_routes.clone(),
1789            serialized_region_wal_options,
1790        )
1791        .await
1792        .unwrap();
1793
1794        let table_name = TableName::new(
1795            table_info.catalog_name,
1796            table_info.schema_name,
1797            table_info.name,
1798        );
1799        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1800        // deletes metadata.
1801        table_metadata_manager
1802            .delete_table_metadata(
1803                table_id,
1804                &table_name,
1805                table_route_value,
1806                &region_wal_options,
1807            )
1808            .await
1809            .unwrap();
1810        // Should be ignored.
1811        table_metadata_manager
1812            .delete_table_metadata(
1813                table_id,
1814                &table_name,
1815                table_route_value,
1816                &region_wal_options,
1817            )
1818            .await
1819            .unwrap();
1820        assert!(table_metadata_manager
1821            .table_info_manager()
1822            .get(table_id)
1823            .await
1824            .unwrap()
1825            .is_none());
1826        assert!(table_metadata_manager
1827            .table_route_manager()
1828            .table_route_storage()
1829            .get(table_id)
1830            .await
1831            .unwrap()
1832            .is_none());
1833        assert!(table_metadata_manager
1834            .datanode_table_manager()
1835            .tables(datanode_id)
1836            .try_collect::<Vec<_>>()
1837            .await
1838            .unwrap()
1839            .is_empty());
1840        // Checks removed values
1841        let table_info = table_metadata_manager
1842            .table_info_manager()
1843            .get(table_id)
1844            .await
1845            .unwrap();
1846        assert!(table_info.is_none());
1847        let table_route = table_metadata_manager
1848            .table_route_manager()
1849            .table_route_storage()
1850            .get(table_id)
1851            .await
1852            .unwrap();
1853        assert!(table_route.is_none());
1854        // Logical delete removes the topic region mapping as well.
1855        let regions = table_metadata_manager
1856            .topic_region_manager
1857            .regions("greptimedb_topic0")
1858            .await
1859            .unwrap();
1860        assert_eq!(regions.len(), 0);
1861        let regions = table_metadata_manager
1862            .topic_region_manager
1863            .regions("greptimedb_topic1")
1864            .await
1865            .unwrap();
1866        assert_eq!(regions.len(), 0);
1867    }
1868
1869    #[tokio::test]
1870    async fn test_rename_table() {
1871        let mem_kv = Arc::new(MemoryKvBackend::default());
1872        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1873        let region_route = new_test_region_route();
1874        let region_routes = vec![region_route.clone()];
1875        let table_info: RawTableInfo =
1876            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1877        let table_id = table_info.ident.table_id;
1878        // creates metadata.
1879        create_physical_table_metadata(
1880            &table_metadata_manager,
1881            table_info.clone(),
1882            region_routes.clone(),
1883            HashMap::new(),
1884        )
1885        .await
1886        .unwrap();
1887
1888        let new_table_name = "another_name".to_string();
1889        let table_info_value =
1890            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1891
1892        table_metadata_manager
1893            .rename_table(&table_info_value, new_table_name.clone())
1894            .await
1895            .unwrap();
1896        // if remote metadata was updated, it should be ok.
1897        table_metadata_manager
1898            .rename_table(&table_info_value, new_table_name.clone())
1899            .await
1900            .unwrap();
1901        let mut modified_table_info = table_info.clone();
1902        modified_table_info.name = "hi".to_string();
1903        let modified_table_info_value =
1904            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1905        // if the table_info_value is wrong, it should return an error.
1906        // The ABA problem.
1907        assert!(table_metadata_manager
1908            .rename_table(&modified_table_info_value, new_table_name.clone())
1909            .await
1910            .is_err());
1911
1912        let old_table_name = TableNameKey::new(
1913            &table_info.catalog_name,
1914            &table_info.schema_name,
1915            &table_info.name,
1916        );
1917        let new_table_name = TableNameKey::new(
1918            &table_info.catalog_name,
1919            &table_info.schema_name,
1920            &new_table_name,
1921        );
1922
1923        assert!(table_metadata_manager
1924            .table_name_manager()
1925            .get(old_table_name)
1926            .await
1927            .unwrap()
1928            .is_none());
1929
1930        assert_eq!(
1931            table_metadata_manager
1932                .table_name_manager()
1933                .get(new_table_name)
1934                .await
1935                .unwrap()
1936                .unwrap()
1937                .table_id(),
1938            table_id
1939        );
1940    }
1941
1942    #[tokio::test]
1943    async fn test_update_table_info() {
1944        let mem_kv = Arc::new(MemoryKvBackend::default());
1945        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1946        let region_route = new_test_region_route();
1947        let region_routes = vec![region_route.clone()];
1948        let table_info: RawTableInfo =
1949            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1950        let table_id = table_info.ident.table_id;
1951        // creates metadata.
1952        create_physical_table_metadata(
1953            &table_metadata_manager,
1954            table_info.clone(),
1955            region_routes.clone(),
1956            HashMap::new(),
1957        )
1958        .await
1959        .unwrap();
1960
1961        let mut new_table_info = table_info.clone();
1962        new_table_info.name = "hi".to_string();
1963        let current_table_info_value =
1964            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1965        // should be ok.
1966        table_metadata_manager
1967            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1968            .await
1969            .unwrap();
1970        // if table info was updated, it should be ok.
1971        table_metadata_manager
1972            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1973            .await
1974            .unwrap();
1975
1976        // updated table_info should equal the `new_table_info`
1977        let updated_table_info = table_metadata_manager
1978            .table_info_manager()
1979            .get(table_id)
1980            .await
1981            .unwrap()
1982            .unwrap()
1983            .into_inner();
1984        assert_eq!(updated_table_info.table_info, new_table_info);
1985
1986        let mut wrong_table_info = table_info.clone();
1987        wrong_table_info.name = "wrong".to_string();
1988        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1989            current_table_info_value.update(wrong_table_info),
1990        );
1991        // if the current_table_info_value is wrong, it should return an error.
1992        // The ABA problem.
1993        assert!(table_metadata_manager
1994            .update_table_info(&wrong_table_info_value, None, new_table_info)
1995            .await
1996            .is_err())
1997    }
1998
1999    #[tokio::test]
2000    async fn test_update_table_leader_region_status() {
2001        let mem_kv = Arc::new(MemoryKvBackend::default());
2002        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2003        let datanode = 1;
2004        let region_routes = vec![
2005            RegionRoute {
2006                region: Region {
2007                    id: 1.into(),
2008                    name: "r1".to_string(),
2009                    partition: None,
2010                    attrs: BTreeMap::new(),
2011                    partition_expr: Default::default(),
2012                },
2013                leader_peer: Some(Peer::new(datanode, "a2")),
2014                leader_state: Some(LeaderState::Downgrading),
2015                follower_peers: vec![],
2016                leader_down_since: Some(current_time_millis()),
2017            },
2018            RegionRoute {
2019                region: Region {
2020                    id: 2.into(),
2021                    name: "r2".to_string(),
2022                    partition: None,
2023                    attrs: BTreeMap::new(),
2024                    partition_expr: Default::default(),
2025                },
2026                leader_peer: Some(Peer::new(datanode, "a1")),
2027                leader_state: None,
2028                follower_peers: vec![],
2029                leader_down_since: None,
2030            },
2031        ];
2032        let table_info: RawTableInfo =
2033            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2034        let table_id = table_info.ident.table_id;
2035        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2036            TableRouteValue::physical(region_routes.clone()),
2037        );
2038
2039        // creates metadata.
2040        create_physical_table_metadata(
2041            &table_metadata_manager,
2042            table_info.clone(),
2043            region_routes.clone(),
2044            HashMap::new(),
2045        )
2046        .await
2047        .unwrap();
2048
2049        table_metadata_manager
2050            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2051                if region_route.leader_state.is_some() {
2052                    None
2053                } else {
2054                    Some(Some(LeaderState::Downgrading))
2055                }
2056            })
2057            .await
2058            .unwrap();
2059
2060        let updated_route_value = table_metadata_manager
2061            .table_route_manager()
2062            .table_route_storage()
2063            .get(table_id)
2064            .await
2065            .unwrap()
2066            .unwrap();
2067
2068        assert_eq!(
2069            updated_route_value.region_routes().unwrap()[0].leader_state,
2070            Some(LeaderState::Downgrading)
2071        );
2072
2073        assert!(updated_route_value.region_routes().unwrap()[0]
2074            .leader_down_since
2075            .is_some());
2076
2077        assert_eq!(
2078            updated_route_value.region_routes().unwrap()[1].leader_state,
2079            Some(LeaderState::Downgrading)
2080        );
2081        assert!(updated_route_value.region_routes().unwrap()[1]
2082            .leader_down_since
2083            .is_some());
2084    }
2085
2086    async fn assert_datanode_table(
2087        table_metadata_manager: &TableMetadataManager,
2088        table_id: u32,
2089        region_routes: &[RegionRoute],
2090    ) {
2091        let region_distribution = region_distribution(region_routes);
2092        for (datanode, regions) in region_distribution {
2093            let got = table_metadata_manager
2094                .datanode_table_manager()
2095                .get(&DatanodeTableKey::new(datanode, table_id))
2096                .await
2097                .unwrap()
2098                .unwrap();
2099
2100            assert_eq!(got.regions, regions.leader_regions);
2101            assert_eq!(got.follower_regions, regions.follower_regions);
2102        }
2103    }
2104
2105    #[tokio::test]
2106    async fn test_update_table_route() {
2107        let mem_kv = Arc::new(MemoryKvBackend::default());
2108        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2109        let region_route = new_test_region_route();
2110        let region_routes = vec![region_route.clone()];
2111        let table_info: RawTableInfo =
2112            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2113        let table_id = table_info.ident.table_id;
2114        let engine = table_info.meta.engine.as_str();
2115        let region_storage_path =
2116            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2117        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2118            TableRouteValue::physical(region_routes.clone()),
2119        );
2120
2121        // creates metadata.
2122        create_physical_table_metadata(
2123            &table_metadata_manager,
2124            table_info.clone(),
2125            region_routes.clone(),
2126            HashMap::new(),
2127        )
2128        .await
2129        .unwrap();
2130
2131        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2132        let new_region_routes = vec![
2133            new_region_route(1, 1),
2134            new_region_route(2, 2),
2135            new_region_route(3, 3),
2136        ];
2137        // it should be ok.
2138        table_metadata_manager
2139            .update_table_route(
2140                table_id,
2141                RegionInfo {
2142                    engine: engine.to_string(),
2143                    region_storage_path: region_storage_path.to_string(),
2144                    region_options: HashMap::new(),
2145                    region_wal_options: HashMap::new(),
2146                },
2147                &current_table_route_value,
2148                new_region_routes.clone(),
2149                &HashMap::new(),
2150                &HashMap::new(),
2151            )
2152            .await
2153            .unwrap();
2154        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2155
2156        // if the table route was updated. it should be ok.
2157        table_metadata_manager
2158            .update_table_route(
2159                table_id,
2160                RegionInfo {
2161                    engine: engine.to_string(),
2162                    region_storage_path: region_storage_path.to_string(),
2163                    region_options: HashMap::new(),
2164                    region_wal_options: HashMap::new(),
2165                },
2166                &current_table_route_value,
2167                new_region_routes.clone(),
2168                &HashMap::new(),
2169                &HashMap::new(),
2170            )
2171            .await
2172            .unwrap();
2173
2174        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2175            current_table_route_value
2176                .inner
2177                .update(new_region_routes.clone())
2178                .unwrap(),
2179        );
2180        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2181        // it should be ok.
2182        table_metadata_manager
2183            .update_table_route(
2184                table_id,
2185                RegionInfo {
2186                    engine: engine.to_string(),
2187                    region_storage_path: region_storage_path.to_string(),
2188                    region_options: HashMap::new(),
2189                    region_wal_options: HashMap::new(),
2190                },
2191                &current_table_route_value,
2192                new_region_routes.clone(),
2193                &HashMap::new(),
2194                &HashMap::new(),
2195            )
2196            .await
2197            .unwrap();
2198        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2199
2200        // if the current_table_route_value is wrong, it should return an error.
2201        // The ABA problem.
2202        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2203            current_table_route_value
2204                .update(vec![
2205                    new_region_route(1, 1),
2206                    new_region_route(2, 2),
2207                    new_region_route(3, 3),
2208                    new_region_route(4, 4),
2209                ])
2210                .unwrap(),
2211        );
2212        assert!(table_metadata_manager
2213            .update_table_route(
2214                table_id,
2215                RegionInfo {
2216                    engine: engine.to_string(),
2217                    region_storage_path: region_storage_path.to_string(),
2218                    region_options: HashMap::new(),
2219                    region_wal_options: HashMap::new(),
2220                },
2221                &wrong_table_route_value,
2222                new_region_routes,
2223                &HashMap::new(),
2224                &HashMap::new(),
2225            )
2226            .await
2227            .is_err());
2228    }
2229
2230    #[tokio::test]
2231    async fn test_destroy_table_metadata() {
2232        let mem_kv = Arc::new(MemoryKvBackend::default());
2233        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2234        let table_id = 1025;
2235        let table_name = "foo";
2236        let task = test_create_table_task(table_name, table_id);
2237        let options = create_mock_region_wal_options();
2238        let serialized_options = options
2239            .iter()
2240            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2241            .collect::<HashMap<_, _>>();
2242        table_metadata_manager
2243            .create_table_metadata(
2244                task.table_info,
2245                TableRouteValue::physical(vec![
2246                    RegionRoute {
2247                        region: Region::new_test(RegionId::new(table_id, 1)),
2248                        leader_peer: Some(Peer::empty(1)),
2249                        follower_peers: vec![Peer::empty(5)],
2250                        leader_state: None,
2251                        leader_down_since: None,
2252                    },
2253                    RegionRoute {
2254                        region: Region::new_test(RegionId::new(table_id, 2)),
2255                        leader_peer: Some(Peer::empty(2)),
2256                        follower_peers: vec![Peer::empty(4)],
2257                        leader_state: None,
2258                        leader_down_since: None,
2259                    },
2260                    RegionRoute {
2261                        region: Region::new_test(RegionId::new(table_id, 3)),
2262                        leader_peer: Some(Peer::empty(3)),
2263                        follower_peers: vec![],
2264                        leader_state: None,
2265                        leader_down_since: None,
2266                    },
2267                ]),
2268                serialized_options,
2269            )
2270            .await
2271            .unwrap();
2272        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2273        let table_route_value = table_metadata_manager
2274            .table_route_manager
2275            .table_route_storage()
2276            .get_with_raw_bytes(table_id)
2277            .await
2278            .unwrap()
2279            .unwrap();
2280        table_metadata_manager
2281            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2282            .await
2283            .unwrap();
2284        assert!(mem_kv.is_empty());
2285    }
2286
2287    #[tokio::test]
2288    async fn test_restore_table_metadata() {
2289        let mem_kv = Arc::new(MemoryKvBackend::default());
2290        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2291        let table_id = 1025;
2292        let table_name = "foo";
2293        let task = test_create_table_task(table_name, table_id);
2294        let options = create_mock_region_wal_options();
2295        let serialized_options = options
2296            .iter()
2297            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2298            .collect::<HashMap<_, _>>();
2299        table_metadata_manager
2300            .create_table_metadata(
2301                task.table_info,
2302                TableRouteValue::physical(vec![
2303                    RegionRoute {
2304                        region: Region::new_test(RegionId::new(table_id, 1)),
2305                        leader_peer: Some(Peer::empty(1)),
2306                        follower_peers: vec![Peer::empty(5)],
2307                        leader_state: None,
2308                        leader_down_since: None,
2309                    },
2310                    RegionRoute {
2311                        region: Region::new_test(RegionId::new(table_id, 2)),
2312                        leader_peer: Some(Peer::empty(2)),
2313                        follower_peers: vec![Peer::empty(4)],
2314                        leader_state: None,
2315                        leader_down_since: None,
2316                    },
2317                    RegionRoute {
2318                        region: Region::new_test(RegionId::new(table_id, 3)),
2319                        leader_peer: Some(Peer::empty(3)),
2320                        follower_peers: vec![],
2321                        leader_state: None,
2322                        leader_down_since: None,
2323                    },
2324                ]),
2325                serialized_options,
2326            )
2327            .await
2328            .unwrap();
2329        let expected_result = mem_kv.dump();
2330        let table_route_value = table_metadata_manager
2331            .table_route_manager
2332            .table_route_storage()
2333            .get_with_raw_bytes(table_id)
2334            .await
2335            .unwrap()
2336            .unwrap();
2337        let region_routes = table_route_value.region_routes().unwrap();
2338        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2339        let table_route_value = TableRouteValue::physical(region_routes.clone());
2340        table_metadata_manager
2341            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2342            .await
2343            .unwrap();
2344        table_metadata_manager
2345            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2346            .await
2347            .unwrap();
2348        let kvs = mem_kv.dump();
2349        assert_eq!(kvs, expected_result);
2350        // Should be ignored.
2351        table_metadata_manager
2352            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2353            .await
2354            .unwrap();
2355        let kvs = mem_kv.dump();
2356        assert_eq!(kvs, expected_result);
2357    }
2358
2359    #[tokio::test]
2360    async fn test_create_update_view_info() {
2361        let mem_kv = Arc::new(MemoryKvBackend::default());
2362        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2363
2364        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2365
2366        let view_id = view_info.ident.table_id;
2367
2368        let logical_plan: Vec<u8> = vec![1, 2, 3];
2369        let columns = vec!["a".to_string()];
2370        let plan_columns = vec!["number".to_string()];
2371        let table_names = new_test_table_names();
2372        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2373
2374        // Create metadata
2375        table_metadata_manager
2376            .create_view_metadata(
2377                view_info.clone(),
2378                logical_plan.clone(),
2379                table_names.clone(),
2380                columns.clone(),
2381                plan_columns.clone(),
2382                definition.to_string(),
2383            )
2384            .await
2385            .unwrap();
2386
2387        {
2388            // assert view info
2389            let current_view_info = table_metadata_manager
2390                .view_info_manager()
2391                .get(view_id)
2392                .await
2393                .unwrap()
2394                .unwrap()
2395                .into_inner();
2396            assert_eq!(current_view_info.view_info, logical_plan);
2397            assert_eq!(current_view_info.table_names, table_names);
2398            assert_eq!(current_view_info.definition, definition);
2399            assert_eq!(current_view_info.columns, columns);
2400            assert_eq!(current_view_info.plan_columns, plan_columns);
2401            // assert table info
2402            let current_table_info = table_metadata_manager
2403                .table_info_manager()
2404                .get(view_id)
2405                .await
2406                .unwrap()
2407                .unwrap()
2408                .into_inner();
2409            assert_eq!(current_table_info.table_info, view_info);
2410        }
2411
2412        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2413        let new_table_names = {
2414            let mut set = HashSet::new();
2415            set.insert(TableName {
2416                catalog_name: "greptime".to_string(),
2417                schema_name: "public".to_string(),
2418                table_name: "b_table".to_string(),
2419            });
2420            set.insert(TableName {
2421                catalog_name: "greptime".to_string(),
2422                schema_name: "public".to_string(),
2423                table_name: "c_table".to_string(),
2424            });
2425            set
2426        };
2427        let new_columns = vec!["b".to_string()];
2428        let new_plan_columns = vec!["number2".to_string()];
2429        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2430
2431        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2432            logical_plan.clone(),
2433            table_names,
2434            columns,
2435            plan_columns,
2436            definition.to_string(),
2437        ));
2438        // should be ok.
2439        table_metadata_manager
2440            .update_view_info(
2441                view_id,
2442                &current_view_info_value,
2443                new_logical_plan.clone(),
2444                new_table_names.clone(),
2445                new_columns.clone(),
2446                new_plan_columns.clone(),
2447                new_definition.to_string(),
2448            )
2449            .await
2450            .unwrap();
2451        // if table info was updated, it should be ok.
2452        table_metadata_manager
2453            .update_view_info(
2454                view_id,
2455                &current_view_info_value,
2456                new_logical_plan.clone(),
2457                new_table_names.clone(),
2458                new_columns.clone(),
2459                new_plan_columns.clone(),
2460                new_definition.to_string(),
2461            )
2462            .await
2463            .unwrap();
2464
2465        // updated view_info should equal the `new_logical_plan`
2466        let updated_view_info = table_metadata_manager
2467            .view_info_manager()
2468            .get(view_id)
2469            .await
2470            .unwrap()
2471            .unwrap()
2472            .into_inner();
2473        assert_eq!(updated_view_info.view_info, new_logical_plan);
2474        assert_eq!(updated_view_info.table_names, new_table_names);
2475        assert_eq!(updated_view_info.definition, new_definition);
2476        assert_eq!(updated_view_info.columns, new_columns);
2477        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2478
2479        let wrong_view_info = logical_plan.clone();
2480        let wrong_definition = "wrong_definition";
2481        let wrong_view_info_value =
2482            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2483                wrong_view_info,
2484                new_table_names.clone(),
2485                new_columns.clone(),
2486                new_plan_columns.clone(),
2487                wrong_definition.to_string(),
2488            ));
2489        // if the current_view_info_value is wrong, it should return an error.
2490        // The ABA problem.
2491        assert!(table_metadata_manager
2492            .update_view_info(
2493                view_id,
2494                &wrong_view_info_value,
2495                new_logical_plan.clone(),
2496                new_table_names.clone(),
2497                vec!["c".to_string()],
2498                vec!["number3".to_string()],
2499                wrong_definition.to_string(),
2500            )
2501            .await
2502            .is_err());
2503
2504        // The view_info is not changed.
2505        let current_view_info = table_metadata_manager
2506            .view_info_manager()
2507            .get(view_id)
2508            .await
2509            .unwrap()
2510            .unwrap()
2511            .into_inner();
2512        assert_eq!(current_view_info.view_info, new_logical_plan);
2513        assert_eq!(current_view_info.table_names, new_table_names);
2514        assert_eq!(current_view_info.definition, new_definition);
2515        assert_eq!(current_view_info.columns, new_columns);
2516        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2517    }
2518
2519    #[test]
2520    fn test_region_role_set_deserialize() {
2521        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2522        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2523        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2524        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2525
2526        let s = r#"[1, 2, 3]"#;
2527        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2528        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2529        assert!(region_role_set.follower_regions.is_empty());
2530    }
2531
2532    #[test]
2533    fn test_region_distribution_deserialize() {
2534        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2535        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2536        assert_eq!(region_distribution.len(), 2);
2537        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2538        assert!(region_distribution[&1].follower_regions.is_empty());
2539        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2540        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2541    }
2542}