common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which tables
//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to `{catalog_name}`.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define "unified"
//! managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain all the managers defined above. It's recommended to use only these managers.
75//!
//! The full layout of the flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_base::regex_pattern::NAME_PATTERN;
125use common_catalog::consts::{
126    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
127};
128use common_telemetry::warn;
129use common_wal::options::WalOptions;
130use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
131use flow::flow_route::FlowRouteValue;
132use flow::table_flow::TableFlowValue;
133use lazy_static::lazy_static;
134use regex::Regex;
135pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
136use serde::de::DeserializeOwned;
137use serde::{Deserialize, Serialize};
138use snafu::{OptionExt, ResultExt, ensure};
139use store_api::storage::RegionNumber;
140use table::metadata::{RawTableInfo, TableId};
141use table::table_name::TableName;
142use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
143use table_name::{TableNameKey, TableNameManager, TableNameValue};
144use topic_name::TopicNameManager;
145use topic_region::{TopicRegionKey, TopicRegionManager};
146use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
147
148use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
149use self::datanode_table::RegionInfo;
150use self::flow::flow_info::FlowInfoValue;
151use self::flow::flow_name::FlowNameValue;
152use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
153use self::table_route::{TableRouteManager, TableRouteValue};
154use self::tombstone::TombstoneManager;
155use crate::DatanodeId;
156use crate::error::{self, Result, SerdeJsonSnafu};
157use crate::key::flow::flow_state::FlowStateValue;
158use crate::key::node_address::NodeAddressValue;
159use crate::key::table_route::TableRouteKey;
160use crate::key::topic_region::TopicRegionValue;
161use crate::key::txn_helper::TxnOpGetResponseSet;
162use crate::kv_backend::KvBackendRef;
163use crate::kv_backend::txn::{Txn, TxnOp};
164use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
165use crate::rpc::store::BatchDeleteRequest;
166use crate::state_store::PoisonValue;
167
/// Regex for a topic name, without `^`/`$` anchors: the first character must be an ASCII
/// letter, digit, `_`, `:` or `-`; the following characters may additionally be `.`, `@` or `#`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy maintenance key outside the `__switches/` namespace; presumably superseded by
/// [`MAINTENANCE_KEY`] — TODO confirm.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch under the `__switches/` namespace.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch under the `__switches/` namespace.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery switch under the `__switches/` namespace.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys, which are followed by two numeric path segments.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix, used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The election key.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// The root key of metasrv election candidates.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
200
/// A set of regions grouped by role.
///
/// Serialization is derived; deserialization is implemented by hand (see its `Deserialize`
/// impl) so that more than one input shape can be accepted.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
209
210impl<'de> Deserialize<'de> for RegionRoleSet {
211    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
212    where
213        D: serde::Deserializer<'de>,
214    {
215        #[derive(Deserialize)]
216        #[serde(untagged)]
217        enum RegionRoleSetOrLeaderOnly {
218            Full {
219                leader_regions: Vec<RegionNumber>,
220                follower_regions: Vec<RegionNumber>,
221            },
222            LeaderOnly(Vec<RegionNumber>),
223        }
224        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
225            RegionRoleSetOrLeaderOnly::Full {
226                leader_regions,
227                follower_regions,
228            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
229            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
230                Ok(RegionRoleSet::new(leader_regions, vec![]))
231            }
232        }
233    }
234}
235
236impl RegionRoleSet {
237    /// Create a new region role set.
238    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
239        Self {
240            leader_regions,
241            follower_regions,
242        }
243    }
244
245    /// Add a leader region to the set.
246    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
247        self.leader_regions.push(region_number);
248    }
249
250    /// Add a follower region to the set.
251    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
252        self.follower_regions.push(region_number);
253    }
254
255    /// Sort the regions.
256    pub fn sort(&mut self) {
257        self.follower_regions.sort();
258        self.leader_regions.sort();
259    }
260}
261
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set of that datanode.
/// Being a `BTreeMap`, it iterates in ascending datanode-id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of a flow.
pub type FlowId = u32;
/// The partition id of a flow.
pub type FlowPartitionId = u32;
271
272lazy_static! {
273    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
274}
275
276lazy_static! {
277    static ref TABLE_INFO_KEY_PATTERN: Regex =
278        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
279}
280
281lazy_static! {
282    static ref VIEW_INFO_KEY_PATTERN: Regex =
283        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
288        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
289}
290
291lazy_static! {
292    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
293        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
294}
295
296lazy_static! {
297    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
298        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
299    ))
300    .unwrap();
301}
302
303lazy_static! {
304    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
305    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
307    ))
308        .unwrap();
309}
310
311lazy_static! {
312    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
313    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
314        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
315    ))
316        .unwrap();
317}
318
319lazy_static! {
320    static ref NODE_ADDRESS_PATTERN: Regex =
321        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
322}
323
324lazy_static! {
325    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
326        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
327}
328
329lazy_static! {
330    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
331        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
332    ))
333    .unwrap();
334}
335
/// The key of metadata: converts between a typed key and its byte form in the KV store.
pub trait MetadataKey<'a, T> {
    /// Encodes the key into its byte representation.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a `T` from `bytes`.
    ///
    /// The lifetime `'a` is a trait parameter shared with `bytes`, so `T` may borrow from the
    /// input buffer.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
342
343#[derive(Debug, Clone, PartialEq)]
344pub struct BytesAdapter(Vec<u8>);
345
346impl From<Vec<u8>> for BytesAdapter {
347    fn from(value: Vec<u8>) -> Self {
348        Self(value)
349    }
350}
351
352impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
353    fn to_bytes(&self) -> Vec<u8> {
354        self.0.clone()
355    }
356
357    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
358        Ok(BytesAdapter(bytes.to_vec()))
359    }
360}
361
/// Builds a transactional "get" operation for a metadata key.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the [`TxnOp`] that reads this key, together with a closure that, given the
    /// transaction's [`TxnOpGetResponseSet`], yields this key's raw value if there is one.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
370
/// A value stored in the metadata store that can be decoded from and encoded to raw bytes.
pub trait MetadataValue {
    /// Decodes a value from the raw bytes read from the store.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into the raw bytes to write to the store.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
378
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Groups the table-related metadata managers behind a single entry point.
///
/// Every manager is constructed on the same KV backend, which is also kept in `kv_backend`
/// so that transactions spanning several managers' keys can be issued directly.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // The backend shared by all the managers above.
    kv_backend: KvBackendRef,
}
394
/// Returns early with an `UnexpectedSnafu` error when `$got != $expected_value`.
///
/// The error message includes both values (via `Debug`) and the operation name `$name`.
/// Each argument is evaluated at most once. `$got` and `$expected_value` are only borrowed,
/// so they remain usable afterwards.
///
/// The expansion uses `ensure!` and `error::UnexpectedSnafu` unqualified, so both must be in
/// scope at the call site.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Bind the arguments once. Previously they were expanded twice (comparison + message),
        // which re-evaluated any side-effecting argument expression.
        let got = &$got;
        let expected = &$expected_value;
        ensure!(
            *got == *expected,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                )
            }
        );
    }};
}
409
/// A deserialized value (`inner`) paired with the original bytes it was decoded from.
///
/// - Serialize behaviors:
///
/// The `inner` field is ignored; only the original bytes are written, as a string.
///
/// - Deserialize behaviors:
///
/// A string is read and kept as the original bytes, and `inner` is decoded from those bytes.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner value.
    bytes: Bytes,
    // The value decoded from `bytes`.
    inner: T,
}
425
426impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
427    type Target = T;
428
429    fn deref(&self) -> &Self::Target {
430        &self.inner
431    }
432}
433
434impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
435    fn deref_mut(&mut self) -> &mut Self::Target {
436        &mut self.inner
437    }
438}
439
440impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
441    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442        write!(
443            f,
444            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
445            self.inner, self.bytes
446        )
447    }
448}
449
450impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
451    /// - Serialize behaviors:
452    ///
453    /// The `inner` field will be ignored.
454    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
455    where
456        S: serde::Serializer,
457    {
458        // Safety: The original bytes are always JSON encoded.
459        // It's more efficiently than `serialize_bytes`.
460        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
461    }
462}
463
464impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
465    for DeserializedValueWithBytes<T>
466{
467    /// - Deserialize behaviors:
468    ///
469    /// The `inner` field will be deserialized from the `bytes` field.
470    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
471    where
472        D: serde::Deserializer<'de>,
473    {
474        let buf = String::deserialize(deserializer)?;
475        let bytes = Bytes::from(buf);
476
477        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
478            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480        Ok(value)
481    }
482}
483
484impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
485    fn clone(&self) -> Self {
486        Self {
487            bytes: self.bytes.clone(),
488            inner: self.inner.clone(),
489        }
490    }
491}
492
493impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
494    /// Returns a struct containing a deserialized value and an original `bytes`.
495    /// It accepts original bytes of inner.
496    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
497        let inner = T::try_from_raw_value(&bytes)?;
498        Ok(Self { bytes, inner })
499    }
500
501    /// Returns a struct containing a deserialized value and an original `bytes`.
502    /// It accepts original bytes of inner.
503    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
504        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
505    }
506
507    pub fn into_inner(self) -> T {
508        self.inner
509    }
510
511    pub fn get_inner_ref(&self) -> &T {
512        &self.inner
513    }
514
515    /// Returns original `bytes`
516    pub fn get_raw_bytes(&self) -> Vec<u8> {
517        self.bytes.to_vec()
518    }
519
520    #[cfg(any(test, feature = "testing"))]
521    pub fn from_inner(inner: T) -> Self {
522        let bytes = serde_json::to_vec(&inner).unwrap();
523
524        Self {
525            bytes: Bytes::from(bytes),
526            inner,
527        }
528    }
529}
530
531impl TableMetadataManager {
532    pub fn new(kv_backend: KvBackendRef) -> Self {
533        TableMetadataManager {
534            table_name_manager: TableNameManager::new(kv_backend.clone()),
535            table_info_manager: TableInfoManager::new(kv_backend.clone()),
536            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
537            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
538            catalog_manager: CatalogManager::new(kv_backend.clone()),
539            schema_manager: SchemaManager::new(kv_backend.clone()),
540            table_route_manager: TableRouteManager::new(kv_backend.clone()),
541            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
542            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
543            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
544            kv_backend,
545        }
546    }
547
548    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
549    pub fn new_with_custom_tombstone_prefix(
550        kv_backend: KvBackendRef,
551        tombstone_prefix: &str,
552    ) -> Self {
553        Self {
554            table_name_manager: TableNameManager::new(kv_backend.clone()),
555            table_info_manager: TableInfoManager::new(kv_backend.clone()),
556            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
557            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
558            catalog_manager: CatalogManager::new(kv_backend.clone()),
559            schema_manager: SchemaManager::new(kv_backend.clone()),
560            table_route_manager: TableRouteManager::new(kv_backend.clone()),
561            tombstone_manager: TombstoneManager::new_with_prefix(
562                kv_backend.clone(),
563                tombstone_prefix,
564            ),
565            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
566            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
567            kv_backend,
568        }
569    }
570
571    pub async fn init(&self) -> Result<()> {
572        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
573
574        self.catalog_manager().create(catalog_name, true).await?;
575
576        let internal_schemas = [
577            DEFAULT_SCHEMA_NAME,
578            INFORMATION_SCHEMA_NAME,
579            DEFAULT_PRIVATE_SCHEMA_NAME,
580        ];
581
582        for schema_name in internal_schemas {
583            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
584
585            self.schema_manager().create(schema_key, None, true).await?;
586        }
587
588        Ok(())
589    }
590
    /// Returns the manager for table name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager for table info keys.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager for view info keys.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager for datanode table keys.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager for catalog name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager for schema name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager for table route keys.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager for topic name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager for topic-to-region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the KV backend shared by all the managers.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
630
631    pub async fn get_full_table_info(
632        &self,
633        table_id: TableId,
634    ) -> Result<(
635        Option<DeserializedValueWithBytes<TableInfoValue>>,
636        Option<DeserializedValueWithBytes<TableRouteValue>>,
637    )> {
638        let table_info_key = TableInfoKey::new(table_id);
639        let table_route_key = TableRouteKey::new(table_id);
640        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
641        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
642
643        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
644        let mut res = self.kv_backend.txn(txn).await?;
645        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
646        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
647        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
648        Ok((table_info_value, table_route_value))
649    }
650
651    /// Creates metadata for view and returns an error if different metadata exists.
652    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
653    /// Parameters include:
654    /// - `view_info`: the encoded logical plan
655    /// - `table_names`: the resolved fully table names in logical plan
656    /// - `columns`: the view columns
657    /// - `plan_columns`: the original plan columns
658    /// - `definition`: The SQL to create the view
659    ///
660    pub async fn create_view_metadata(
661        &self,
662        view_info: RawTableInfo,
663        raw_logical_plan: Vec<u8>,
664        table_names: HashSet<TableName>,
665        columns: Vec<String>,
666        plan_columns: Vec<String>,
667        definition: String,
668    ) -> Result<()> {
669        let view_id = view_info.ident.table_id;
670
671        // Creates view name.
672        let view_name = TableNameKey::new(
673            &view_info.catalog_name,
674            &view_info.schema_name,
675            &view_info.name,
676        );
677        let create_table_name_txn = self
678            .table_name_manager()
679            .build_create_txn(&view_name, view_id)?;
680
681        // Creates table info.
682        let table_info_value = TableInfoValue::new(view_info);
683
684        let (create_table_info_txn, on_create_table_info_failure) = self
685            .table_info_manager()
686            .build_create_txn(view_id, &table_info_value)?;
687
688        // Creates view info
689        let view_info_value = ViewInfoValue::new(
690            raw_logical_plan,
691            table_names,
692            columns,
693            plan_columns,
694            definition,
695        );
696        let (create_view_info_txn, on_create_view_info_failure) = self
697            .view_info_manager()
698            .build_create_txn(view_id, &view_info_value)?;
699
700        let txn = Txn::merge_all(vec![
701            create_table_name_txn,
702            create_table_info_txn,
703            create_view_info_txn,
704        ]);
705
706        let mut r = self.kv_backend.txn(txn).await?;
707
708        // Checks whether metadata was already created.
709        if !r.succeeded {
710            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
711            let remote_table_info = on_create_table_info_failure(&mut set)?
712                .context(error::UnexpectedSnafu {
713                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
714                })?
715                .into_inner();
716
717            let remote_view_info = on_create_view_info_failure(&mut set)?
718                .context(error::UnexpectedSnafu {
719                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
720                })?
721                .into_inner();
722
723            let op_name = "the creating view metadata";
724            ensure_values!(remote_table_info, table_info_value, op_name);
725            ensure_values!(remote_view_info, view_info_value, op_name);
726        }
727
728        Ok(())
729    }
730
    /// Creates metadata for table and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// Before anything is written, `table_info.meta.region_numbers` is overwritten with the
    /// region numbers of `table_route_value`. A single transaction then creates the table name,
    /// table info, table route and topic-region entries. Only when `table_route_value` is
    /// [`TableRouteValue::Physical`] does it also create the datanode table entries derived
    /// from the route's region routes.
    ///
    /// `region_wal_options` maps region numbers to WAL option strings (presumably serialized
    /// `WalOptions` — TODO confirm).
    ///
    /// If the transaction does not succeed, the stored table info and table route are read
    /// back and compared with the ones being created. Identical values are treated as success.
    /// Missing or different values produce an `Unexpected` error.
    pub async fn create_table_metadata(
        &self,
        mut table_info: RawTableInfo,
        table_route_value: TableRouteValue,
        region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let region_numbers = table_route_value.region_numbers();
        table_info.meta.region_numbers = region_numbers;
        let table_id = table_info.ident.table_id;
        let engine = table_info.meta.engine.clone();

        // Creates table name.
        let table_name = TableNameKey::new(
            &table_info.catalog_name,
            &table_info.schema_name,
            &table_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&table_name, table_id)?;

        // Region options must be derived before `table_info` is moved into `TableInfoValue`.
        let region_options = table_info.to_region_options();
        // Creates table info.
        let table_info_value = TableInfoValue::new(table_info);
        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(table_id, &table_info_value)?;

        // Creates table route.
        let (create_table_route_txn, on_create_table_route_failure) = self
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(table_id, &table_route_value)?;

        // Creates the topic-region entries from the per-region WAL options.
        let create_topic_region_txn = self
            .topic_region_manager
            .build_create_txn(table_id, &region_wal_options)?;

        let mut txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_table_route_txn,
            create_topic_region_txn,
        ]);

        // Only physical routes get datanode table entries; `region_options` and
        // `region_wal_options` are moved into that transaction here.
        if let TableRouteValue::Physical(x) = &table_route_value {
            let region_storage_path = table_info_value.region_storage_path();
            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
                table_id,
                &engine,
                &region_storage_path,
                region_options,
                region_wal_options,
                region_distribution(&x.region_routes),
            )?;
            txn = txn.merge(create_datanode_table_txn);
        }

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_table_route = on_create_table_route_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                })?
                .into_inner();

            let op_name = "the creating table metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_table_route, table_route_value, op_name);
        }

        Ok(())
    }
814
815    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
816        // The batch size is max_txn_size / 3 because the size of the `tables_data`
817        // is 3 times the size of the `tables_data`.
818        self.kv_backend.max_txn_ops() / 3
819    }
820
821    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
822    pub async fn create_logical_tables_metadata(
823        &self,
824        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
825    ) -> Result<()> {
826        let len = tables_data.len();
827        let mut txns = Vec::with_capacity(3 * len);
828        struct OnFailure<F1, R1, F2, R2>
829        where
830            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
831            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
832        {
833            table_info_value: TableInfoValue,
834            on_create_table_info_failure: F1,
835            table_route_value: TableRouteValue,
836            on_create_table_route_failure: F2,
837        }
838        let mut on_failures = Vec::with_capacity(len);
839        for (mut table_info, table_route_value) in tables_data {
840            table_info.meta.region_numbers = table_route_value.region_numbers();
841            let table_id = table_info.ident.table_id;
842
843            // Creates table name.
844            let table_name = TableNameKey::new(
845                &table_info.catalog_name,
846                &table_info.schema_name,
847                &table_info.name,
848            );
849            let create_table_name_txn = self
850                .table_name_manager()
851                .build_create_txn(&table_name, table_id)?;
852            txns.push(create_table_name_txn);
853
854            // Creates table info.
855            let table_info_value = TableInfoValue::new(table_info);
856            let (create_table_info_txn, on_create_table_info_failure) =
857                self.table_info_manager()
858                    .build_create_txn(table_id, &table_info_value)?;
859            txns.push(create_table_info_txn);
860
861            let (create_table_route_txn, on_create_table_route_failure) = self
862                .table_route_manager()
863                .table_route_storage()
864                .build_create_txn(table_id, &table_route_value)?;
865            txns.push(create_table_route_txn);
866
867            on_failures.push(OnFailure {
868                table_info_value,
869                on_create_table_info_failure,
870                table_route_value,
871                on_create_table_route_failure,
872            });
873        }
874
875        let txn = Txn::merge_all(txns);
876        let mut r = self.kv_backend.txn(txn).await?;
877
878        // Checks whether metadata was already created.
879        if !r.succeeded {
880            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
881            for on_failure in on_failures {
882                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
883                    .context(error::UnexpectedSnafu {
884                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
885                    })?
886                    .into_inner();
887
888                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
889                    .context(error::UnexpectedSnafu {
890                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
891                    })?
892                    .into_inner();
893
894                let op_name = "the creating logical tables metadata";
895                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
896                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
897            }
898        }
899
900        Ok(())
901    }
902
903    fn table_metadata_keys(
904        &self,
905        table_id: TableId,
906        table_name: &TableName,
907        table_route_value: &TableRouteValue,
908        region_wal_options: &HashMap<RegionNumber, WalOptions>,
909    ) -> Result<Vec<Vec<u8>>> {
910        // Builds keys
911        let datanode_ids = if table_route_value.is_physical() {
912            region_distribution(table_route_value.region_routes()?)
913                .into_keys()
914                .collect()
915        } else {
916            vec![]
917        };
918        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
919        let table_name = TableNameKey::new(
920            &table_name.catalog_name,
921            &table_name.schema_name,
922            &table_name.table_name,
923        );
924        let table_info_key = TableInfoKey::new(table_id);
925        let table_route_key = TableRouteKey::new(table_id);
926        let datanode_table_keys = datanode_ids
927            .into_iter()
928            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
929            .collect::<HashSet<_>>();
930        let topic_region_map = self
931            .topic_region_manager
932            .get_topic_region_mapping(table_id, region_wal_options);
933        let topic_region_keys = topic_region_map
934            .iter()
935            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
936            .collect::<Vec<_>>();
937        keys.push(table_name.to_bytes());
938        keys.push(table_info_key.to_bytes());
939        keys.push(table_route_key.to_bytes());
940        for key in &datanode_table_keys {
941            keys.push(key.to_bytes());
942        }
943        for key in topic_region_keys {
944            keys.push(key.to_bytes());
945        }
946        Ok(keys)
947    }
948
949    /// Deletes metadata for table **logically**.
950    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
951    pub async fn delete_table_metadata(
952        &self,
953        table_id: TableId,
954        table_name: &TableName,
955        table_route_value: &TableRouteValue,
956        region_wal_options: &HashMap<RegionNumber, WalOptions>,
957    ) -> Result<()> {
958        let keys =
959            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
960        self.tombstone_manager.create(keys).await.map(|_| ())
961    }
962
963    /// Deletes metadata tombstone for table **permanently**.
964    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
965    pub async fn delete_table_metadata_tombstone(
966        &self,
967        table_id: TableId,
968        table_name: &TableName,
969        table_route_value: &TableRouteValue,
970        region_wal_options: &HashMap<RegionNumber, WalOptions>,
971    ) -> Result<()> {
972        let table_metadata_keys =
973            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974        self.tombstone_manager
975            .delete(table_metadata_keys)
976            .await
977            .map(|_| ())
978    }
979
980    /// Restores metadata for table.
981    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
982    pub async fn restore_table_metadata(
983        &self,
984        table_id: TableId,
985        table_name: &TableName,
986        table_route_value: &TableRouteValue,
987        region_wal_options: &HashMap<RegionNumber, WalOptions>,
988    ) -> Result<()> {
989        let keys =
990            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
991        self.tombstone_manager.restore(keys).await.map(|_| ())
992    }
993
994    /// Deletes metadata for table **permanently**.
995    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
996    pub async fn destroy_table_metadata(
997        &self,
998        table_id: TableId,
999        table_name: &TableName,
1000        table_route_value: &TableRouteValue,
1001        region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002    ) -> Result<()> {
1003        let keys =
1004            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005        let _ = self
1006            .kv_backend
1007            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1008            .await?;
1009        Ok(())
1010    }
1011
1012    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1013        let mut keys = Vec::with_capacity(3);
1014        let view_name = TableNameKey::new(
1015            &view_name.catalog_name,
1016            &view_name.schema_name,
1017            &view_name.table_name,
1018        );
1019        let table_info_key = TableInfoKey::new(view_id);
1020        let view_info_key = ViewInfoKey::new(view_id);
1021        keys.push(view_name.to_bytes());
1022        keys.push(table_info_key.to_bytes());
1023        keys.push(view_info_key.to_bytes());
1024
1025        Ok(keys)
1026    }
1027
1028    /// Deletes metadata for view **permanently**.
1029    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1030    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1031        let keys = self.view_info_keys(view_id, view_name)?;
1032        let _ = self
1033            .kv_backend
1034            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1035            .await?;
1036        Ok(())
1037    }
1038
1039    /// Renames the table name and returns an error if different metadata exists.
1040    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1041    /// and the new `TableNameKey` MUST be empty.
1042    pub async fn rename_table(
1043        &self,
1044        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1045        new_table_name: String,
1046    ) -> Result<()> {
1047        let current_table_info = &current_table_info_value.table_info;
1048        let table_id = current_table_info.ident.table_id;
1049
1050        let table_name_key = TableNameKey::new(
1051            &current_table_info.catalog_name,
1052            &current_table_info.schema_name,
1053            &current_table_info.name,
1054        );
1055
1056        let new_table_name_key = TableNameKey::new(
1057            &current_table_info.catalog_name,
1058            &current_table_info.schema_name,
1059            &new_table_name,
1060        );
1061
1062        // Updates table name.
1063        let update_table_name_txn = self.table_name_manager().build_update_txn(
1064            &table_name_key,
1065            &new_table_name_key,
1066            table_id,
1067        )?;
1068
1069        let new_table_info_value = current_table_info_value
1070            .inner
1071            .with_update(move |table_info| {
1072                table_info.name = new_table_name;
1073            });
1074
1075        // Updates table info.
1076        let (update_table_info_txn, on_update_table_info_failure) = self
1077            .table_info_manager()
1078            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1079
1080        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1081
1082        let mut r = self.kv_backend.txn(txn).await?;
1083
1084        // Checks whether metadata was already updated.
1085        if !r.succeeded {
1086            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1087            let remote_table_info = on_update_table_info_failure(&mut set)?
1088                .context(error::UnexpectedSnafu {
1089                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1090                })?
1091                .into_inner();
1092
1093            let op_name = "the renaming table metadata";
1094            ensure_values!(remote_table_info, new_table_info_value, op_name);
1095        }
1096
1097        Ok(())
1098    }
1099
1100    /// Updates table info and returns an error if different metadata exists.
1101    /// And cascade-ly update all redundant table options for each region
1102    /// if region_distribution is present.
1103    pub async fn update_table_info(
1104        &self,
1105        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1106        region_distribution: Option<RegionDistribution>,
1107        new_table_info: RawTableInfo,
1108    ) -> Result<()> {
1109        let table_id = current_table_info_value.table_info.ident.table_id;
1110        let new_table_info_value = current_table_info_value.update(new_table_info);
1111
1112        // Updates table info.
1113        let (update_table_info_txn, on_update_table_info_failure) = self
1114            .table_info_manager()
1115            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1116
1117        let txn = if let Some(region_distribution) = region_distribution {
1118            // region options induced from table info.
1119            let new_region_options = new_table_info_value.table_info.to_region_options();
1120            let update_datanode_table_options_txn = self
1121                .datanode_table_manager
1122                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1123                .await?;
1124            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1125        } else {
1126            update_table_info_txn
1127        };
1128
1129        let mut r = self.kv_backend.txn(txn).await?;
1130        // Checks whether metadata was already updated.
1131        if !r.succeeded {
1132            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1133            let remote_table_info = on_update_table_info_failure(&mut set)?
1134                .context(error::UnexpectedSnafu {
1135                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1136                })?
1137                .into_inner();
1138
1139            let op_name = "the updating table info";
1140            ensure_values!(remote_table_info, new_table_info_value, op_name);
1141        }
1142        Ok(())
1143    }
1144
1145    /// Updates view info and returns an error if different metadata exists.
1146    /// Parameters include:
1147    /// - `view_id`: the view id
1148    /// - `current_view_info_value`: the current view info for CAS checking
1149    /// - `new_view_info`: the encoded logical plan
1150    /// - `table_names`: the resolved fully table names in logical plan
1151    /// - `columns`: the view columns
1152    /// - `plan_columns`: the original plan columns
1153    /// - `definition`: The SQL to create the view
1154    ///
1155    #[allow(clippy::too_many_arguments)]
1156    pub async fn update_view_info(
1157        &self,
1158        view_id: TableId,
1159        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1160        new_view_info: Vec<u8>,
1161        table_names: HashSet<TableName>,
1162        columns: Vec<String>,
1163        plan_columns: Vec<String>,
1164        definition: String,
1165    ) -> Result<()> {
1166        let new_view_info_value = current_view_info_value.update(
1167            new_view_info,
1168            table_names,
1169            columns,
1170            plan_columns,
1171            definition,
1172        );
1173
1174        // Updates view info.
1175        let (update_view_info_txn, on_update_view_info_failure) = self
1176            .view_info_manager()
1177            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1178
1179        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1180
1181        // Checks whether metadata was already updated.
1182        if !r.succeeded {
1183            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1184            let remote_view_info = on_update_view_info_failure(&mut set)?
1185                .context(error::UnexpectedSnafu {
1186                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1187                })?
1188                .into_inner();
1189
1190            let op_name = "the updating view info";
1191            ensure_values!(remote_view_info, new_view_info_value, op_name);
1192        }
1193        Ok(())
1194    }
1195
1196    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1197        self.kv_backend.max_txn_ops()
1198    }
1199
1200    pub async fn batch_update_table_info_values(
1201        &self,
1202        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1203    ) -> Result<()> {
1204        let len = table_info_value_pairs.len();
1205        let mut txns = Vec::with_capacity(len);
1206        struct OnFailure<F, R>
1207        where
1208            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1209        {
1210            table_info_value: TableInfoValue,
1211            on_update_table_info_failure: F,
1212        }
1213        let mut on_failures = Vec::with_capacity(len);
1214
1215        for (table_info_value, new_table_info) in table_info_value_pairs {
1216            let table_id = table_info_value.table_info.ident.table_id;
1217
1218            let new_table_info_value = table_info_value.update(new_table_info);
1219
1220            let (update_table_info_txn, on_update_table_info_failure) =
1221                self.table_info_manager().build_update_txn(
1222                    table_id,
1223                    &table_info_value,
1224                    &new_table_info_value,
1225                )?;
1226
1227            txns.push(update_table_info_txn);
1228
1229            on_failures.push(OnFailure {
1230                table_info_value: new_table_info_value,
1231                on_update_table_info_failure,
1232            });
1233        }
1234
1235        let txn = Txn::merge_all(txns);
1236        let mut r = self.kv_backend.txn(txn).await?;
1237
1238        if !r.succeeded {
1239            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1240            for on_failure in on_failures {
1241                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1242                    .context(error::UnexpectedSnafu {
1243                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1244                    })?
1245                    .into_inner();
1246
1247                let op_name = "the batch updating table info";
1248                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1249            }
1250        }
1251
1252        Ok(())
1253    }
1254
1255    pub async fn update_table_route(
1256        &self,
1257        table_id: TableId,
1258        region_info: RegionInfo,
1259        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1260        new_region_routes: Vec<RegionRoute>,
1261        new_region_options: &HashMap<String, String>,
1262        new_region_wal_options: &HashMap<RegionNumber, String>,
1263    ) -> Result<()> {
1264        // Updates the datanode table key value pairs.
1265        let current_region_distribution =
1266            region_distribution(current_table_route_value.region_routes()?);
1267        let new_region_distribution = region_distribution(&new_region_routes);
1268
1269        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1270            table_id,
1271            region_info,
1272            current_region_distribution,
1273            new_region_distribution,
1274            new_region_options,
1275            new_region_wal_options,
1276        )?;
1277
1278        // Updates the table_route.
1279        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1280
1281        let (update_table_route_txn, on_update_table_route_failure) = self
1282            .table_route_manager()
1283            .table_route_storage()
1284            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1285
1286        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1287
1288        let mut r = self.kv_backend.txn(txn).await?;
1289
1290        // Checks whether metadata was already updated.
1291        if !r.succeeded {
1292            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1293            let remote_table_route = on_update_table_route_failure(&mut set)?
1294                .context(error::UnexpectedSnafu {
1295                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1296                })?
1297                .into_inner();
1298
1299            let op_name = "the updating table route";
1300            ensure_values!(remote_table_route, new_table_route_value, op_name);
1301        }
1302
1303        Ok(())
1304    }
1305
1306    /// Updates the leader status of the [RegionRoute].
1307    pub async fn update_leader_region_status<F>(
1308        &self,
1309        table_id: TableId,
1310        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1311        next_region_route_status: F,
1312    ) -> Result<()>
1313    where
1314        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1315    {
1316        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1317
1318        let mut updated = 0;
1319        for route in &mut new_region_routes {
1320            if let Some(state) = next_region_route_status(route)
1321                && route.set_leader_state(state)
1322            {
1323                updated += 1;
1324            }
1325        }
1326
1327        if updated == 0 {
1328            warn!("No leader status updated");
1329            return Ok(());
1330        }
1331
1332        // Updates the table_route.
1333        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1334
1335        let (update_table_route_txn, on_update_table_route_failure) = self
1336            .table_route_manager()
1337            .table_route_storage()
1338            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1339
1340        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1341
1342        // Checks whether metadata was already updated.
1343        if !r.succeeded {
1344            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1345            let remote_table_route = on_update_table_route_failure(&mut set)?
1346                .context(error::UnexpectedSnafu {
1347                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1348                })?
1349                .into_inner();
1350
1351            let op_name = "the updating leader region status";
1352            ensure_values!(remote_table_route, new_table_route_value, op_name);
1353        }
1354
1355        Ok(())
1356    }
1357}
1358
/// Implements `$crate::key::MetadataValue` for each listed type by (de)serializing the
/// value as JSON with `serde_json::from_slice` / `serde_json::to_vec`.
///
/// NOTE(review): the expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the
/// `.context(...)` extension method without qualification, so those names must be in
/// scope at every invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1375
/// Implements `MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` encodes the key with `to_bytes`, and returns a
/// `TxnOp::Get` on those bytes together with `TxnOpGetResponseSet::filter` for the same
/// bytes, so callers can pick this key's value out of a txn's get responses.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}

// Key types whose values can be fetched inside a txn through `build_get_op`.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1408
/// Adds inherent JSON (de)serialization helpers to each listed type.
///
/// Unlike `MetadataValue::try_from_raw_value`, the generated `try_from_raw_value` returns
/// `Result<Option<Self>>` — presumably so that a stored JSON `null` decodes to `None`
/// (serde maps `null` to `None` for `Option<T>`).
///
/// NOTE(review): like `impl_metadata_value!`, the expansion relies on `Result`,
/// `SerdeJsonSnafu`, `serde_json` and `.context(...)` being in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1425
// JSON (de)serialization for the metadata value types, via the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}

// Value types that also get inherent helpers whose `try_from_raw_value` returns
// `Result<Option<Self>>`.
// NOTE(review): `SchemaNameValue` appears in both invocations, so it has both the
// `MetadataValue` trait methods and same-named inherent methods; inherent associated
// functions take precedence in `SchemaNameValue::try_from_raw_value`-style calls.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1446
1447#[cfg(test)]
1448mod tests {
1449    use std::collections::{BTreeMap, HashMap, HashSet};
1450    use std::sync::Arc;
1451
1452    use bytes::Bytes;
1453    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1454    use common_time::util::current_time_millis;
1455    use common_wal::options::{KafkaWalOptions, WalOptions};
1456    use futures::TryStreamExt;
1457    use store_api::storage::{RegionId, RegionNumber};
1458    use table::metadata::{RawTableInfo, TableInfo};
1459    use table::table_name::TableName;
1460
1461    use super::datanode_table::DatanodeTableKey;
1462    use super::test_utils;
1463    use crate::ddl::test_util::create_table::test_create_table_task;
1464    use crate::ddl::utils::region_storage_path;
1465    use crate::error::Result;
1466    use crate::key::datanode_table::RegionInfo;
1467    use crate::key::table_info::TableInfoValue;
1468    use crate::key::table_name::TableNameKey;
1469    use crate::key::table_route::TableRouteValue;
1470    use crate::key::{
1471        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1472        TableMetadataManager, ViewInfoValue,
1473    };
1474    use crate::kv_backend::KvBackend;
1475    use crate::kv_backend::memory::MemoryKvBackend;
1476    use crate::peer::Peer;
1477    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1478    use crate::rpc::store::RangeRequest;
1479    use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
1480
1481    #[test]
1482    fn test_deserialized_value_with_bytes() {
1483        let region_route = new_test_region_route();
1484        let region_routes = vec![region_route.clone()];
1485
1486        let expected_region_routes =
1487            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1488        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1489
1490        // Serialize behaviors:
1491        // The inner field will be ignored.
1492        let value = DeserializedValueWithBytes {
1493            // ignored
1494            inner: TableRouteValue::physical(region_routes.clone()),
1495            bytes: Bytes::from(expected.clone()),
1496        };
1497
1498        let encoded = serde_json::to_vec(&value).unwrap();
1499
1500        // Deserialize behaviors:
1501        // The inner field will be deserialized from the bytes field.
1502        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1503            serde_json::from_slice(&encoded).unwrap();
1504
1505        assert_eq!(decoded.inner, expected_region_routes);
1506        assert_eq!(decoded.bytes, expected);
1507    }
1508
1509    fn new_test_region_route() -> RegionRoute {
1510        new_region_route(1, 2)
1511    }
1512
1513    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1514        RegionRoute {
1515            region: Region {
1516                id: region_id.into(),
1517                name: "r1".to_string(),
1518                partition: None,
1519                attrs: BTreeMap::new(),
1520                partition_expr: Default::default(),
1521            },
1522            leader_peer: Some(Peer::new(datanode, "a2")),
1523            follower_peers: vec![],
1524            leader_state: None,
1525            leader_down_since: None,
1526        }
1527    }
1528
1529    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1530        test_utils::new_test_table_info(10, region_numbers)
1531    }
1532
1533    fn new_test_table_names() -> HashSet<TableName> {
1534        let mut set = HashSet::new();
1535        set.insert(TableName {
1536            catalog_name: "greptime".to_string(),
1537            schema_name: "public".to_string(),
1538            table_name: "a_table".to_string(),
1539        });
1540        set.insert(TableName {
1541            catalog_name: "greptime".to_string(),
1542            schema_name: "public".to_string(),
1543            table_name: "b_table".to_string(),
1544        });
1545        set
1546    }
1547
1548    async fn create_physical_table_metadata(
1549        table_metadata_manager: &TableMetadataManager,
1550        table_info: RawTableInfo,
1551        region_routes: Vec<RegionRoute>,
1552        region_wal_options: HashMap<RegionNumber, String>,
1553    ) -> Result<()> {
1554        table_metadata_manager
1555            .create_table_metadata(
1556                table_info,
1557                TableRouteValue::physical(region_routes),
1558                region_wal_options,
1559            )
1560            .await
1561    }
1562
1563    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1564        let topics = (0..2)
1565            .map(|i| format!("greptimedb_topic{}", i))
1566            .collect::<Vec<_>>();
1567        let wal_options = topics
1568            .iter()
1569            .map(|topic| {
1570                WalOptions::Kafka(KafkaWalOptions {
1571                    topic: topic.clone(),
1572                })
1573            })
1574            .collect::<Vec<_>>();
1575
1576        (0..16)
1577            .enumerate()
1578            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1579            .collect()
1580    }
1581
1582    #[tokio::test]
1583    async fn test_raft_engine_topic_region_map() {
1584        let mem_kv = Arc::new(MemoryKvBackend::default());
1585        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1586        let region_route = new_test_region_route();
1587        let region_routes = &vec![region_route.clone()];
1588        let table_info: RawTableInfo =
1589            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1590        let wal_allocator = WalOptionsAllocator::RaftEngine;
1591        let regions = (0..16).collect();
1592        let region_wal_options =
1593            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1594        create_physical_table_metadata(
1595            &table_metadata_manager,
1596            table_info.clone(),
1597            region_routes.clone(),
1598            region_wal_options.clone(),
1599        )
1600        .await
1601        .unwrap();
1602
1603        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1604        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1605        let resp = mem_kv.range(range_req).await.unwrap();
1606        // Should be empty because the topic region map is empty for raft engine.
1607        assert!(resp.kvs.is_empty());
1608    }
1609
1610    #[tokio::test]
1611    async fn test_create_table_metadata() {
1612        let mem_kv = Arc::new(MemoryKvBackend::default());
1613        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1614        let region_route = new_test_region_route();
1615        let region_routes = &vec![region_route.clone()];
1616        let table_info: RawTableInfo =
1617            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1618        let region_wal_options = create_mock_region_wal_options()
1619            .into_iter()
1620            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1621            .collect::<HashMap<_, _>>();
1622
1623        // creates metadata.
1624        create_physical_table_metadata(
1625            &table_metadata_manager,
1626            table_info.clone(),
1627            region_routes.clone(),
1628            region_wal_options.clone(),
1629        )
1630        .await
1631        .unwrap();
1632
1633        // if metadata was already created, it should be ok.
1634        assert!(
1635            create_physical_table_metadata(
1636                &table_metadata_manager,
1637                table_info.clone(),
1638                region_routes.clone(),
1639                region_wal_options.clone(),
1640            )
1641            .await
1642            .is_ok()
1643        );
1644
1645        let mut modified_region_routes = region_routes.clone();
1646        modified_region_routes.push(region_route.clone());
1647        // if remote metadata was exists, it should return an error.
1648        assert!(
1649            create_physical_table_metadata(
1650                &table_metadata_manager,
1651                table_info.clone(),
1652                modified_region_routes,
1653                region_wal_options.clone(),
1654            )
1655            .await
1656            .is_err()
1657        );
1658
1659        let (remote_table_info, remote_table_route) = table_metadata_manager
1660            .get_full_table_info(10)
1661            .await
1662            .unwrap();
1663
1664        assert_eq!(
1665            remote_table_info.unwrap().into_inner().table_info,
1666            table_info
1667        );
1668        assert_eq!(
1669            remote_table_route
1670                .unwrap()
1671                .into_inner()
1672                .region_routes()
1673                .unwrap(),
1674            region_routes
1675        );
1676
1677        for i in 0..2 {
1678            let region_number = i as u32;
1679            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1680            let topic = format!("greptimedb_topic{}", i);
1681            let regions = table_metadata_manager
1682                .topic_region_manager
1683                .regions(&topic)
1684                .await
1685                .unwrap()
1686                .into_keys()
1687                .collect::<Vec<_>>();
1688            assert_eq!(regions.len(), 8);
1689            assert!(regions.contains(&region_id));
1690        }
1691    }
1692
1693    #[tokio::test]
1694    async fn test_create_logic_tables_metadata() {
1695        let mem_kv = Arc::new(MemoryKvBackend::default());
1696        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1697        let region_route = new_test_region_route();
1698        let region_routes = vec![region_route.clone()];
1699        let table_info: RawTableInfo =
1700            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1701        let table_id = table_info.ident.table_id;
1702        let table_route_value = TableRouteValue::physical(region_routes.clone());
1703
1704        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1705        // creates metadata.
1706        table_metadata_manager
1707            .create_logical_tables_metadata(tables_data.clone())
1708            .await
1709            .unwrap();
1710
1711        // if metadata was already created, it should be ok.
1712        assert!(
1713            table_metadata_manager
1714                .create_logical_tables_metadata(tables_data)
1715                .await
1716                .is_ok()
1717        );
1718
1719        let mut modified_region_routes = region_routes.clone();
1720        modified_region_routes.push(new_region_route(2, 3));
1721        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1722        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1723        // if remote metadata was exists, it should return an error.
1724        assert!(
1725            table_metadata_manager
1726                .create_logical_tables_metadata(modified_tables_data)
1727                .await
1728                .is_err()
1729        );
1730
1731        let (remote_table_info, remote_table_route) = table_metadata_manager
1732            .get_full_table_info(table_id)
1733            .await
1734            .unwrap();
1735
1736        assert_eq!(
1737            remote_table_info.unwrap().into_inner().table_info,
1738            table_info
1739        );
1740        assert_eq!(
1741            remote_table_route
1742                .unwrap()
1743                .into_inner()
1744                .region_routes()
1745                .unwrap(),
1746            &region_routes
1747        );
1748    }
1749
1750    #[tokio::test]
1751    async fn test_create_many_logical_tables_metadata() {
1752        let kv_backend = Arc::new(MemoryKvBackend::default());
1753        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1754
1755        let mut tables_data = vec![];
1756        for i in 0..128 {
1757            let table_id = i + 1;
1758            let regin_number = table_id * 3;
1759            let region_id = RegionId::new(table_id, regin_number);
1760            let region_route = new_region_route(region_id.as_u64(), 2);
1761            let region_routes = vec![region_route.clone()];
1762            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1763                table_id,
1764                &format!("my_table_{}", table_id),
1765                region_routes.iter().map(|r| r.region.id.region_number()),
1766            )
1767            .into();
1768            let table_route_value = TableRouteValue::physical(region_routes.clone());
1769
1770            tables_data.push((table_info, table_route_value));
1771        }
1772
1773        // creates metadata.
1774        table_metadata_manager
1775            .create_logical_tables_metadata(tables_data)
1776            .await
1777            .unwrap();
1778    }
1779
1780    #[tokio::test]
1781    async fn test_delete_table_metadata() {
1782        let mem_kv = Arc::new(MemoryKvBackend::default());
1783        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1784        let region_route = new_test_region_route();
1785        let region_routes = &vec![region_route.clone()];
1786        let table_info: RawTableInfo =
1787            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1788        let table_id = table_info.ident.table_id;
1789        let datanode_id = 2;
1790        let region_wal_options = create_mock_region_wal_options();
1791        let serialized_region_wal_options = region_wal_options
1792            .iter()
1793            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1794            .collect::<HashMap<_, _>>();
1795
1796        // creates metadata.
1797        create_physical_table_metadata(
1798            &table_metadata_manager,
1799            table_info.clone(),
1800            region_routes.clone(),
1801            serialized_region_wal_options,
1802        )
1803        .await
1804        .unwrap();
1805
1806        let table_name = TableName::new(
1807            table_info.catalog_name,
1808            table_info.schema_name,
1809            table_info.name,
1810        );
1811        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1812        // deletes metadata.
1813        table_metadata_manager
1814            .delete_table_metadata(
1815                table_id,
1816                &table_name,
1817                table_route_value,
1818                &region_wal_options,
1819            )
1820            .await
1821            .unwrap();
1822        // Should be ignored.
1823        table_metadata_manager
1824            .delete_table_metadata(
1825                table_id,
1826                &table_name,
1827                table_route_value,
1828                &region_wal_options,
1829            )
1830            .await
1831            .unwrap();
1832        assert!(
1833            table_metadata_manager
1834                .table_info_manager()
1835                .get(table_id)
1836                .await
1837                .unwrap()
1838                .is_none()
1839        );
1840        assert!(
1841            table_metadata_manager
1842                .table_route_manager()
1843                .table_route_storage()
1844                .get(table_id)
1845                .await
1846                .unwrap()
1847                .is_none()
1848        );
1849        assert!(
1850            table_metadata_manager
1851                .datanode_table_manager()
1852                .tables(datanode_id)
1853                .try_collect::<Vec<_>>()
1854                .await
1855                .unwrap()
1856                .is_empty()
1857        );
1858        // Checks removed values
1859        let table_info = table_metadata_manager
1860            .table_info_manager()
1861            .get(table_id)
1862            .await
1863            .unwrap();
1864        assert!(table_info.is_none());
1865        let table_route = table_metadata_manager
1866            .table_route_manager()
1867            .table_route_storage()
1868            .get(table_id)
1869            .await
1870            .unwrap();
1871        assert!(table_route.is_none());
1872        // Logical delete removes the topic region mapping as well.
1873        let regions = table_metadata_manager
1874            .topic_region_manager
1875            .regions("greptimedb_topic0")
1876            .await
1877            .unwrap();
1878        assert_eq!(regions.len(), 0);
1879        let regions = table_metadata_manager
1880            .topic_region_manager
1881            .regions("greptimedb_topic1")
1882            .await
1883            .unwrap();
1884        assert_eq!(regions.len(), 0);
1885    }
1886
1887    #[tokio::test]
1888    async fn test_rename_table() {
1889        let mem_kv = Arc::new(MemoryKvBackend::default());
1890        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1891        let region_route = new_test_region_route();
1892        let region_routes = vec![region_route.clone()];
1893        let table_info: RawTableInfo =
1894            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1895        let table_id = table_info.ident.table_id;
1896        // creates metadata.
1897        create_physical_table_metadata(
1898            &table_metadata_manager,
1899            table_info.clone(),
1900            region_routes.clone(),
1901            HashMap::new(),
1902        )
1903        .await
1904        .unwrap();
1905
1906        let new_table_name = "another_name".to_string();
1907        let table_info_value =
1908            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1909
1910        table_metadata_manager
1911            .rename_table(&table_info_value, new_table_name.clone())
1912            .await
1913            .unwrap();
1914        // if remote metadata was updated, it should be ok.
1915        table_metadata_manager
1916            .rename_table(&table_info_value, new_table_name.clone())
1917            .await
1918            .unwrap();
1919        let mut modified_table_info = table_info.clone();
1920        modified_table_info.name = "hi".to_string();
1921        let modified_table_info_value =
1922            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1923        // if the table_info_value is wrong, it should return an error.
1924        // The ABA problem.
1925        assert!(
1926            table_metadata_manager
1927                .rename_table(&modified_table_info_value, new_table_name.clone())
1928                .await
1929                .is_err()
1930        );
1931
1932        let old_table_name = TableNameKey::new(
1933            &table_info.catalog_name,
1934            &table_info.schema_name,
1935            &table_info.name,
1936        );
1937        let new_table_name = TableNameKey::new(
1938            &table_info.catalog_name,
1939            &table_info.schema_name,
1940            &new_table_name,
1941        );
1942
1943        assert!(
1944            table_metadata_manager
1945                .table_name_manager()
1946                .get(old_table_name)
1947                .await
1948                .unwrap()
1949                .is_none()
1950        );
1951
1952        assert_eq!(
1953            table_metadata_manager
1954                .table_name_manager()
1955                .get(new_table_name)
1956                .await
1957                .unwrap()
1958                .unwrap()
1959                .table_id(),
1960            table_id
1961        );
1962    }
1963
1964    #[tokio::test]
1965    async fn test_update_table_info() {
1966        let mem_kv = Arc::new(MemoryKvBackend::default());
1967        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1968        let region_route = new_test_region_route();
1969        let region_routes = vec![region_route.clone()];
1970        let table_info: RawTableInfo =
1971            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1972        let table_id = table_info.ident.table_id;
1973        // creates metadata.
1974        create_physical_table_metadata(
1975            &table_metadata_manager,
1976            table_info.clone(),
1977            region_routes.clone(),
1978            HashMap::new(),
1979        )
1980        .await
1981        .unwrap();
1982
1983        let mut new_table_info = table_info.clone();
1984        new_table_info.name = "hi".to_string();
1985        let current_table_info_value =
1986            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1987        // should be ok.
1988        table_metadata_manager
1989            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1990            .await
1991            .unwrap();
1992        // if table info was updated, it should be ok.
1993        table_metadata_manager
1994            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1995            .await
1996            .unwrap();
1997
1998        // updated table_info should equal the `new_table_info`
1999        let updated_table_info = table_metadata_manager
2000            .table_info_manager()
2001            .get(table_id)
2002            .await
2003            .unwrap()
2004            .unwrap()
2005            .into_inner();
2006        assert_eq!(updated_table_info.table_info, new_table_info);
2007
2008        let mut wrong_table_info = table_info.clone();
2009        wrong_table_info.name = "wrong".to_string();
2010        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2011            current_table_info_value.update(wrong_table_info),
2012        );
2013        // if the current_table_info_value is wrong, it should return an error.
2014        // The ABA problem.
2015        assert!(
2016            table_metadata_manager
2017                .update_table_info(&wrong_table_info_value, None, new_table_info)
2018                .await
2019                .is_err()
2020        )
2021    }
2022
2023    #[tokio::test]
2024    async fn test_update_table_leader_region_status() {
2025        let mem_kv = Arc::new(MemoryKvBackend::default());
2026        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2027        let datanode = 1;
2028        let region_routes = vec![
2029            RegionRoute {
2030                region: Region {
2031                    id: 1.into(),
2032                    name: "r1".to_string(),
2033                    partition: None,
2034                    attrs: BTreeMap::new(),
2035                    partition_expr: Default::default(),
2036                },
2037                leader_peer: Some(Peer::new(datanode, "a2")),
2038                leader_state: Some(LeaderState::Downgrading),
2039                follower_peers: vec![],
2040                leader_down_since: Some(current_time_millis()),
2041            },
2042            RegionRoute {
2043                region: Region {
2044                    id: 2.into(),
2045                    name: "r2".to_string(),
2046                    partition: None,
2047                    attrs: BTreeMap::new(),
2048                    partition_expr: Default::default(),
2049                },
2050                leader_peer: Some(Peer::new(datanode, "a1")),
2051                leader_state: None,
2052                follower_peers: vec![],
2053                leader_down_since: None,
2054            },
2055        ];
2056        let table_info: RawTableInfo =
2057            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2058        let table_id = table_info.ident.table_id;
2059        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2060            TableRouteValue::physical(region_routes.clone()),
2061        );
2062
2063        // creates metadata.
2064        create_physical_table_metadata(
2065            &table_metadata_manager,
2066            table_info.clone(),
2067            region_routes.clone(),
2068            HashMap::new(),
2069        )
2070        .await
2071        .unwrap();
2072
2073        table_metadata_manager
2074            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2075                if region_route.leader_state.is_some() {
2076                    None
2077                } else {
2078                    Some(Some(LeaderState::Downgrading))
2079                }
2080            })
2081            .await
2082            .unwrap();
2083
2084        let updated_route_value = table_metadata_manager
2085            .table_route_manager()
2086            .table_route_storage()
2087            .get(table_id)
2088            .await
2089            .unwrap()
2090            .unwrap();
2091
2092        assert_eq!(
2093            updated_route_value.region_routes().unwrap()[0].leader_state,
2094            Some(LeaderState::Downgrading)
2095        );
2096
2097        assert!(
2098            updated_route_value.region_routes().unwrap()[0]
2099                .leader_down_since
2100                .is_some()
2101        );
2102
2103        assert_eq!(
2104            updated_route_value.region_routes().unwrap()[1].leader_state,
2105            Some(LeaderState::Downgrading)
2106        );
2107        assert!(
2108            updated_route_value.region_routes().unwrap()[1]
2109                .leader_down_since
2110                .is_some()
2111        );
2112    }
2113
2114    async fn assert_datanode_table(
2115        table_metadata_manager: &TableMetadataManager,
2116        table_id: u32,
2117        region_routes: &[RegionRoute],
2118    ) {
2119        let region_distribution = region_distribution(region_routes);
2120        for (datanode, regions) in region_distribution {
2121            let got = table_metadata_manager
2122                .datanode_table_manager()
2123                .get(&DatanodeTableKey::new(datanode, table_id))
2124                .await
2125                .unwrap()
2126                .unwrap();
2127
2128            assert_eq!(got.regions, regions.leader_regions);
2129            assert_eq!(got.follower_regions, regions.follower_regions);
2130        }
2131    }
2132
2133    #[tokio::test]
2134    async fn test_update_table_route() {
2135        let mem_kv = Arc::new(MemoryKvBackend::default());
2136        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2137        let region_route = new_test_region_route();
2138        let region_routes = vec![region_route.clone()];
2139        let table_info: RawTableInfo =
2140            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2141        let table_id = table_info.ident.table_id;
2142        let engine = table_info.meta.engine.as_str();
2143        let region_storage_path =
2144            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2145        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2146            TableRouteValue::physical(region_routes.clone()),
2147        );
2148
2149        // creates metadata.
2150        create_physical_table_metadata(
2151            &table_metadata_manager,
2152            table_info.clone(),
2153            region_routes.clone(),
2154            HashMap::new(),
2155        )
2156        .await
2157        .unwrap();
2158
2159        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2160        let new_region_routes = vec![
2161            new_region_route(1, 1),
2162            new_region_route(2, 2),
2163            new_region_route(3, 3),
2164        ];
2165        // it should be ok.
2166        table_metadata_manager
2167            .update_table_route(
2168                table_id,
2169                RegionInfo {
2170                    engine: engine.to_string(),
2171                    region_storage_path: region_storage_path.clone(),
2172                    region_options: HashMap::new(),
2173                    region_wal_options: HashMap::new(),
2174                },
2175                &current_table_route_value,
2176                new_region_routes.clone(),
2177                &HashMap::new(),
2178                &HashMap::new(),
2179            )
2180            .await
2181            .unwrap();
2182        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2183
2184        // if the table route was updated. it should be ok.
2185        table_metadata_manager
2186            .update_table_route(
2187                table_id,
2188                RegionInfo {
2189                    engine: engine.to_string(),
2190                    region_storage_path: region_storage_path.clone(),
2191                    region_options: HashMap::new(),
2192                    region_wal_options: HashMap::new(),
2193                },
2194                &current_table_route_value,
2195                new_region_routes.clone(),
2196                &HashMap::new(),
2197                &HashMap::new(),
2198            )
2199            .await
2200            .unwrap();
2201
2202        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2203            current_table_route_value
2204                .inner
2205                .update(new_region_routes.clone())
2206                .unwrap(),
2207        );
2208        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2209        // it should be ok.
2210        table_metadata_manager
2211            .update_table_route(
2212                table_id,
2213                RegionInfo {
2214                    engine: engine.to_string(),
2215                    region_storage_path: region_storage_path.clone(),
2216                    region_options: HashMap::new(),
2217                    region_wal_options: HashMap::new(),
2218                },
2219                &current_table_route_value,
2220                new_region_routes.clone(),
2221                &HashMap::new(),
2222                &HashMap::new(),
2223            )
2224            .await
2225            .unwrap();
2226        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2227
2228        // if the current_table_route_value is wrong, it should return an error.
2229        // The ABA problem.
2230        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2231            current_table_route_value
2232                .update(vec![
2233                    new_region_route(1, 1),
2234                    new_region_route(2, 2),
2235                    new_region_route(3, 3),
2236                    new_region_route(4, 4),
2237                ])
2238                .unwrap(),
2239        );
2240        assert!(
2241            table_metadata_manager
2242                .update_table_route(
2243                    table_id,
2244                    RegionInfo {
2245                        engine: engine.to_string(),
2246                        region_storage_path: region_storage_path.clone(),
2247                        region_options: HashMap::new(),
2248                        region_wal_options: HashMap::new(),
2249                    },
2250                    &wrong_table_route_value,
2251                    new_region_routes,
2252                    &HashMap::new(),
2253                    &HashMap::new(),
2254                )
2255                .await
2256                .is_err()
2257        );
2258    }
2259
2260    #[tokio::test]
2261    async fn test_destroy_table_metadata() {
2262        let mem_kv = Arc::new(MemoryKvBackend::default());
2263        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2264        let table_id = 1025;
2265        let table_name = "foo";
2266        let task = test_create_table_task(table_name, table_id);
2267        let options = create_mock_region_wal_options();
2268        let serialized_options = options
2269            .iter()
2270            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2271            .collect::<HashMap<_, _>>();
2272        table_metadata_manager
2273            .create_table_metadata(
2274                task.table_info,
2275                TableRouteValue::physical(vec![
2276                    RegionRoute {
2277                        region: Region::new_test(RegionId::new(table_id, 1)),
2278                        leader_peer: Some(Peer::empty(1)),
2279                        follower_peers: vec![Peer::empty(5)],
2280                        leader_state: None,
2281                        leader_down_since: None,
2282                    },
2283                    RegionRoute {
2284                        region: Region::new_test(RegionId::new(table_id, 2)),
2285                        leader_peer: Some(Peer::empty(2)),
2286                        follower_peers: vec![Peer::empty(4)],
2287                        leader_state: None,
2288                        leader_down_since: None,
2289                    },
2290                    RegionRoute {
2291                        region: Region::new_test(RegionId::new(table_id, 3)),
2292                        leader_peer: Some(Peer::empty(3)),
2293                        follower_peers: vec![],
2294                        leader_state: None,
2295                        leader_down_since: None,
2296                    },
2297                ]),
2298                serialized_options,
2299            )
2300            .await
2301            .unwrap();
2302        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2303        let table_route_value = table_metadata_manager
2304            .table_route_manager
2305            .table_route_storage()
2306            .get_with_raw_bytes(table_id)
2307            .await
2308            .unwrap()
2309            .unwrap();
2310        table_metadata_manager
2311            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2312            .await
2313            .unwrap();
2314        assert!(mem_kv.is_empty());
2315    }
2316
2317    #[tokio::test]
2318    async fn test_restore_table_metadata() {
2319        let mem_kv = Arc::new(MemoryKvBackend::default());
2320        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2321        let table_id = 1025;
2322        let table_name = "foo";
2323        let task = test_create_table_task(table_name, table_id);
2324        let options = create_mock_region_wal_options();
2325        let serialized_options = options
2326            .iter()
2327            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2328            .collect::<HashMap<_, _>>();
2329        table_metadata_manager
2330            .create_table_metadata(
2331                task.table_info,
2332                TableRouteValue::physical(vec![
2333                    RegionRoute {
2334                        region: Region::new_test(RegionId::new(table_id, 1)),
2335                        leader_peer: Some(Peer::empty(1)),
2336                        follower_peers: vec![Peer::empty(5)],
2337                        leader_state: None,
2338                        leader_down_since: None,
2339                    },
2340                    RegionRoute {
2341                        region: Region::new_test(RegionId::new(table_id, 2)),
2342                        leader_peer: Some(Peer::empty(2)),
2343                        follower_peers: vec![Peer::empty(4)],
2344                        leader_state: None,
2345                        leader_down_since: None,
2346                    },
2347                    RegionRoute {
2348                        region: Region::new_test(RegionId::new(table_id, 3)),
2349                        leader_peer: Some(Peer::empty(3)),
2350                        follower_peers: vec![],
2351                        leader_state: None,
2352                        leader_down_since: None,
2353                    },
2354                ]),
2355                serialized_options,
2356            )
2357            .await
2358            .unwrap();
2359        let expected_result = mem_kv.dump();
2360        let table_route_value = table_metadata_manager
2361            .table_route_manager
2362            .table_route_storage()
2363            .get_with_raw_bytes(table_id)
2364            .await
2365            .unwrap()
2366            .unwrap();
2367        let region_routes = table_route_value.region_routes().unwrap();
2368        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2369        let table_route_value = TableRouteValue::physical(region_routes.clone());
2370        table_metadata_manager
2371            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2372            .await
2373            .unwrap();
2374        table_metadata_manager
2375            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2376            .await
2377            .unwrap();
2378        let kvs = mem_kv.dump();
2379        assert_eq!(kvs, expected_result);
2380        // Should be ignored.
2381        table_metadata_manager
2382            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2383            .await
2384            .unwrap();
2385        let kvs = mem_kv.dump();
2386        assert_eq!(kvs, expected_result);
2387    }
2388
2389    #[tokio::test]
2390    async fn test_create_update_view_info() {
2391        let mem_kv = Arc::new(MemoryKvBackend::default());
2392        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2393
2394        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2395
2396        let view_id = view_info.ident.table_id;
2397
2398        let logical_plan: Vec<u8> = vec![1, 2, 3];
2399        let columns = vec!["a".to_string()];
2400        let plan_columns = vec!["number".to_string()];
2401        let table_names = new_test_table_names();
2402        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2403
2404        // Create metadata
2405        table_metadata_manager
2406            .create_view_metadata(
2407                view_info.clone(),
2408                logical_plan.clone(),
2409                table_names.clone(),
2410                columns.clone(),
2411                plan_columns.clone(),
2412                definition.to_string(),
2413            )
2414            .await
2415            .unwrap();
2416
2417        {
2418            // assert view info
2419            let current_view_info = table_metadata_manager
2420                .view_info_manager()
2421                .get(view_id)
2422                .await
2423                .unwrap()
2424                .unwrap()
2425                .into_inner();
2426            assert_eq!(current_view_info.view_info, logical_plan);
2427            assert_eq!(current_view_info.table_names, table_names);
2428            assert_eq!(current_view_info.definition, definition);
2429            assert_eq!(current_view_info.columns, columns);
2430            assert_eq!(current_view_info.plan_columns, plan_columns);
2431            // assert table info
2432            let current_table_info = table_metadata_manager
2433                .table_info_manager()
2434                .get(view_id)
2435                .await
2436                .unwrap()
2437                .unwrap()
2438                .into_inner();
2439            assert_eq!(current_table_info.table_info, view_info);
2440        }
2441
2442        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2443        let new_table_names = {
2444            let mut set = HashSet::new();
2445            set.insert(TableName {
2446                catalog_name: "greptime".to_string(),
2447                schema_name: "public".to_string(),
2448                table_name: "b_table".to_string(),
2449            });
2450            set.insert(TableName {
2451                catalog_name: "greptime".to_string(),
2452                schema_name: "public".to_string(),
2453                table_name: "c_table".to_string(),
2454            });
2455            set
2456        };
2457        let new_columns = vec!["b".to_string()];
2458        let new_plan_columns = vec!["number2".to_string()];
2459        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2460
2461        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2462            logical_plan.clone(),
2463            table_names,
2464            columns,
2465            plan_columns,
2466            definition.to_string(),
2467        ));
2468        // should be ok.
2469        table_metadata_manager
2470            .update_view_info(
2471                view_id,
2472                &current_view_info_value,
2473                new_logical_plan.clone(),
2474                new_table_names.clone(),
2475                new_columns.clone(),
2476                new_plan_columns.clone(),
2477                new_definition.to_string(),
2478            )
2479            .await
2480            .unwrap();
2481        // if table info was updated, it should be ok.
2482        table_metadata_manager
2483            .update_view_info(
2484                view_id,
2485                &current_view_info_value,
2486                new_logical_plan.clone(),
2487                new_table_names.clone(),
2488                new_columns.clone(),
2489                new_plan_columns.clone(),
2490                new_definition.to_string(),
2491            )
2492            .await
2493            .unwrap();
2494
2495        // updated view_info should equal the `new_logical_plan`
2496        let updated_view_info = table_metadata_manager
2497            .view_info_manager()
2498            .get(view_id)
2499            .await
2500            .unwrap()
2501            .unwrap()
2502            .into_inner();
2503        assert_eq!(updated_view_info.view_info, new_logical_plan);
2504        assert_eq!(updated_view_info.table_names, new_table_names);
2505        assert_eq!(updated_view_info.definition, new_definition);
2506        assert_eq!(updated_view_info.columns, new_columns);
2507        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2508
2509        let wrong_view_info = logical_plan.clone();
2510        let wrong_definition = "wrong_definition";
2511        let wrong_view_info_value =
2512            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2513                wrong_view_info,
2514                new_table_names.clone(),
2515                new_columns.clone(),
2516                new_plan_columns.clone(),
2517                wrong_definition.to_string(),
2518            ));
2519        // if the current_view_info_value is wrong, it should return an error.
2520        // The ABA problem.
2521        assert!(
2522            table_metadata_manager
2523                .update_view_info(
2524                    view_id,
2525                    &wrong_view_info_value,
2526                    new_logical_plan.clone(),
2527                    new_table_names.clone(),
2528                    vec!["c".to_string()],
2529                    vec!["number3".to_string()],
2530                    wrong_definition.to_string(),
2531                )
2532                .await
2533                .is_err()
2534        );
2535
2536        // The view_info is not changed.
2537        let current_view_info = table_metadata_manager
2538            .view_info_manager()
2539            .get(view_id)
2540            .await
2541            .unwrap()
2542            .unwrap()
2543            .into_inner();
2544        assert_eq!(current_view_info.view_info, new_logical_plan);
2545        assert_eq!(current_view_info.table_names, new_table_names);
2546        assert_eq!(current_view_info.definition, new_definition);
2547        assert_eq!(current_view_info.columns, new_columns);
2548        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2549    }
2550
2551    #[test]
2552    fn test_region_role_set_deserialize() {
2553        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2554        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2555        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2556        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2557
2558        let s = r#"[1, 2, 3]"#;
2559        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2560        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2561        assert!(region_role_set.follower_regions.is_empty());
2562    }
2563
2564    #[test]
2565    fn test_region_distribution_deserialize() {
2566        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2567        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2568        assert_eq!(region_distribution.len(), 2);
2569        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2570        assert!(region_distribution[&1].follower_regions.is_empty());
2571        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2572        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2573    }
2574}