common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which
//!       tables and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
//! 13. Topic name to region map key: `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two
//! "unified" managers: the table metadata manager [TableMetadataManager] and the flow metadata
//! manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager). They contain the
//! managers defined above, and it's recommended to use only these unified managers.
75//!
//! The overall layout of the flow keys looks like this:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod maintenance;
104pub mod node_address;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
/// Regex fragment matching one name segment (catalog, schema, table, ...).
///
/// A name starts with an ASCII letter, `_`, `:` or `-`, followed by any number of ASCII
/// alphanumerics or `_`, `:`, `-`, `.`, `@`, `#`. The fragment is not anchored; the key
/// patterns below embed it between `^...$` anchors.
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
/// Metadata key used by the `maintenance` module.
pub const MAINTENANCE_KEY: &str = "__maintenance";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys, followed by two numeric path segments.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix, under which previous versions stored the topic names.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
190
/// The regions of a node, grouped by role.
///
/// As used in [`RegionDistribution`], one set describes the regions hosted by one datanode,
/// split into leader regions and follower regions. Besides the object form produced by
/// `Serialize`, deserialization also accepts a bare list of region numbers, which is read as
/// leader regions with no followers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
199
200impl<'de> Deserialize<'de> for RegionRoleSet {
201    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
202    where
203        D: serde::Deserializer<'de>,
204    {
205        #[derive(Deserialize)]
206        #[serde(untagged)]
207        enum RegionRoleSetOrLeaderOnly {
208            Full {
209                leader_regions: Vec<RegionNumber>,
210                follower_regions: Vec<RegionNumber>,
211            },
212            LeaderOnly(Vec<RegionNumber>),
213        }
214        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
215            RegionRoleSetOrLeaderOnly::Full {
216                leader_regions,
217                follower_regions,
218            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
219            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
220                Ok(RegionRoleSet::new(leader_regions, vec![]))
221            }
222        }
223    }
224}
225
226impl RegionRoleSet {
227    /// Create a new region role set.
228    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
229        Self {
230            leader_regions,
231            follower_regions,
232        }
233    }
234
235    /// Add a leader region to the set.
236    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
237        self.leader_regions.push(region_number);
238    }
239
240    /// Add a follower region to the set.
241    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
242        self.follower_regions.push(region_number);
243    }
244
245    /// Sort the regions.
246    pub fn sort(&mut self) {
247        self.follower_regions.sort();
248        self.leader_regions.sort();
249    }
250}
251
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set hosted by that datanode.
/// Being a `BTreeMap`, iteration visits datanode ids in ascending order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// The id of a flow.
pub type FlowId = u32;
/// The id of a partition within a flow.
pub type FlowPartitionId = u32;
261
262lazy_static! {
263    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
264}
265
266lazy_static! {
267    static ref TABLE_INFO_KEY_PATTERN: Regex =
268        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
269}
270
271lazy_static! {
272    static ref VIEW_INFO_KEY_PATTERN: Regex =
273        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
274}
275
276lazy_static! {
277    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
278        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
279}
280
281lazy_static! {
282    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
283        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
288        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
289    ))
290    .unwrap();
291}
292
293lazy_static! {
294    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
295    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
296        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
297    ))
298        .unwrap();
299}
300
301lazy_static! {
302    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
303    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
304        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
305    ))
306        .unwrap();
307}
308
309lazy_static! {
310    static ref NODE_ADDRESS_PATTERN: Regex =
311        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
312}
313
314lazy_static! {
315    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
316        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
317}
318
319lazy_static! {
320    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
321        "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
322    ))
323    .unwrap();
324}
325
/// The key of metadata.
///
/// Converts between a typed key and its raw byte form. `T` is the type produced by
/// [`MetadataKey::from_bytes`]; the `'a` lifetime lets it borrow from the input bytes.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into its raw bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from raw bytes; implementations return an error when the bytes are not
    /// a valid key of this kind.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
332
333#[derive(Debug, Clone, PartialEq)]
334pub struct BytesAdapter(Vec<u8>);
335
336impl From<Vec<u8>> for BytesAdapter {
337    fn from(value: Vec<u8>) -> Self {
338        Self(value)
339    }
340}
341
342impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
343    fn to_bytes(&self) -> Vec<u8> {
344        self.0.clone()
345    }
346
347    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
348        Ok(BytesAdapter(bytes.to_vec()))
349    }
350}
351
/// Keys that can build a transactional "get" operation for themselves.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the [`TxnOp`] that reads this key, together with a filter closure that, once
    /// the transaction has run, extracts this key's raw value from a [`TxnOpGetResponseSet`].
    ///
    /// The filter returns `None` presumably when the key was not found — confirm in
    /// `txn_helper`.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
360
/// A metadata value that can be converted to and from its raw byte encoding.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes, returning an error if they cannot be decoded.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into its raw bytes, returning an error if encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
368
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Unified entry point to the table-related metadata managers.
///
/// Every sub-manager is constructed over the same [`KvBackendRef`] (see
/// `TableMetadataManager::new`). The backend itself is also kept so that operations touching
/// several key kinds can merge their transactions and commit them at once.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    kv_backend: KvBackendRef,
}
384
/// Returns an `Unexpected` error from the enclosing function if `$got != $expected_value`.
///
/// `$name` describes the operation and is embedded in the error message together with the
/// `Debug` representations of both values. `$got` and `$expected_value` are each evaluated
/// exactly once; `$name` is only evaluated when the values differ.
///
/// The error selector is referenced through `$crate`, so the macro does not depend on an
/// `error` module being in scope at the call site. Callers still need snafu's `ensure!` in
/// scope.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        // Bind by reference once (as `assert_eq!` does) so argument expressions with side
        // effects are not re-evaluated when building the error message.
        let got = &$got;
        let expected = &$expected_value;
        ensure!(
            got == expected,
            $crate::error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                )
            }
        );
    }};
}
399
/// A deserialized value (`inner`) together with the original bytes it was decoded from.
///
/// - Serialize behaviors:
///
/// Only the original `bytes` are serialized (as a string); the `inner` field is ignored.
///
/// - Deserialize behaviors:
///
/// A string is read as the `bytes`, and the `inner` field is deserialized from them.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value decoded from the original bytes.
    inner: T,
}
415
/// Dereferences to the decoded value.
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// Mutably dereferences to the decoded value.
///
/// NOTE: only `inner` is reachable through this; `bytes` is left untouched. After a mutation,
/// the raw bytes (and therefore the `Serialize` output and `get_raw_bytes`) no longer reflect
/// `inner`.
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
429
430impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
431    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432        write!(
433            f,
434            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
435            self.inner, self.bytes
436        )
437    }
438}
439
440impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
441    /// - Serialize behaviors:
442    ///
443    /// The `inner` field will be ignored.
444    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
445    where
446        S: serde::Serializer,
447    {
448        // Safety: The original bytes are always JSON encoded.
449        // It's more efficiently than `serialize_bytes`.
450        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
451    }
452}
453
454impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
455    for DeserializedValueWithBytes<T>
456{
457    /// - Deserialize behaviors:
458    ///
459    /// The `inner` field will be deserialized from the `bytes` field.
460    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
461    where
462        D: serde::Deserializer<'de>,
463    {
464        let buf = String::deserialize(deserializer)?;
465        let bytes = Bytes::from(buf);
466
467        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
468            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
469
470        Ok(value)
471    }
472}
473
474impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
475    fn clone(&self) -> Self {
476        Self {
477            bytes: self.bytes.clone(),
478            inner: self.inner.clone(),
479        }
480    }
481}
482
483impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
484    /// Returns a struct containing a deserialized value and an original `bytes`.
485    /// It accepts original bytes of inner.
486    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
487        let inner = T::try_from_raw_value(&bytes)?;
488        Ok(Self { bytes, inner })
489    }
490
491    /// Returns a struct containing a deserialized value and an original `bytes`.
492    /// It accepts original bytes of inner.
493    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
494        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
495    }
496
497    pub fn into_inner(self) -> T {
498        self.inner
499    }
500
501    pub fn get_inner_ref(&self) -> &T {
502        &self.inner
503    }
504
505    /// Returns original `bytes`
506    pub fn get_raw_bytes(&self) -> Vec<u8> {
507        self.bytes.to_vec()
508    }
509
510    #[cfg(any(test, feature = "testing"))]
511    pub fn from_inner(inner: T) -> Self {
512        let bytes = serde_json::to_vec(&inner).unwrap();
513
514        Self {
515            bytes: Bytes::from(bytes),
516            inner,
517        }
518    }
519}
520
521impl TableMetadataManager {
522    pub fn new(kv_backend: KvBackendRef) -> Self {
523        TableMetadataManager {
524            table_name_manager: TableNameManager::new(kv_backend.clone()),
525            table_info_manager: TableInfoManager::new(kv_backend.clone()),
526            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
527            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
528            catalog_manager: CatalogManager::new(kv_backend.clone()),
529            schema_manager: SchemaManager::new(kv_backend.clone()),
530            table_route_manager: TableRouteManager::new(kv_backend.clone()),
531            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
532            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
533            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
534            kv_backend,
535        }
536    }
537
538    pub async fn init(&self) -> Result<()> {
539        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
540
541        self.catalog_manager().create(catalog_name, true).await?;
542
543        let internal_schemas = [
544            DEFAULT_SCHEMA_NAME,
545            INFORMATION_SCHEMA_NAME,
546            DEFAULT_PRIVATE_SCHEMA_NAME,
547        ];
548
549        for schema_name in internal_schemas {
550            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
551
552            self.schema_manager().create(schema_key, None, true).await?;
553        }
554
555        Ok(())
556    }
557
    /// Returns the [`TableNameManager`], which shares this manager's KV backend.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the [`TableInfoManager`], which shares this manager's KV backend.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the [`ViewInfoManager`], which shares this manager's KV backend.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the [`DatanodeTableManager`], which shares this manager's KV backend.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the [`CatalogManager`], which shares this manager's KV backend.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the [`SchemaManager`], which shares this manager's KV backend.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the [`TableRouteManager`], which shares this manager's KV backend.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the [`TopicNameManager`], which shares this manager's KV backend.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the [`TopicRegionManager`], which shares this manager's KV backend.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the underlying KV backend shared by all sub-managers.
    ///
    /// Only compiled with the `testing` feature (not under a plain `cfg(test)` build).
    #[cfg(feature = "testing")]
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
598
599    pub async fn get_full_table_info(
600        &self,
601        table_id: TableId,
602    ) -> Result<(
603        Option<DeserializedValueWithBytes<TableInfoValue>>,
604        Option<DeserializedValueWithBytes<TableRouteValue>>,
605    )> {
606        let table_info_key = TableInfoKey::new(table_id);
607        let table_route_key = TableRouteKey::new(table_id);
608        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
609        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
610
611        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
612        let mut res = self.kv_backend.txn(txn).await?;
613        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
614        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
615        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
616        Ok((table_info_value, table_route_value))
617    }
618
619    /// Creates metadata for view and returns an error if different metadata exists.
620    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
621    /// Parameters include:
622    /// - `view_info`: the encoded logical plan
623    /// - `table_names`: the resolved fully table names in logical plan
624    /// - `columns`: the view columns
625    /// - `plan_columns`: the original plan columns
626    /// - `definition`: The SQL to create the view
627    ///
628    pub async fn create_view_metadata(
629        &self,
630        view_info: RawTableInfo,
631        raw_logical_plan: Vec<u8>,
632        table_names: HashSet<TableName>,
633        columns: Vec<String>,
634        plan_columns: Vec<String>,
635        definition: String,
636    ) -> Result<()> {
637        let view_id = view_info.ident.table_id;
638
639        // Creates view name.
640        let view_name = TableNameKey::new(
641            &view_info.catalog_name,
642            &view_info.schema_name,
643            &view_info.name,
644        );
645        let create_table_name_txn = self
646            .table_name_manager()
647            .build_create_txn(&view_name, view_id)?;
648
649        // Creates table info.
650        let table_info_value = TableInfoValue::new(view_info);
651
652        let (create_table_info_txn, on_create_table_info_failure) = self
653            .table_info_manager()
654            .build_create_txn(view_id, &table_info_value)?;
655
656        // Creates view info
657        let view_info_value = ViewInfoValue::new(
658            raw_logical_plan,
659            table_names,
660            columns,
661            plan_columns,
662            definition,
663        );
664        let (create_view_info_txn, on_create_view_info_failure) = self
665            .view_info_manager()
666            .build_create_txn(view_id, &view_info_value)?;
667
668        let txn = Txn::merge_all(vec![
669            create_table_name_txn,
670            create_table_info_txn,
671            create_view_info_txn,
672        ]);
673
674        let mut r = self.kv_backend.txn(txn).await?;
675
676        // Checks whether metadata was already created.
677        if !r.succeeded {
678            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
679            let remote_table_info = on_create_table_info_failure(&mut set)?
680                .context(error::UnexpectedSnafu {
681                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
682                })?
683                .into_inner();
684
685            let remote_view_info = on_create_view_info_failure(&mut set)?
686                .context(error::UnexpectedSnafu {
687                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
688                })?
689                .into_inner();
690
691            let op_name = "the creating view metadata";
692            ensure_values!(remote_table_info, table_info_value, op_name);
693            ensure_values!(remote_view_info, view_info_value, op_name);
694        }
695
696        Ok(())
697    }
698
    /// Creates metadata for table and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// Before anything is written, `table_info.meta.region_numbers` is overwritten with the
    /// region numbers of `table_route_value`. The following are then merged into a single
    /// transaction:
    /// - the table name key,
    /// - the table info,
    /// - the table route,
    /// - the topic-region entries built from `region_wal_options`,
    /// - only for a [`TableRouteValue::Physical`] route: the datanode table entries, built from
    ///   the region distribution of the route's region routes.
    ///
    /// `region_wal_options` maps region numbers to (presumably serialized) WAL options.
    ///
    /// If the transaction does not succeed, the stored table info and table route are read from
    /// the transaction responses and compared with the values being created. A missing or
    /// different value yields an `Unexpected` error; identical values mean the same metadata
    /// already exists, and `Ok(())` is returned.
    pub async fn create_table_metadata(
        &self,
        mut table_info: RawTableInfo,
        table_route_value: TableRouteValue,
        region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        // Keep the stored table info consistent with the regions of the route being stored.
        let region_numbers = table_route_value.region_numbers();
        table_info.meta.region_numbers = region_numbers;
        let table_id = table_info.ident.table_id;
        // Cloned because `table_info` is moved into `TableInfoValue` below.
        let engine = table_info.meta.engine.clone();

        // Creates table name.
        let table_name = TableNameKey::new(
            &table_info.catalog_name,
            &table_info.schema_name,
            &table_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&table_name, table_id)?;

        // Computed before `table_info` is moved; only used for the datanode table entries.
        let region_options = table_info.to_region_options();
        // Creates table info.
        let table_info_value = TableInfoValue::new(table_info);
        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(table_id, &table_info_value)?;

        let (create_table_route_txn, on_create_table_route_failure) = self
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(table_id, &table_route_value)?;

        let create_topic_region_txn = self
            .topic_region_manager
            .build_create_txn(table_id, &region_wal_options)?;

        let mut txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_table_route_txn,
            create_topic_region_txn,
        ]);

        // Datanode table entries are only written for physical routes.
        if let TableRouteValue::Physical(x) = &table_route_value {
            let region_storage_path = table_info_value.region_storage_path();
            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
                table_id,
                &engine,
                &region_storage_path,
                region_options,
                region_wal_options,
                region_distribution(&x.region_routes),
            )?;
            txn = txn.merge(create_datanode_table_txn);
        }

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_table_route = on_create_table_route_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                })?
                .into_inner();

            let op_name = "the creating table metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_table_route, table_route_value, op_name);
        }

        Ok(())
    }
782
783    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
784        // The batch size is max_txn_size / 3 because the size of the `tables_data`
785        // is 3 times the size of the `tables_data`.
786        self.kv_backend.max_txn_ops() / 3
787    }
788
    /// Creates metadata for multiple logical tables and returns an error if different metadata
    /// exists.
    ///
    /// For every `(table_info, table_route_value)` pair, `table_info.meta.region_numbers` is
    /// overwritten from the route. The table name, table info and table route create
    /// transactions are collected, and all of them are merged and committed as one transaction.
    /// No datanode table or topic-region entries are written here.
    ///
    /// If the transaction does not succeed, each table's stored table info and table route are
    /// compared, in input order, with the values being created. The first missing or differing
    /// value yields an `Unexpected` error; if everything matches, `Ok(())` is returned.
    ///
    /// NOTE(review): callers presumably split large inputs using
    /// `create_logical_tables_metadata_chunk_size` so the merged transaction stays within the
    /// backend's operation limit — confirm at call sites.
    pub async fn create_logical_tables_metadata(
        &self,
        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
    ) -> Result<()> {
        let len = tables_data.len();
        // Three create transactions per logical table.
        let mut txns = Vec::with_capacity(3 * len);
        // Everything needed to verify one table after a failed transaction: the values we tried
        // to write and the closures that extract the stored values from the response set.
        struct OnFailure<F1, R1, F2, R2>
        where
            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
        {
            table_info_value: TableInfoValue,
            on_create_table_info_failure: F1,
            table_route_value: TableRouteValue,
            on_create_table_route_failure: F2,
        }
        let mut on_failures = Vec::with_capacity(len);
        for (mut table_info, table_route_value) in tables_data {
            table_info.meta.region_numbers = table_route_value.region_numbers();
            let table_id = table_info.ident.table_id;

            // Creates table name.
            let table_name = TableNameKey::new(
                &table_info.catalog_name,
                &table_info.schema_name,
                &table_info.name,
            );
            let create_table_name_txn = self
                .table_name_manager()
                .build_create_txn(&table_name, table_id)?;
            txns.push(create_table_name_txn);

            // Creates table info.
            let table_info_value = TableInfoValue::new(table_info);
            let (create_table_info_txn, on_create_table_info_failure) =
                self.table_info_manager()
                    .build_create_txn(table_id, &table_info_value)?;
            txns.push(create_table_info_txn);

            // Creates table route.
            let (create_table_route_txn, on_create_table_route_failure) = self
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &table_route_value)?;
            txns.push(create_table_route_txn);

            on_failures.push(OnFailure {
                table_info_value,
                on_create_table_info_failure,
                table_route_value,
                on_create_table_route_failure,
            });
        }

        let txn = Txn::merge_all(txns);
        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            for on_failure in on_failures {
                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let op_name = "the creating logical tables metadata";
                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
            }
        }

        Ok(())
    }
870
871    fn table_metadata_keys(
872        &self,
873        table_id: TableId,
874        table_name: &TableName,
875        table_route_value: &TableRouteValue,
876        region_wal_options: &HashMap<RegionNumber, WalOptions>,
877    ) -> Result<Vec<Vec<u8>>> {
878        // Builds keys
879        let datanode_ids = if table_route_value.is_physical() {
880            region_distribution(table_route_value.region_routes()?)
881                .into_keys()
882                .collect()
883        } else {
884            vec![]
885        };
886        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
887        let table_name = TableNameKey::new(
888            &table_name.catalog_name,
889            &table_name.schema_name,
890            &table_name.table_name,
891        );
892        let table_info_key = TableInfoKey::new(table_id);
893        let table_route_key = TableRouteKey::new(table_id);
894        let datanode_table_keys = datanode_ids
895            .into_iter()
896            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
897            .collect::<HashSet<_>>();
898        let topic_region_map = self
899            .topic_region_manager
900            .get_topic_region_mapping(table_id, region_wal_options);
901        let topic_region_keys = topic_region_map
902            .iter()
903            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
904            .collect::<Vec<_>>();
905        keys.push(table_name.to_bytes());
906        keys.push(table_info_key.to_bytes());
907        keys.push(table_route_key.to_bytes());
908        for key in &datanode_table_keys {
909            keys.push(key.to_bytes());
910        }
911        for key in topic_region_keys {
912            keys.push(key.to_bytes());
913        }
914        Ok(keys)
915    }
916
917    /// Deletes metadata for table **logically**.
918    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
919    pub async fn delete_table_metadata(
920        &self,
921        table_id: TableId,
922        table_name: &TableName,
923        table_route_value: &TableRouteValue,
924        region_wal_options: &HashMap<RegionNumber, WalOptions>,
925    ) -> Result<()> {
926        let keys =
927            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
928        self.tombstone_manager.create(keys).await
929    }
930
931    /// Deletes metadata tombstone for table **permanently**.
932    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
933    pub async fn delete_table_metadata_tombstone(
934        &self,
935        table_id: TableId,
936        table_name: &TableName,
937        table_route_value: &TableRouteValue,
938        region_wal_options: &HashMap<RegionNumber, WalOptions>,
939    ) -> Result<()> {
940        let table_metadata_keys =
941            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
942        self.tombstone_manager.delete(table_metadata_keys).await
943    }
944
945    /// Restores metadata for table.
946    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
947    pub async fn restore_table_metadata(
948        &self,
949        table_id: TableId,
950        table_name: &TableName,
951        table_route_value: &TableRouteValue,
952        region_wal_options: &HashMap<RegionNumber, WalOptions>,
953    ) -> Result<()> {
954        let keys =
955            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
956        self.tombstone_manager.restore(keys).await
957    }
958
959    /// Deletes metadata for table **permanently**.
960    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
961    pub async fn destroy_table_metadata(
962        &self,
963        table_id: TableId,
964        table_name: &TableName,
965        table_route_value: &TableRouteValue,
966        region_wal_options: &HashMap<RegionNumber, WalOptions>,
967    ) -> Result<()> {
968        let keys =
969            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
970        let _ = self
971            .kv_backend
972            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
973            .await?;
974        Ok(())
975    }
976
977    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
978        let mut keys = Vec::with_capacity(3);
979        let view_name = TableNameKey::new(
980            &view_name.catalog_name,
981            &view_name.schema_name,
982            &view_name.table_name,
983        );
984        let table_info_key = TableInfoKey::new(view_id);
985        let view_info_key = ViewInfoKey::new(view_id);
986        keys.push(view_name.to_bytes());
987        keys.push(table_info_key.to_bytes());
988        keys.push(view_info_key.to_bytes());
989
990        Ok(keys)
991    }
992
993    /// Deletes metadata for view **permanently**.
994    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
995    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
996        let keys = self.view_info_keys(view_id, view_name)?;
997        let _ = self
998            .kv_backend
999            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1000            .await?;
1001        Ok(())
1002    }
1003
1004    /// Renames the table name and returns an error if different metadata exists.
1005    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1006    /// and the new `TableNameKey` MUST be empty.
1007    pub async fn rename_table(
1008        &self,
1009        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1010        new_table_name: String,
1011    ) -> Result<()> {
1012        let current_table_info = &current_table_info_value.table_info;
1013        let table_id = current_table_info.ident.table_id;
1014
1015        let table_name_key = TableNameKey::new(
1016            &current_table_info.catalog_name,
1017            &current_table_info.schema_name,
1018            &current_table_info.name,
1019        );
1020
1021        let new_table_name_key = TableNameKey::new(
1022            &current_table_info.catalog_name,
1023            &current_table_info.schema_name,
1024            &new_table_name,
1025        );
1026
1027        // Updates table name.
1028        let update_table_name_txn = self.table_name_manager().build_update_txn(
1029            &table_name_key,
1030            &new_table_name_key,
1031            table_id,
1032        )?;
1033
1034        let new_table_info_value = current_table_info_value
1035            .inner
1036            .with_update(move |table_info| {
1037                table_info.name = new_table_name;
1038            });
1039
1040        // Updates table info.
1041        let (update_table_info_txn, on_update_table_info_failure) = self
1042            .table_info_manager()
1043            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1044
1045        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1046
1047        let mut r = self.kv_backend.txn(txn).await?;
1048
1049        // Checks whether metadata was already updated.
1050        if !r.succeeded {
1051            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1052            let remote_table_info = on_update_table_info_failure(&mut set)?
1053                .context(error::UnexpectedSnafu {
1054                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1055                })?
1056                .into_inner();
1057
1058            let op_name = "the renaming table metadata";
1059            ensure_values!(remote_table_info, new_table_info_value, op_name);
1060        }
1061
1062        Ok(())
1063    }
1064
1065    /// Updates table info and returns an error if different metadata exists.
1066    /// And cascade-ly update all redundant table options for each region
1067    /// if region_distribution is present.
1068    pub async fn update_table_info(
1069        &self,
1070        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1071        region_distribution: Option<RegionDistribution>,
1072        new_table_info: RawTableInfo,
1073    ) -> Result<()> {
1074        let table_id = current_table_info_value.table_info.ident.table_id;
1075        let new_table_info_value = current_table_info_value.update(new_table_info);
1076
1077        // Updates table info.
1078        let (update_table_info_txn, on_update_table_info_failure) = self
1079            .table_info_manager()
1080            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1081
1082        let txn = if let Some(region_distribution) = region_distribution {
1083            // region options induced from table info.
1084            let new_region_options = new_table_info_value.table_info.to_region_options();
1085            let update_datanode_table_options_txn = self
1086                .datanode_table_manager
1087                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1088                .await?;
1089            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1090        } else {
1091            update_table_info_txn
1092        };
1093
1094        let mut r = self.kv_backend.txn(txn).await?;
1095        // Checks whether metadata was already updated.
1096        if !r.succeeded {
1097            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1098            let remote_table_info = on_update_table_info_failure(&mut set)?
1099                .context(error::UnexpectedSnafu {
1100                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1101                })?
1102                .into_inner();
1103
1104            let op_name = "the updating table info";
1105            ensure_values!(remote_table_info, new_table_info_value, op_name);
1106        }
1107        Ok(())
1108    }
1109
1110    /// Updates view info and returns an error if different metadata exists.
1111    /// Parameters include:
1112    /// - `view_id`: the view id
1113    /// - `current_view_info_value`: the current view info for CAS checking
1114    /// - `new_view_info`: the encoded logical plan
1115    /// - `table_names`: the resolved fully table names in logical plan
1116    /// - `columns`: the view columns
1117    /// - `plan_columns`: the original plan columns
1118    /// - `definition`: The SQL to create the view
1119    ///
1120    #[allow(clippy::too_many_arguments)]
1121    pub async fn update_view_info(
1122        &self,
1123        view_id: TableId,
1124        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1125        new_view_info: Vec<u8>,
1126        table_names: HashSet<TableName>,
1127        columns: Vec<String>,
1128        plan_columns: Vec<String>,
1129        definition: String,
1130    ) -> Result<()> {
1131        let new_view_info_value = current_view_info_value.update(
1132            new_view_info,
1133            table_names,
1134            columns,
1135            plan_columns,
1136            definition,
1137        );
1138
1139        // Updates view info.
1140        let (update_view_info_txn, on_update_view_info_failure) = self
1141            .view_info_manager()
1142            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1143
1144        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1145
1146        // Checks whether metadata was already updated.
1147        if !r.succeeded {
1148            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1149            let remote_view_info = on_update_view_info_failure(&mut set)?
1150                .context(error::UnexpectedSnafu {
1151                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1152                })?
1153                .into_inner();
1154
1155            let op_name = "the updating view info";
1156            ensure_values!(remote_view_info, new_view_info_value, op_name);
1157        }
1158        Ok(())
1159    }
1160
    /// Returns the value of the kv backend's `max_txn_ops`.
    ///
    /// The name suggests this is the backend's per-transaction operation limit.
    /// NOTE(review): presumably callers use it to split the input of
    /// `batch_update_table_info_values` into chunks. Confirm against callers.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1164
1165    pub async fn batch_update_table_info_values(
1166        &self,
1167        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1168    ) -> Result<()> {
1169        let len = table_info_value_pairs.len();
1170        let mut txns = Vec::with_capacity(len);
1171        struct OnFailure<F, R>
1172        where
1173            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1174        {
1175            table_info_value: TableInfoValue,
1176            on_update_table_info_failure: F,
1177        }
1178        let mut on_failures = Vec::with_capacity(len);
1179
1180        for (table_info_value, new_table_info) in table_info_value_pairs {
1181            let table_id = table_info_value.table_info.ident.table_id;
1182
1183            let new_table_info_value = table_info_value.update(new_table_info);
1184
1185            let (update_table_info_txn, on_update_table_info_failure) =
1186                self.table_info_manager().build_update_txn(
1187                    table_id,
1188                    &table_info_value,
1189                    &new_table_info_value,
1190                )?;
1191
1192            txns.push(update_table_info_txn);
1193
1194            on_failures.push(OnFailure {
1195                table_info_value: new_table_info_value,
1196                on_update_table_info_failure,
1197            });
1198        }
1199
1200        let txn = Txn::merge_all(txns);
1201        let mut r = self.kv_backend.txn(txn).await?;
1202
1203        if !r.succeeded {
1204            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1205            for on_failure in on_failures {
1206                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1207                    .context(error::UnexpectedSnafu {
1208                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1209                    })?
1210                    .into_inner();
1211
1212                let op_name = "the batch updating table info";
1213                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1214            }
1215        }
1216
1217        Ok(())
1218    }
1219
1220    pub async fn update_table_route(
1221        &self,
1222        table_id: TableId,
1223        region_info: RegionInfo,
1224        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1225        new_region_routes: Vec<RegionRoute>,
1226        new_region_options: &HashMap<String, String>,
1227        new_region_wal_options: &HashMap<RegionNumber, String>,
1228    ) -> Result<()> {
1229        // Updates the datanode table key value pairs.
1230        let current_region_distribution =
1231            region_distribution(current_table_route_value.region_routes()?);
1232        let new_region_distribution = region_distribution(&new_region_routes);
1233
1234        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1235            table_id,
1236            region_info,
1237            current_region_distribution,
1238            new_region_distribution,
1239            new_region_options,
1240            new_region_wal_options,
1241        )?;
1242
1243        // Updates the table_route.
1244        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1245
1246        let (update_table_route_txn, on_update_table_route_failure) = self
1247            .table_route_manager()
1248            .table_route_storage()
1249            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1250
1251        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1252
1253        let mut r = self.kv_backend.txn(txn).await?;
1254
1255        // Checks whether metadata was already updated.
1256        if !r.succeeded {
1257            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1258            let remote_table_route = on_update_table_route_failure(&mut set)?
1259                .context(error::UnexpectedSnafu {
1260                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1261                })?
1262                .into_inner();
1263
1264            let op_name = "the updating table route";
1265            ensure_values!(remote_table_route, new_table_route_value, op_name);
1266        }
1267
1268        Ok(())
1269    }
1270
1271    /// Updates the leader status of the [RegionRoute].
1272    pub async fn update_leader_region_status<F>(
1273        &self,
1274        table_id: TableId,
1275        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1276        next_region_route_status: F,
1277    ) -> Result<()>
1278    where
1279        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1280    {
1281        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1282
1283        let mut updated = 0;
1284        for route in &mut new_region_routes {
1285            if let Some(state) = next_region_route_status(route) {
1286                if route.set_leader_state(state) {
1287                    updated += 1;
1288                }
1289            }
1290        }
1291
1292        if updated == 0 {
1293            warn!("No leader status updated");
1294            return Ok(());
1295        }
1296
1297        // Updates the table_route.
1298        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299
1300        let (update_table_route_txn, on_update_table_route_failure) = self
1301            .table_route_manager()
1302            .table_route_storage()
1303            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1304
1305        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1306
1307        // Checks whether metadata was already updated.
1308        if !r.succeeded {
1309            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1310            let remote_table_route = on_update_table_route_failure(&mut set)?
1311                .context(error::UnexpectedSnafu {
1312                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1313                })?
1314                .into_inner();
1315
1316            let op_name = "the updating leader region status";
1317            ensure_values!(remote_table_route, new_table_route_value, op_name);
1318        }
1319
1320        Ok(())
1321    }
1322}
1323
/// Implements `MetadataValue` for each listed type by (de)serializing it as JSON
/// with `serde_json`.
///
/// The expansion names `Result`, the snafu `context` extension method and
/// `SerdeJsonSnafu` without qualification. They therefore resolve at the invocation
/// site and must be in scope there.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $value_type {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1340
/// Implements `MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` encodes the key with `to_bytes` and returns a
/// `TxnOp::Get` for it, together with `TxnOpGetResponseSet::filter` for the same
/// raw key.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_type: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_type {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let encoded_key = self.to_bytes();
                    let response_filter = TxnOpGetResponseSet::filter(encoded_key.clone());
                    (TxnOp::Get(encoded_key), response_filter)
                }
            }
        )*
    };
}
1365
1366impl_metadata_key_get_txn_op! {
1367    TableNameKey<'_>,
1368    TableInfoKey,
1369    ViewInfoKey,
1370    TableRouteKey,
1371    DatanodeTableKey
1372}
1373
/// Adds inherent JSON (de)serialization helpers to each listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` decodes into
/// `Option<Self>`. It is an inherent associated function, not a trait method.
///
/// As with `impl_metadata_value!`, `Result`, the snafu `context` extension method and
/// `SerdeJsonSnafu` must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $value_type {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1390
1391impl_metadata_value! {
1392    TableNameValue,
1393    TableInfoValue,
1394    ViewInfoValue,
1395    DatanodeTableValue,
1396    FlowInfoValue,
1397    FlowNameValue,
1398    FlowRouteValue,
1399    TableFlowValue,
1400    NodeAddressValue,
1401    SchemaNameValue,
1402    FlowStateValue,
1403    PoisonValue
1404}
1405
1406impl_optional_metadata_value! {
1407    CatalogNameValue,
1408    SchemaNameValue
1409}
1410
1411#[cfg(test)]
1412mod tests {
1413    use std::collections::{BTreeMap, HashMap, HashSet};
1414    use std::sync::Arc;
1415
1416    use bytes::Bytes;
1417    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1418    use common_time::util::current_time_millis;
1419    use common_wal::options::{KafkaWalOptions, WalOptions};
1420    use futures::TryStreamExt;
1421    use store_api::storage::{RegionId, RegionNumber};
1422    use table::metadata::{RawTableInfo, TableInfo};
1423    use table::table_name::TableName;
1424
1425    use super::datanode_table::DatanodeTableKey;
1426    use super::test_utils;
1427    use crate::ddl::test_util::create_table::test_create_table_task;
1428    use crate::ddl::utils::region_storage_path;
1429    use crate::error::Result;
1430    use crate::key::datanode_table::RegionInfo;
1431    use crate::key::table_info::TableInfoValue;
1432    use crate::key::table_name::TableNameKey;
1433    use crate::key::table_route::TableRouteValue;
1434    use crate::key::{
1435        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1436        ViewInfoValue, TOPIC_REGION_PREFIX,
1437    };
1438    use crate::kv_backend::memory::MemoryKvBackend;
1439    use crate::kv_backend::KvBackend;
1440    use crate::peer::Peer;
1441    use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1442    use crate::rpc::store::RangeRequest;
1443    use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1444
1445    #[test]
1446    fn test_deserialized_value_with_bytes() {
1447        let region_route = new_test_region_route();
1448        let region_routes = vec![region_route.clone()];
1449
1450        let expected_region_routes =
1451            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1452        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1453
1454        // Serialize behaviors:
1455        // The inner field will be ignored.
1456        let value = DeserializedValueWithBytes {
1457            // ignored
1458            inner: TableRouteValue::physical(region_routes.clone()),
1459            bytes: Bytes::from(expected.clone()),
1460        };
1461
1462        let encoded = serde_json::to_vec(&value).unwrap();
1463
1464        // Deserialize behaviors:
1465        // The inner field will be deserialized from the bytes field.
1466        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1467            serde_json::from_slice(&encoded).unwrap();
1468
1469        assert_eq!(decoded.inner, expected_region_routes);
1470        assert_eq!(decoded.bytes, expected);
1471    }
1472
    /// Builds the default test route: region id 1 with datanode 2 as leader peer
    /// (see `new_region_route`).
    fn new_test_region_route() -> RegionRoute {
        new_region_route(1, 2)
    }
1476
1477    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1478        RegionRoute {
1479            region: Region {
1480                id: region_id.into(),
1481                name: "r1".to_string(),
1482                partition: None,
1483                attrs: BTreeMap::new(),
1484            },
1485            leader_peer: Some(Peer::new(datanode, "a2")),
1486            follower_peers: vec![],
1487            leader_state: None,
1488            leader_down_since: None,
1489        }
1490    }
1491
1492    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1493        test_utils::new_test_table_info(10, region_numbers)
1494    }
1495
1496    fn new_test_table_names() -> HashSet<TableName> {
1497        let mut set = HashSet::new();
1498        set.insert(TableName {
1499            catalog_name: "greptime".to_string(),
1500            schema_name: "public".to_string(),
1501            table_name: "a_table".to_string(),
1502        });
1503        set.insert(TableName {
1504            catalog_name: "greptime".to_string(),
1505            schema_name: "public".to_string(),
1506            table_name: "b_table".to_string(),
1507        });
1508        set
1509    }
1510
1511    async fn create_physical_table_metadata(
1512        table_metadata_manager: &TableMetadataManager,
1513        table_info: RawTableInfo,
1514        region_routes: Vec<RegionRoute>,
1515        region_wal_options: HashMap<RegionNumber, String>,
1516    ) -> Result<()> {
1517        table_metadata_manager
1518            .create_table_metadata(
1519                table_info,
1520                TableRouteValue::physical(region_routes),
1521                region_wal_options,
1522            )
1523            .await
1524    }
1525
1526    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1527        let topics = (0..2)
1528            .map(|i| format!("greptimedb_topic{}", i))
1529            .collect::<Vec<_>>();
1530        let wal_options = topics
1531            .iter()
1532            .map(|topic| {
1533                WalOptions::Kafka(KafkaWalOptions {
1534                    topic: topic.clone(),
1535                })
1536            })
1537            .collect::<Vec<_>>();
1538
1539        (0..16)
1540            .enumerate()
1541            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1542            .collect()
1543    }
1544
1545    #[tokio::test]
1546    async fn test_raft_engine_topic_region_map() {
1547        let mem_kv = Arc::new(MemoryKvBackend::default());
1548        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1549        let region_route = new_test_region_route();
1550        let region_routes = &vec![region_route.clone()];
1551        let table_info: RawTableInfo =
1552            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1553        let wal_allocator = WalOptionsAllocator::RaftEngine;
1554        let regions = (0..16).collect();
1555        let region_wal_options =
1556            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1557        create_physical_table_metadata(
1558            &table_metadata_manager,
1559            table_info.clone(),
1560            region_routes.clone(),
1561            region_wal_options.clone(),
1562        )
1563        .await
1564        .unwrap();
1565
1566        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1567        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1568        let resp = mem_kv.range(range_req).await.unwrap();
1569        // Should be empty because the topic region map is empty for raft engine.
1570        assert!(resp.kvs.is_empty());
1571    }
1572
1573    #[tokio::test]
1574    async fn test_create_table_metadata() {
1575        let mem_kv = Arc::new(MemoryKvBackend::default());
1576        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1577        let region_route = new_test_region_route();
1578        let region_routes = &vec![region_route.clone()];
1579        let table_info: RawTableInfo =
1580            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1581        let region_wal_options = create_mock_region_wal_options()
1582            .into_iter()
1583            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1584            .collect::<HashMap<_, _>>();
1585
1586        // creates metadata.
1587        create_physical_table_metadata(
1588            &table_metadata_manager,
1589            table_info.clone(),
1590            region_routes.clone(),
1591            region_wal_options.clone(),
1592        )
1593        .await
1594        .unwrap();
1595
1596        // if metadata was already created, it should be ok.
1597        assert!(create_physical_table_metadata(
1598            &table_metadata_manager,
1599            table_info.clone(),
1600            region_routes.clone(),
1601            region_wal_options.clone(),
1602        )
1603        .await
1604        .is_ok());
1605
1606        let mut modified_region_routes = region_routes.clone();
1607        modified_region_routes.push(region_route.clone());
1608        // if remote metadata was exists, it should return an error.
1609        assert!(create_physical_table_metadata(
1610            &table_metadata_manager,
1611            table_info.clone(),
1612            modified_region_routes,
1613            region_wal_options.clone(),
1614        )
1615        .await
1616        .is_err());
1617
1618        let (remote_table_info, remote_table_route) = table_metadata_manager
1619            .get_full_table_info(10)
1620            .await
1621            .unwrap();
1622
1623        assert_eq!(
1624            remote_table_info.unwrap().into_inner().table_info,
1625            table_info
1626        );
1627        assert_eq!(
1628            remote_table_route
1629                .unwrap()
1630                .into_inner()
1631                .region_routes()
1632                .unwrap(),
1633            region_routes
1634        );
1635
1636        for i in 0..2 {
1637            let region_number = i as u32;
1638            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1639            let topic = format!("greptimedb_topic{}", i);
1640            let regions = table_metadata_manager
1641                .topic_region_manager
1642                .regions(&topic)
1643                .await
1644                .unwrap();
1645            assert_eq!(regions.len(), 8);
1646            assert_eq!(regions[0], region_id);
1647        }
1648    }
1649
1650    #[tokio::test]
1651    async fn test_create_logic_tables_metadata() {
1652        let mem_kv = Arc::new(MemoryKvBackend::default());
1653        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1654        let region_route = new_test_region_route();
1655        let region_routes = vec![region_route.clone()];
1656        let table_info: RawTableInfo =
1657            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1658        let table_id = table_info.ident.table_id;
1659        let table_route_value = TableRouteValue::physical(region_routes.clone());
1660
1661        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1662        // creates metadata.
1663        table_metadata_manager
1664            .create_logical_tables_metadata(tables_data.clone())
1665            .await
1666            .unwrap();
1667
1668        // if metadata was already created, it should be ok.
1669        assert!(table_metadata_manager
1670            .create_logical_tables_metadata(tables_data)
1671            .await
1672            .is_ok());
1673
1674        let mut modified_region_routes = region_routes.clone();
1675        modified_region_routes.push(new_region_route(2, 3));
1676        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1677        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1678        // if remote metadata was exists, it should return an error.
1679        assert!(table_metadata_manager
1680            .create_logical_tables_metadata(modified_tables_data)
1681            .await
1682            .is_err());
1683
1684        let (remote_table_info, remote_table_route) = table_metadata_manager
1685            .get_full_table_info(table_id)
1686            .await
1687            .unwrap();
1688
1689        assert_eq!(
1690            remote_table_info.unwrap().into_inner().table_info,
1691            table_info
1692        );
1693        assert_eq!(
1694            remote_table_route
1695                .unwrap()
1696                .into_inner()
1697                .region_routes()
1698                .unwrap(),
1699            &region_routes
1700        );
1701    }
1702
1703    #[tokio::test]
1704    async fn test_create_many_logical_tables_metadata() {
1705        let kv_backend = Arc::new(MemoryKvBackend::default());
1706        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1707
1708        let mut tables_data = vec![];
1709        for i in 0..128 {
1710            let table_id = i + 1;
1711            let regin_number = table_id * 3;
1712            let region_id = RegionId::new(table_id, regin_number);
1713            let region_route = new_region_route(region_id.as_u64(), 2);
1714            let region_routes = vec![region_route.clone()];
1715            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1716                table_id,
1717                &format!("my_table_{}", table_id),
1718                region_routes.iter().map(|r| r.region.id.region_number()),
1719            )
1720            .into();
1721            let table_route_value = TableRouteValue::physical(region_routes.clone());
1722
1723            tables_data.push((table_info, table_route_value));
1724        }
1725
1726        // creates metadata.
1727        table_metadata_manager
1728            .create_logical_tables_metadata(tables_data)
1729            .await
1730            .unwrap();
1731    }
1732
1733    #[tokio::test]
1734    async fn test_delete_table_metadata() {
1735        let mem_kv = Arc::new(MemoryKvBackend::default());
1736        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1737        let region_route = new_test_region_route();
1738        let region_routes = &vec![region_route.clone()];
1739        let table_info: RawTableInfo =
1740            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1741        let table_id = table_info.ident.table_id;
1742        let datanode_id = 2;
1743        let region_wal_options = create_mock_region_wal_options();
1744        let serialized_region_wal_options = region_wal_options
1745            .iter()
1746            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1747            .collect::<HashMap<_, _>>();
1748
1749        // creates metadata.
1750        create_physical_table_metadata(
1751            &table_metadata_manager,
1752            table_info.clone(),
1753            region_routes.clone(),
1754            serialized_region_wal_options,
1755        )
1756        .await
1757        .unwrap();
1758
1759        let table_name = TableName::new(
1760            table_info.catalog_name,
1761            table_info.schema_name,
1762            table_info.name,
1763        );
1764        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1765        // deletes metadata.
1766        table_metadata_manager
1767            .delete_table_metadata(
1768                table_id,
1769                &table_name,
1770                table_route_value,
1771                &region_wal_options,
1772            )
1773            .await
1774            .unwrap();
1775        // Should be ignored.
1776        table_metadata_manager
1777            .delete_table_metadata(
1778                table_id,
1779                &table_name,
1780                table_route_value,
1781                &region_wal_options,
1782            )
1783            .await
1784            .unwrap();
1785        assert!(table_metadata_manager
1786            .table_info_manager()
1787            .get(table_id)
1788            .await
1789            .unwrap()
1790            .is_none());
1791        assert!(table_metadata_manager
1792            .table_route_manager()
1793            .table_route_storage()
1794            .get(table_id)
1795            .await
1796            .unwrap()
1797            .is_none());
1798        assert!(table_metadata_manager
1799            .datanode_table_manager()
1800            .tables(datanode_id)
1801            .try_collect::<Vec<_>>()
1802            .await
1803            .unwrap()
1804            .is_empty());
1805        // Checks removed values
1806        let table_info = table_metadata_manager
1807            .table_info_manager()
1808            .get(table_id)
1809            .await
1810            .unwrap();
1811        assert!(table_info.is_none());
1812        let table_route = table_metadata_manager
1813            .table_route_manager()
1814            .table_route_storage()
1815            .get(table_id)
1816            .await
1817            .unwrap();
1818        assert!(table_route.is_none());
1819        // Logical delete removes the topic region mapping as well.
1820        let regions = table_metadata_manager
1821            .topic_region_manager
1822            .regions("greptimedb_topic0")
1823            .await
1824            .unwrap();
1825        assert_eq!(regions.len(), 0);
1826        let regions = table_metadata_manager
1827            .topic_region_manager
1828            .regions("greptimedb_topic1")
1829            .await
1830            .unwrap();
1831        assert_eq!(regions.len(), 0);
1832    }
1833
1834    #[tokio::test]
1835    async fn test_rename_table() {
1836        let mem_kv = Arc::new(MemoryKvBackend::default());
1837        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1838        let region_route = new_test_region_route();
1839        let region_routes = vec![region_route.clone()];
1840        let table_info: RawTableInfo =
1841            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1842        let table_id = table_info.ident.table_id;
1843        // creates metadata.
1844        create_physical_table_metadata(
1845            &table_metadata_manager,
1846            table_info.clone(),
1847            region_routes.clone(),
1848            HashMap::new(),
1849        )
1850        .await
1851        .unwrap();
1852
1853        let new_table_name = "another_name".to_string();
1854        let table_info_value =
1855            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1856
1857        table_metadata_manager
1858            .rename_table(&table_info_value, new_table_name.clone())
1859            .await
1860            .unwrap();
1861        // if remote metadata was updated, it should be ok.
1862        table_metadata_manager
1863            .rename_table(&table_info_value, new_table_name.clone())
1864            .await
1865            .unwrap();
1866        let mut modified_table_info = table_info.clone();
1867        modified_table_info.name = "hi".to_string();
1868        let modified_table_info_value =
1869            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1870        // if the table_info_value is wrong, it should return an error.
1871        // The ABA problem.
1872        assert!(table_metadata_manager
1873            .rename_table(&modified_table_info_value, new_table_name.clone())
1874            .await
1875            .is_err());
1876
1877        let old_table_name = TableNameKey::new(
1878            &table_info.catalog_name,
1879            &table_info.schema_name,
1880            &table_info.name,
1881        );
1882        let new_table_name = TableNameKey::new(
1883            &table_info.catalog_name,
1884            &table_info.schema_name,
1885            &new_table_name,
1886        );
1887
1888        assert!(table_metadata_manager
1889            .table_name_manager()
1890            .get(old_table_name)
1891            .await
1892            .unwrap()
1893            .is_none());
1894
1895        assert_eq!(
1896            table_metadata_manager
1897                .table_name_manager()
1898                .get(new_table_name)
1899                .await
1900                .unwrap()
1901                .unwrap()
1902                .table_id(),
1903            table_id
1904        );
1905    }
1906
1907    #[tokio::test]
1908    async fn test_update_table_info() {
1909        let mem_kv = Arc::new(MemoryKvBackend::default());
1910        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1911        let region_route = new_test_region_route();
1912        let region_routes = vec![region_route.clone()];
1913        let table_info: RawTableInfo =
1914            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1915        let table_id = table_info.ident.table_id;
1916        // creates metadata.
1917        create_physical_table_metadata(
1918            &table_metadata_manager,
1919            table_info.clone(),
1920            region_routes.clone(),
1921            HashMap::new(),
1922        )
1923        .await
1924        .unwrap();
1925
1926        let mut new_table_info = table_info.clone();
1927        new_table_info.name = "hi".to_string();
1928        let current_table_info_value =
1929            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1930        // should be ok.
1931        table_metadata_manager
1932            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1933            .await
1934            .unwrap();
1935        // if table info was updated, it should be ok.
1936        table_metadata_manager
1937            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1938            .await
1939            .unwrap();
1940
1941        // updated table_info should equal the `new_table_info`
1942        let updated_table_info = table_metadata_manager
1943            .table_info_manager()
1944            .get(table_id)
1945            .await
1946            .unwrap()
1947            .unwrap()
1948            .into_inner();
1949        assert_eq!(updated_table_info.table_info, new_table_info);
1950
1951        let mut wrong_table_info = table_info.clone();
1952        wrong_table_info.name = "wrong".to_string();
1953        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1954            current_table_info_value.update(wrong_table_info),
1955        );
1956        // if the current_table_info_value is wrong, it should return an error.
1957        // The ABA problem.
1958        assert!(table_metadata_manager
1959            .update_table_info(&wrong_table_info_value, None, new_table_info)
1960            .await
1961            .is_err())
1962    }
1963
1964    #[tokio::test]
1965    async fn test_update_table_leader_region_status() {
1966        let mem_kv = Arc::new(MemoryKvBackend::default());
1967        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1968        let datanode = 1;
1969        let region_routes = vec![
1970            RegionRoute {
1971                region: Region {
1972                    id: 1.into(),
1973                    name: "r1".to_string(),
1974                    partition: None,
1975                    attrs: BTreeMap::new(),
1976                },
1977                leader_peer: Some(Peer::new(datanode, "a2")),
1978                leader_state: Some(LeaderState::Downgrading),
1979                follower_peers: vec![],
1980                leader_down_since: Some(current_time_millis()),
1981            },
1982            RegionRoute {
1983                region: Region {
1984                    id: 2.into(),
1985                    name: "r2".to_string(),
1986                    partition: None,
1987                    attrs: BTreeMap::new(),
1988                },
1989                leader_peer: Some(Peer::new(datanode, "a1")),
1990                leader_state: None,
1991                follower_peers: vec![],
1992                leader_down_since: None,
1993            },
1994        ];
1995        let table_info: RawTableInfo =
1996            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1997        let table_id = table_info.ident.table_id;
1998        let current_table_route_value = DeserializedValueWithBytes::from_inner(
1999            TableRouteValue::physical(region_routes.clone()),
2000        );
2001
2002        // creates metadata.
2003        create_physical_table_metadata(
2004            &table_metadata_manager,
2005            table_info.clone(),
2006            region_routes.clone(),
2007            HashMap::new(),
2008        )
2009        .await
2010        .unwrap();
2011
2012        table_metadata_manager
2013            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2014                if region_route.leader_state.is_some() {
2015                    None
2016                } else {
2017                    Some(Some(LeaderState::Downgrading))
2018                }
2019            })
2020            .await
2021            .unwrap();
2022
2023        let updated_route_value = table_metadata_manager
2024            .table_route_manager()
2025            .table_route_storage()
2026            .get(table_id)
2027            .await
2028            .unwrap()
2029            .unwrap();
2030
2031        assert_eq!(
2032            updated_route_value.region_routes().unwrap()[0].leader_state,
2033            Some(LeaderState::Downgrading)
2034        );
2035
2036        assert!(updated_route_value.region_routes().unwrap()[0]
2037            .leader_down_since
2038            .is_some());
2039
2040        assert_eq!(
2041            updated_route_value.region_routes().unwrap()[1].leader_state,
2042            Some(LeaderState::Downgrading)
2043        );
2044        assert!(updated_route_value.region_routes().unwrap()[1]
2045            .leader_down_since
2046            .is_some());
2047    }
2048
2049    async fn assert_datanode_table(
2050        table_metadata_manager: &TableMetadataManager,
2051        table_id: u32,
2052        region_routes: &[RegionRoute],
2053    ) {
2054        let region_distribution = region_distribution(region_routes);
2055        for (datanode, regions) in region_distribution {
2056            let got = table_metadata_manager
2057                .datanode_table_manager()
2058                .get(&DatanodeTableKey::new(datanode, table_id))
2059                .await
2060                .unwrap()
2061                .unwrap();
2062
2063            assert_eq!(got.regions, regions.leader_regions);
2064            assert_eq!(got.follower_regions, regions.follower_regions);
2065        }
2066    }
2067
2068    #[tokio::test]
2069    async fn test_update_table_route() {
2070        let mem_kv = Arc::new(MemoryKvBackend::default());
2071        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2072        let region_route = new_test_region_route();
2073        let region_routes = vec![region_route.clone()];
2074        let table_info: RawTableInfo =
2075            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2076        let table_id = table_info.ident.table_id;
2077        let engine = table_info.meta.engine.as_str();
2078        let region_storage_path =
2079            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2080        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2081            TableRouteValue::physical(region_routes.clone()),
2082        );
2083
2084        // creates metadata.
2085        create_physical_table_metadata(
2086            &table_metadata_manager,
2087            table_info.clone(),
2088            region_routes.clone(),
2089            HashMap::new(),
2090        )
2091        .await
2092        .unwrap();
2093
2094        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2095        let new_region_routes = vec![
2096            new_region_route(1, 1),
2097            new_region_route(2, 2),
2098            new_region_route(3, 3),
2099        ];
2100        // it should be ok.
2101        table_metadata_manager
2102            .update_table_route(
2103                table_id,
2104                RegionInfo {
2105                    engine: engine.to_string(),
2106                    region_storage_path: region_storage_path.to_string(),
2107                    region_options: HashMap::new(),
2108                    region_wal_options: HashMap::new(),
2109                },
2110                &current_table_route_value,
2111                new_region_routes.clone(),
2112                &HashMap::new(),
2113                &HashMap::new(),
2114            )
2115            .await
2116            .unwrap();
2117        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2118
2119        // if the table route was updated. it should be ok.
2120        table_metadata_manager
2121            .update_table_route(
2122                table_id,
2123                RegionInfo {
2124                    engine: engine.to_string(),
2125                    region_storage_path: region_storage_path.to_string(),
2126                    region_options: HashMap::new(),
2127                    region_wal_options: HashMap::new(),
2128                },
2129                &current_table_route_value,
2130                new_region_routes.clone(),
2131                &HashMap::new(),
2132                &HashMap::new(),
2133            )
2134            .await
2135            .unwrap();
2136
2137        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2138            current_table_route_value
2139                .inner
2140                .update(new_region_routes.clone())
2141                .unwrap(),
2142        );
2143        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2144        // it should be ok.
2145        table_metadata_manager
2146            .update_table_route(
2147                table_id,
2148                RegionInfo {
2149                    engine: engine.to_string(),
2150                    region_storage_path: region_storage_path.to_string(),
2151                    region_options: HashMap::new(),
2152                    region_wal_options: HashMap::new(),
2153                },
2154                &current_table_route_value,
2155                new_region_routes.clone(),
2156                &HashMap::new(),
2157                &HashMap::new(),
2158            )
2159            .await
2160            .unwrap();
2161        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2162
2163        // if the current_table_route_value is wrong, it should return an error.
2164        // The ABA problem.
2165        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2166            current_table_route_value
2167                .update(vec![
2168                    new_region_route(1, 1),
2169                    new_region_route(2, 2),
2170                    new_region_route(3, 3),
2171                    new_region_route(4, 4),
2172                ])
2173                .unwrap(),
2174        );
2175        assert!(table_metadata_manager
2176            .update_table_route(
2177                table_id,
2178                RegionInfo {
2179                    engine: engine.to_string(),
2180                    region_storage_path: region_storage_path.to_string(),
2181                    region_options: HashMap::new(),
2182                    region_wal_options: HashMap::new(),
2183                },
2184                &wrong_table_route_value,
2185                new_region_routes,
2186                &HashMap::new(),
2187                &HashMap::new(),
2188            )
2189            .await
2190            .is_err());
2191    }
2192
2193    #[tokio::test]
2194    async fn test_destroy_table_metadata() {
2195        let mem_kv = Arc::new(MemoryKvBackend::default());
2196        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2197        let table_id = 1025;
2198        let table_name = "foo";
2199        let task = test_create_table_task(table_name, table_id);
2200        let options = create_mock_region_wal_options();
2201        let serialized_options = options
2202            .iter()
2203            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2204            .collect::<HashMap<_, _>>();
2205        table_metadata_manager
2206            .create_table_metadata(
2207                task.table_info,
2208                TableRouteValue::physical(vec![
2209                    RegionRoute {
2210                        region: Region::new_test(RegionId::new(table_id, 1)),
2211                        leader_peer: Some(Peer::empty(1)),
2212                        follower_peers: vec![Peer::empty(5)],
2213                        leader_state: None,
2214                        leader_down_since: None,
2215                    },
2216                    RegionRoute {
2217                        region: Region::new_test(RegionId::new(table_id, 2)),
2218                        leader_peer: Some(Peer::empty(2)),
2219                        follower_peers: vec![Peer::empty(4)],
2220                        leader_state: None,
2221                        leader_down_since: None,
2222                    },
2223                    RegionRoute {
2224                        region: Region::new_test(RegionId::new(table_id, 3)),
2225                        leader_peer: Some(Peer::empty(3)),
2226                        follower_peers: vec![],
2227                        leader_state: None,
2228                        leader_down_since: None,
2229                    },
2230                ]),
2231                serialized_options,
2232            )
2233            .await
2234            .unwrap();
2235        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2236        let table_route_value = table_metadata_manager
2237            .table_route_manager
2238            .table_route_storage()
2239            .get_with_raw_bytes(table_id)
2240            .await
2241            .unwrap()
2242            .unwrap();
2243        table_metadata_manager
2244            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2245            .await
2246            .unwrap();
2247        assert!(mem_kv.is_empty());
2248    }
2249
2250    #[tokio::test]
2251    async fn test_restore_table_metadata() {
2252        let mem_kv = Arc::new(MemoryKvBackend::default());
2253        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2254        let table_id = 1025;
2255        let table_name = "foo";
2256        let task = test_create_table_task(table_name, table_id);
2257        let options = create_mock_region_wal_options();
2258        let serialized_options = options
2259            .iter()
2260            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2261            .collect::<HashMap<_, _>>();
2262        table_metadata_manager
2263            .create_table_metadata(
2264                task.table_info,
2265                TableRouteValue::physical(vec![
2266                    RegionRoute {
2267                        region: Region::new_test(RegionId::new(table_id, 1)),
2268                        leader_peer: Some(Peer::empty(1)),
2269                        follower_peers: vec![Peer::empty(5)],
2270                        leader_state: None,
2271                        leader_down_since: None,
2272                    },
2273                    RegionRoute {
2274                        region: Region::new_test(RegionId::new(table_id, 2)),
2275                        leader_peer: Some(Peer::empty(2)),
2276                        follower_peers: vec![Peer::empty(4)],
2277                        leader_state: None,
2278                        leader_down_since: None,
2279                    },
2280                    RegionRoute {
2281                        region: Region::new_test(RegionId::new(table_id, 3)),
2282                        leader_peer: Some(Peer::empty(3)),
2283                        follower_peers: vec![],
2284                        leader_state: None,
2285                        leader_down_since: None,
2286                    },
2287                ]),
2288                serialized_options,
2289            )
2290            .await
2291            .unwrap();
2292        let expected_result = mem_kv.dump();
2293        let table_route_value = table_metadata_manager
2294            .table_route_manager
2295            .table_route_storage()
2296            .get_with_raw_bytes(table_id)
2297            .await
2298            .unwrap()
2299            .unwrap();
2300        let region_routes = table_route_value.region_routes().unwrap();
2301        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2302        let table_route_value = TableRouteValue::physical(region_routes.clone());
2303        table_metadata_manager
2304            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2305            .await
2306            .unwrap();
2307        table_metadata_manager
2308            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2309            .await
2310            .unwrap();
2311        let kvs = mem_kv.dump();
2312        assert_eq!(kvs, expected_result);
2313        // Should be ignored.
2314        table_metadata_manager
2315            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2316            .await
2317            .unwrap();
2318        let kvs = mem_kv.dump();
2319        assert_eq!(kvs, expected_result);
2320    }
2321
2322    #[tokio::test]
2323    async fn test_create_update_view_info() {
2324        let mem_kv = Arc::new(MemoryKvBackend::default());
2325        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2326
2327        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2328
2329        let view_id = view_info.ident.table_id;
2330
2331        let logical_plan: Vec<u8> = vec![1, 2, 3];
2332        let columns = vec!["a".to_string()];
2333        let plan_columns = vec!["number".to_string()];
2334        let table_names = new_test_table_names();
2335        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2336
2337        // Create metadata
2338        table_metadata_manager
2339            .create_view_metadata(
2340                view_info.clone(),
2341                logical_plan.clone(),
2342                table_names.clone(),
2343                columns.clone(),
2344                plan_columns.clone(),
2345                definition.to_string(),
2346            )
2347            .await
2348            .unwrap();
2349
2350        {
2351            // assert view info
2352            let current_view_info = table_metadata_manager
2353                .view_info_manager()
2354                .get(view_id)
2355                .await
2356                .unwrap()
2357                .unwrap()
2358                .into_inner();
2359            assert_eq!(current_view_info.view_info, logical_plan);
2360            assert_eq!(current_view_info.table_names, table_names);
2361            assert_eq!(current_view_info.definition, definition);
2362            assert_eq!(current_view_info.columns, columns);
2363            assert_eq!(current_view_info.plan_columns, plan_columns);
2364            // assert table info
2365            let current_table_info = table_metadata_manager
2366                .table_info_manager()
2367                .get(view_id)
2368                .await
2369                .unwrap()
2370                .unwrap()
2371                .into_inner();
2372            assert_eq!(current_table_info.table_info, view_info);
2373        }
2374
2375        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2376        let new_table_names = {
2377            let mut set = HashSet::new();
2378            set.insert(TableName {
2379                catalog_name: "greptime".to_string(),
2380                schema_name: "public".to_string(),
2381                table_name: "b_table".to_string(),
2382            });
2383            set.insert(TableName {
2384                catalog_name: "greptime".to_string(),
2385                schema_name: "public".to_string(),
2386                table_name: "c_table".to_string(),
2387            });
2388            set
2389        };
2390        let new_columns = vec!["b".to_string()];
2391        let new_plan_columns = vec!["number2".to_string()];
2392        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2393
2394        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2395            logical_plan.clone(),
2396            table_names,
2397            columns,
2398            plan_columns,
2399            definition.to_string(),
2400        ));
2401        // should be ok.
2402        table_metadata_manager
2403            .update_view_info(
2404                view_id,
2405                &current_view_info_value,
2406                new_logical_plan.clone(),
2407                new_table_names.clone(),
2408                new_columns.clone(),
2409                new_plan_columns.clone(),
2410                new_definition.to_string(),
2411            )
2412            .await
2413            .unwrap();
2414        // if table info was updated, it should be ok.
2415        table_metadata_manager
2416            .update_view_info(
2417                view_id,
2418                &current_view_info_value,
2419                new_logical_plan.clone(),
2420                new_table_names.clone(),
2421                new_columns.clone(),
2422                new_plan_columns.clone(),
2423                new_definition.to_string(),
2424            )
2425            .await
2426            .unwrap();
2427
2428        // updated view_info should equal the `new_logical_plan`
2429        let updated_view_info = table_metadata_manager
2430            .view_info_manager()
2431            .get(view_id)
2432            .await
2433            .unwrap()
2434            .unwrap()
2435            .into_inner();
2436        assert_eq!(updated_view_info.view_info, new_logical_plan);
2437        assert_eq!(updated_view_info.table_names, new_table_names);
2438        assert_eq!(updated_view_info.definition, new_definition);
2439        assert_eq!(updated_view_info.columns, new_columns);
2440        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2441
2442        let wrong_view_info = logical_plan.clone();
2443        let wrong_definition = "wrong_definition";
2444        let wrong_view_info_value =
2445            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2446                wrong_view_info,
2447                new_table_names.clone(),
2448                new_columns.clone(),
2449                new_plan_columns.clone(),
2450                wrong_definition.to_string(),
2451            ));
2452        // if the current_view_info_value is wrong, it should return an error.
2453        // The ABA problem.
2454        assert!(table_metadata_manager
2455            .update_view_info(
2456                view_id,
2457                &wrong_view_info_value,
2458                new_logical_plan.clone(),
2459                new_table_names.clone(),
2460                vec!["c".to_string()],
2461                vec!["number3".to_string()],
2462                wrong_definition.to_string(),
2463            )
2464            .await
2465            .is_err());
2466
2467        // The view_info is not changed.
2468        let current_view_info = table_metadata_manager
2469            .view_info_manager()
2470            .get(view_id)
2471            .await
2472            .unwrap()
2473            .unwrap()
2474            .into_inner();
2475        assert_eq!(current_view_info.view_info, new_logical_plan);
2476        assert_eq!(current_view_info.table_names, new_table_names);
2477        assert_eq!(current_view_info.definition, new_definition);
2478        assert_eq!(current_view_info.columns, new_columns);
2479        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2480    }
2481
2482    #[test]
2483    fn test_region_role_set_deserialize() {
2484        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2485        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2486        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2487        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2488
2489        let s = r#"[1, 2, 3]"#;
2490        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2491        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2492        assert!(region_role_set.follower_regions.is_empty());
2493    }
2494
2495    #[test]
2496    fn test_region_distribution_deserialize() {
2497        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2498        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2499        assert_eq!(region_distribution.len(), 2);
2500        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2501        assert!(region_distribution[&1].follower_regions.is_empty());
2502        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2503        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2504    }
2505}