common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used during Datanode startup, to let the Datanode know which
//!       tables and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names.
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names that belong to {catalog_name}.
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
//! 13. Topic region key: `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two
//! "unified" managers: the table metadata manager [TableMetadataManager] and the flow metadata
//! manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager). Together they contain
//! all the managers defined above; it's recommended to use only these unified managers.
75//!
//! The full layout of the flow keys is:
//!
//! ```text
//! __flow/
//!   info/
//!     {flow_id}
//!   route/
//!     {flow_id}/
//!       {partition_id}
//!   name/
//!     {catalog_name}/
//!       {flow_name}
//!   flownode/
//!     {flownode_id}/
//!       {flow_id}/
//!         {partition_id}
//!   source_table/
//!     {table_id}/
//!       {flownode_id}/
//!         {flow_id}/
//!           {partition_id}
//! ```
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod maintenance;
104pub mod node_address;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
/// Regex fragment for a name embedded in a metadata key (catalog, schema, table, topic, ...):
/// the first character is a letter, `_`, `:` or `-`; the remaining characters may also be
/// digits, `.`, `@` or `#`. The fragment is not anchored.
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
/// Key used by the `maintenance` module (presumably holds the maintenance mode state — TODO
/// confirm).
pub const MAINTENANCE_KEY: &str = "__maintenance";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `__table_route/{table_id}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys; two numeric path segments follow it (see
/// `NODE_ADDRESS_PATTERN`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix, used to store topic names in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];

/// Region numbers of a table, grouped by the datanode id that holds them.
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;

/// The id of flow.
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
197
198lazy_static! {
199    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
200}
201
202lazy_static! {
203    static ref TABLE_INFO_KEY_PATTERN: Regex =
204        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
205}
206
207lazy_static! {
208    static ref VIEW_INFO_KEY_PATTERN: Regex =
209        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
210}
211
212lazy_static! {
213    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
214        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
215}
216
217lazy_static! {
218    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
219        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
220}
221
222lazy_static! {
223    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
224        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
225    ))
226    .unwrap();
227}
228
229lazy_static! {
230    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
231    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
232        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
233    ))
234        .unwrap();
235}
236
237lazy_static! {
238    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
239    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
240        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
241    ))
242        .unwrap();
243}
244
245lazy_static! {
246    static ref NODE_ADDRESS_PATTERN: Regex =
247        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
248}
249
250lazy_static! {
251    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
252        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
253}
254
255lazy_static! {
256    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
257        "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
258    ))
259    .unwrap();
260}
261
/// The key of metadata.
///
/// Implementors convert between a typed key `T` and the raw bytes stored in the KV backend.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into the raw bytes used in the KV backend.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a `T` from raw key bytes. The `'a` lifetime presumably allows `T` to borrow
    /// from `bytes`.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
268
269#[derive(Debug, Clone, PartialEq)]
270pub struct BytesAdapter(Vec<u8>);
271
272impl From<Vec<u8>> for BytesAdapter {
273    fn from(value: Vec<u8>) -> Self {
274        Self(value)
275    }
276}
277
278impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
279    fn to_bytes(&self) -> Vec<u8> {
280        self.0.clone()
281    }
282
283    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
284        Ok(BytesAdapter(bytes.to_vec()))
285    }
286}
287
/// Builds a transactional "get" operation for a metadata key.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the get [TxnOp] for this key, together with a closure that extracts this key's
    /// raw value from a [TxnOpGetResponseSet] after the transaction has run. The closure
    /// presumably yields `None` when the key was not found — TODO confirm with implementors.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
296
/// A value stored in the metadata KV backend that can be converted to and from raw bytes.
pub trait MetadataValue {
    /// Decodes a value from raw bytes read from the KV backend.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes for storage in the KV backend.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
304
/// Shared, reference-counted handle to a [TableMetadataManager].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// The "unified" manager for table-related metadata keys.
///
/// It bundles one manager per key kind. All of them are built over the same KV backend in
/// [TableMetadataManager::new]. That backend is also kept in `kv_backend`, so operations that
/// span several key kinds can be committed in a single transaction.
pub struct TableMetadataManager {
    // Table name keys (`__table_name/...`).
    table_name_manager: TableNameManager,
    // Table info keys (`__table_info/...`).
    table_info_manager: TableInfoManager,
    // View info keys (`__view_info/...`).
    view_info_manager: ViewInfoManager,
    // Datanode table keys (`__dn_table/...`).
    datanode_table_manager: DatanodeTableManager,
    // Catalog name keys (`__catalog_name/...`).
    catalog_manager: CatalogManager,
    // Schema name keys (`__schema_name/...`).
    schema_manager: SchemaManager,
    // Table route keys (`__table_route/...`).
    table_route_manager: TableRouteManager,
    // Creates/deletes tombstones for table metadata keys (see `delete_table_metadata*`).
    tombstone_manager: TombstoneManager,
    // Topic name keys.
    topic_name_manager: TopicNameManager,
    // Topic region keys (`__topic_region/...`).
    topic_region_manager: TopicRegionManager,
    // The backend shared by every manager above.
    kv_backend: KvBackendRef,
}
320
/// Returns an `Unexpected` error from the enclosing function when `$got != $expected_value`.
///
/// `$name` describes the operation and only appears in the error message. The two values must
/// be comparable with `==` and implement `Debug`.
///
/// `$got` and `$expected_value` are each evaluated exactly once and then compared by
/// reference, so arguments with side effects are not re-run when the error message is built.
/// `ensure!` and `error::UnexpectedSnafu` are resolved at the call site, so both must be in
/// scope there.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        let got = &$got;
        let expected = &$expected_value;
        ensure!(
            got == expected,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                )
            }
        );
    }};
}
335
/// A deserialized value (`inner`) together with the original bytes it was decoded from.
///
/// - Serialize behavior:
///
/// The `inner` field is ignored; only the original `bytes` are written.
///
/// - Deserialize behavior:
///
/// The `inner` field is decoded from the deserialized `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of `inner`.
    bytes: Bytes,
    // The value decoded from `bytes`.
    inner: T,
}
351
352impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
353    type Target = T;
354
355    fn deref(&self) -> &Self::Target {
356        &self.inner
357    }
358}
359
360impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
361    fn deref_mut(&mut self) -> &mut Self::Target {
362        &mut self.inner
363    }
364}
365
366impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        write!(
369            f,
370            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
371            self.inner, self.bytes
372        )
373    }
374}
375
376impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
377    /// - Serialize behaviors:
378    ///
379    /// The `inner` field will be ignored.
380    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
381    where
382        S: serde::Serializer,
383    {
384        // Safety: The original bytes are always JSON encoded.
385        // It's more efficiently than `serialize_bytes`.
386        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
387    }
388}
389
390impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
391    for DeserializedValueWithBytes<T>
392{
393    /// - Deserialize behaviors:
394    ///
395    /// The `inner` field will be deserialized from the `bytes` field.
396    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
397    where
398        D: serde::Deserializer<'de>,
399    {
400        let buf = String::deserialize(deserializer)?;
401        let bytes = Bytes::from(buf);
402
403        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
404            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
405
406        Ok(value)
407    }
408}
409
410impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
411    fn clone(&self) -> Self {
412        Self {
413            bytes: self.bytes.clone(),
414            inner: self.inner.clone(),
415        }
416    }
417}
418
419impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
420    /// Returns a struct containing a deserialized value and an original `bytes`.
421    /// It accepts original bytes of inner.
422    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
423        let inner = T::try_from_raw_value(&bytes)?;
424        Ok(Self { bytes, inner })
425    }
426
427    /// Returns a struct containing a deserialized value and an original `bytes`.
428    /// It accepts original bytes of inner.
429    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
430        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
431    }
432
433    pub fn into_inner(self) -> T {
434        self.inner
435    }
436
437    pub fn get_inner_ref(&self) -> &T {
438        &self.inner
439    }
440
441    /// Returns original `bytes`
442    pub fn get_raw_bytes(&self) -> Vec<u8> {
443        self.bytes.to_vec()
444    }
445
446    #[cfg(any(test, feature = "testing"))]
447    pub fn from_inner(inner: T) -> Self {
448        let bytes = serde_json::to_vec(&inner).unwrap();
449
450        Self {
451            bytes: Bytes::from(bytes),
452            inner,
453        }
454    }
455}
456
457impl TableMetadataManager {
458    pub fn new(kv_backend: KvBackendRef) -> Self {
459        TableMetadataManager {
460            table_name_manager: TableNameManager::new(kv_backend.clone()),
461            table_info_manager: TableInfoManager::new(kv_backend.clone()),
462            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
463            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
464            catalog_manager: CatalogManager::new(kv_backend.clone()),
465            schema_manager: SchemaManager::new(kv_backend.clone()),
466            table_route_manager: TableRouteManager::new(kv_backend.clone()),
467            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
468            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
469            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
470            kv_backend,
471        }
472    }
473
474    pub async fn init(&self) -> Result<()> {
475        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
476
477        self.catalog_manager().create(catalog_name, true).await?;
478
479        let internal_schemas = [
480            DEFAULT_SCHEMA_NAME,
481            INFORMATION_SCHEMA_NAME,
482            DEFAULT_PRIVATE_SCHEMA_NAME,
483        ];
484
485        for schema_name in internal_schemas {
486            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
487
488            self.schema_manager().create(schema_key, None, true).await?;
489        }
490
491        Ok(())
492    }
493
    /// Returns the manager for table name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager for table info keys.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager for view info keys.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager for datanode table keys.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager for catalog name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager for schema name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager for table route keys.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager for topic name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager for topic region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the KV backend shared by all managers. Only compiled with the `testing` feature.
    #[cfg(feature = "testing")]
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
534
535    pub async fn get_full_table_info(
536        &self,
537        table_id: TableId,
538    ) -> Result<(
539        Option<DeserializedValueWithBytes<TableInfoValue>>,
540        Option<DeserializedValueWithBytes<TableRouteValue>>,
541    )> {
542        let table_info_key = TableInfoKey::new(table_id);
543        let table_route_key = TableRouteKey::new(table_id);
544        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
545        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
546
547        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
548        let mut res = self.kv_backend.txn(txn).await?;
549        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
550        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
551        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
552        Ok((table_info_value, table_route_value))
553    }
554
555    /// Creates metadata for view and returns an error if different metadata exists.
556    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
557    /// Parameters include:
558    /// - `view_info`: the encoded logical plan
559    /// - `table_names`: the resolved fully table names in logical plan
560    /// - `columns`: the view columns
561    /// - `plan_columns`: the original plan columns
562    /// - `definition`: The SQL to create the view
563    ///
564    pub async fn create_view_metadata(
565        &self,
566        view_info: RawTableInfo,
567        raw_logical_plan: Vec<u8>,
568        table_names: HashSet<TableName>,
569        columns: Vec<String>,
570        plan_columns: Vec<String>,
571        definition: String,
572    ) -> Result<()> {
573        let view_id = view_info.ident.table_id;
574
575        // Creates view name.
576        let view_name = TableNameKey::new(
577            &view_info.catalog_name,
578            &view_info.schema_name,
579            &view_info.name,
580        );
581        let create_table_name_txn = self
582            .table_name_manager()
583            .build_create_txn(&view_name, view_id)?;
584
585        // Creates table info.
586        let table_info_value = TableInfoValue::new(view_info);
587
588        let (create_table_info_txn, on_create_table_info_failure) = self
589            .table_info_manager()
590            .build_create_txn(view_id, &table_info_value)?;
591
592        // Creates view info
593        let view_info_value = ViewInfoValue::new(
594            raw_logical_plan,
595            table_names,
596            columns,
597            plan_columns,
598            definition,
599        );
600        let (create_view_info_txn, on_create_view_info_failure) = self
601            .view_info_manager()
602            .build_create_txn(view_id, &view_info_value)?;
603
604        let txn = Txn::merge_all(vec![
605            create_table_name_txn,
606            create_table_info_txn,
607            create_view_info_txn,
608        ]);
609
610        let mut r = self.kv_backend.txn(txn).await?;
611
612        // Checks whether metadata was already created.
613        if !r.succeeded {
614            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
615            let remote_table_info = on_create_table_info_failure(&mut set)?
616                .context(error::UnexpectedSnafu {
617                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
618                })?
619                .into_inner();
620
621            let remote_view_info = on_create_view_info_failure(&mut set)?
622                .context(error::UnexpectedSnafu {
623                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
624                })?
625                .into_inner();
626
627            let op_name = "the creating view metadata";
628            ensure_values!(remote_table_info, table_info_value, op_name);
629            ensure_values!(remote_view_info, view_info_value, op_name);
630        }
631
632        Ok(())
633    }
634
    /// Creates metadata for table and returns an error if different metadata exists.
    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
    ///
    /// Before anything is written, `table_info.meta.region_numbers` is overwritten with the
    /// region numbers of `table_route_value`. A single transaction then creates the table name,
    /// table info, table route and topic-region keys. When the route is physical, it also
    /// creates the datanode table keys. `region_wal_options` maps region numbers to WAL options;
    /// the options are presumably in serialized string form — TODO confirm with callers.
    ///
    /// If the transaction does not succeed, the existing table info and table route are read
    /// back and must equal the values being created; otherwise an `Unexpected` error is
    /// returned.
    pub async fn create_table_metadata(
        &self,
        mut table_info: RawTableInfo,
        table_route_value: TableRouteValue,
        region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let region_numbers = table_route_value.region_numbers();
        table_info.meta.region_numbers = region_numbers;
        let table_id = table_info.ident.table_id;
        // Cloned because `table_info` is moved into `TableInfoValue` below.
        let engine = table_info.meta.engine.clone();

        // Creates table name.
        let table_name = TableNameKey::new(
            &table_info.catalog_name,
            &table_info.schema_name,
            &table_info.name,
        );
        let create_table_name_txn = self
            .table_name_manager()
            .build_create_txn(&table_name, table_id)?;

        // Computed before `table_info` is moved into `TableInfoValue`.
        let region_options = table_info.to_region_options();
        // Creates table info.
        let table_info_value = TableInfoValue::new(table_info);
        let (create_table_info_txn, on_create_table_info_failure) = self
            .table_info_manager()
            .build_create_txn(table_id, &table_info_value)?;

        let (create_table_route_txn, on_create_table_route_failure) = self
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(table_id, &table_route_value)?;

        let create_topic_region_txn = self
            .topic_region_manager
            .build_create_txn(table_id, &region_wal_options)?;

        let mut txn = Txn::merge_all(vec![
            create_table_name_txn,
            create_table_info_txn,
            create_table_route_txn,
            create_topic_region_txn,
        ]);

        // Datanode table keys are only created for physical table routes.
        if let TableRouteValue::Physical(x) = &table_route_value {
            let region_storage_path = table_info_value.region_storage_path();
            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
                table_id,
                &engine,
                &region_storage_path,
                region_options,
                // Moved here; the topic-region txn above only borrowed it.
                region_wal_options,
                region_distribution(&x.region_routes),
            )?;
            txn = txn.merge(create_datanode_table_txn);
        }

        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            let remote_table_info = on_create_table_info_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                })?
                .into_inner();

            let remote_table_route = on_create_table_route_failure(&mut set)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                })?
                .into_inner();

            let op_name = "the creating table metadata";
            ensure_values!(remote_table_info, table_info_value, op_name);
            ensure_values!(remote_table_route, table_route_value, op_name);
        }

        Ok(())
    }
718
719    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
720        // The batch size is max_txn_size / 3 because the size of the `tables_data`
721        // is 3 times the size of the `tables_data`.
722        self.kv_backend.max_txn_ops() / 3
723    }
724
    /// Creates metadata for multiple logical tables and returns an error if different metadata
    /// exists.
    ///
    /// For every `(table_info, table_route_value)` pair, `table_info.meta.region_numbers` is
    /// overwritten from the route. The table name, table info and table route keys are then
    /// created; unlike `create_table_metadata`, no datanode table or topic-region keys are
    /// written here. All tables are committed in a single transaction, so the number of tables
    /// should respect `create_logical_tables_metadata_chunk_size`.
    ///
    /// If the transaction does not succeed, each table's existing table info and table route
    /// are read back and must equal the values being created; otherwise an `Unexpected` error
    /// is returned.
    pub async fn create_logical_tables_metadata(
        &self,
        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
    ) -> Result<()> {
        let len = tables_data.len();
        // Three create transactions per table: table name, table info, table route.
        let mut txns = Vec::with_capacity(3 * len);
        // Per table: the values being created, plus the closures that read the existing values
        // back from the response set if the transaction fails.
        struct OnFailure<F1, R1, F2, R2>
        where
            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
        {
            table_info_value: TableInfoValue,
            on_create_table_info_failure: F1,
            table_route_value: TableRouteValue,
            on_create_table_route_failure: F2,
        }
        let mut on_failures = Vec::with_capacity(len);
        for (mut table_info, table_route_value) in tables_data {
            table_info.meta.region_numbers = table_route_value.region_numbers();
            let table_id = table_info.ident.table_id;

            // Creates table name.
            let table_name = TableNameKey::new(
                &table_info.catalog_name,
                &table_info.schema_name,
                &table_info.name,
            );
            let create_table_name_txn = self
                .table_name_manager()
                .build_create_txn(&table_name, table_id)?;
            txns.push(create_table_name_txn);

            // Creates table info.
            let table_info_value = TableInfoValue::new(table_info);
            let (create_table_info_txn, on_create_table_info_failure) =
                self.table_info_manager()
                    .build_create_txn(table_id, &table_info_value)?;
            txns.push(create_table_info_txn);

            // Creates table route.
            let (create_table_route_txn, on_create_table_route_failure) = self
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &table_route_value)?;
            txns.push(create_table_route_txn);

            on_failures.push(OnFailure {
                table_info_value,
                on_create_table_info_failure,
                table_route_value,
                on_create_table_route_failure,
            });
        }

        let txn = Txn::merge_all(txns);
        let mut r = self.kv_backend.txn(txn).await?;

        // Checks whether metadata was already created.
        if !r.succeeded {
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            for on_failure in on_failures {
                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let op_name = "the creating logical tables metadata";
                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
            }
        }

        Ok(())
    }
806
807    fn table_metadata_keys(
808        &self,
809        table_id: TableId,
810        table_name: &TableName,
811        table_route_value: &TableRouteValue,
812        region_wal_options: &HashMap<RegionNumber, WalOptions>,
813    ) -> Result<Vec<Vec<u8>>> {
814        // Builds keys
815        let datanode_ids = if table_route_value.is_physical() {
816            region_distribution(table_route_value.region_routes()?)
817                .into_keys()
818                .collect()
819        } else {
820            vec![]
821        };
822        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
823        let table_name = TableNameKey::new(
824            &table_name.catalog_name,
825            &table_name.schema_name,
826            &table_name.table_name,
827        );
828        let table_info_key = TableInfoKey::new(table_id);
829        let table_route_key = TableRouteKey::new(table_id);
830        let datanode_table_keys = datanode_ids
831            .into_iter()
832            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
833            .collect::<HashSet<_>>();
834        let topic_region_map = self
835            .topic_region_manager
836            .get_topic_region_mapping(table_id, region_wal_options);
837        let topic_region_keys = topic_region_map
838            .iter()
839            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
840            .collect::<Vec<_>>();
841        keys.push(table_name.to_bytes());
842        keys.push(table_info_key.to_bytes());
843        keys.push(table_route_key.to_bytes());
844        for key in &datanode_table_keys {
845            keys.push(key.to_bytes());
846        }
847        for key in topic_region_keys {
848            keys.push(key.to_bytes());
849        }
850        Ok(keys)
851    }
852
853    /// Deletes metadata for table **logically**.
854    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
855    pub async fn delete_table_metadata(
856        &self,
857        table_id: TableId,
858        table_name: &TableName,
859        table_route_value: &TableRouteValue,
860        region_wal_options: &HashMap<RegionNumber, WalOptions>,
861    ) -> Result<()> {
862        let keys =
863            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
864        self.tombstone_manager.create(keys).await
865    }
866
867    /// Deletes metadata tombstone for table **permanently**.
868    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
869    pub async fn delete_table_metadata_tombstone(
870        &self,
871        table_id: TableId,
872        table_name: &TableName,
873        table_route_value: &TableRouteValue,
874        region_wal_options: &HashMap<RegionNumber, WalOptions>,
875    ) -> Result<()> {
876        let table_metadata_keys =
877            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
878        self.tombstone_manager.delete(table_metadata_keys).await
879    }
880
881    /// Restores metadata for table.
882    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
883    pub async fn restore_table_metadata(
884        &self,
885        table_id: TableId,
886        table_name: &TableName,
887        table_route_value: &TableRouteValue,
888        region_wal_options: &HashMap<RegionNumber, WalOptions>,
889    ) -> Result<()> {
890        let keys =
891            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
892        self.tombstone_manager.restore(keys).await
893    }
894
895    /// Deletes metadata for table **permanently**.
896    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
897    pub async fn destroy_table_metadata(
898        &self,
899        table_id: TableId,
900        table_name: &TableName,
901        table_route_value: &TableRouteValue,
902        region_wal_options: &HashMap<RegionNumber, WalOptions>,
903    ) -> Result<()> {
904        let keys =
905            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
906        let _ = self
907            .kv_backend
908            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
909            .await?;
910        Ok(())
911    }
912
913    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
914        let mut keys = Vec::with_capacity(3);
915        let view_name = TableNameKey::new(
916            &view_name.catalog_name,
917            &view_name.schema_name,
918            &view_name.table_name,
919        );
920        let table_info_key = TableInfoKey::new(view_id);
921        let view_info_key = ViewInfoKey::new(view_id);
922        keys.push(view_name.to_bytes());
923        keys.push(table_info_key.to_bytes());
924        keys.push(view_info_key.to_bytes());
925
926        Ok(keys)
927    }
928
929    /// Deletes metadata for view **permanently**.
930    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
931    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
932        let keys = self.view_info_keys(view_id, view_name)?;
933        let _ = self
934            .kv_backend
935            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
936            .await?;
937        Ok(())
938    }
939
940    /// Renames the table name and returns an error if different metadata exists.
941    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
942    /// and the new `TableNameKey` MUST be empty.
943    pub async fn rename_table(
944        &self,
945        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
946        new_table_name: String,
947    ) -> Result<()> {
948        let current_table_info = &current_table_info_value.table_info;
949        let table_id = current_table_info.ident.table_id;
950
951        let table_name_key = TableNameKey::new(
952            &current_table_info.catalog_name,
953            &current_table_info.schema_name,
954            &current_table_info.name,
955        );
956
957        let new_table_name_key = TableNameKey::new(
958            &current_table_info.catalog_name,
959            &current_table_info.schema_name,
960            &new_table_name,
961        );
962
963        // Updates table name.
964        let update_table_name_txn = self.table_name_manager().build_update_txn(
965            &table_name_key,
966            &new_table_name_key,
967            table_id,
968        )?;
969
970        let new_table_info_value = current_table_info_value
971            .inner
972            .with_update(move |table_info| {
973                table_info.name = new_table_name;
974            });
975
976        // Updates table info.
977        let (update_table_info_txn, on_update_table_info_failure) = self
978            .table_info_manager()
979            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
980
981        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
982
983        let mut r = self.kv_backend.txn(txn).await?;
984
985        // Checks whether metadata was already updated.
986        if !r.succeeded {
987            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
988            let remote_table_info = on_update_table_info_failure(&mut set)?
989                .context(error::UnexpectedSnafu {
990                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
991                })?
992                .into_inner();
993
994            let op_name = "the renaming table metadata";
995            ensure_values!(remote_table_info, new_table_info_value, op_name);
996        }
997
998        Ok(())
999    }
1000
1001    /// Updates table info and returns an error if different metadata exists.
1002    /// And cascade-ly update all redundant table options for each region
1003    /// if region_distribution is present.
1004    pub async fn update_table_info(
1005        &self,
1006        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1007        region_distribution: Option<RegionDistribution>,
1008        new_table_info: RawTableInfo,
1009    ) -> Result<()> {
1010        let table_id = current_table_info_value.table_info.ident.table_id;
1011        let new_table_info_value = current_table_info_value.update(new_table_info);
1012
1013        // Updates table info.
1014        let (update_table_info_txn, on_update_table_info_failure) = self
1015            .table_info_manager()
1016            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1017
1018        let txn = if let Some(region_distribution) = region_distribution {
1019            // region options induced from table info.
1020            let new_region_options = new_table_info_value.table_info.to_region_options();
1021            let update_datanode_table_options_txn = self
1022                .datanode_table_manager
1023                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1024                .await?;
1025            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1026        } else {
1027            update_table_info_txn
1028        };
1029
1030        let mut r = self.kv_backend.txn(txn).await?;
1031        // Checks whether metadata was already updated.
1032        if !r.succeeded {
1033            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1034            let remote_table_info = on_update_table_info_failure(&mut set)?
1035                .context(error::UnexpectedSnafu {
1036                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1037                })?
1038                .into_inner();
1039
1040            let op_name = "the updating table info";
1041            ensure_values!(remote_table_info, new_table_info_value, op_name);
1042        }
1043        Ok(())
1044    }
1045
1046    /// Updates view info and returns an error if different metadata exists.
1047    /// Parameters include:
1048    /// - `view_id`: the view id
1049    /// - `current_view_info_value`: the current view info for CAS checking
1050    /// - `new_view_info`: the encoded logical plan
1051    /// - `table_names`: the resolved fully table names in logical plan
1052    /// - `columns`: the view columns
1053    /// - `plan_columns`: the original plan columns
1054    /// - `definition`: The SQL to create the view
1055    ///
1056    #[allow(clippy::too_many_arguments)]
1057    pub async fn update_view_info(
1058        &self,
1059        view_id: TableId,
1060        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1061        new_view_info: Vec<u8>,
1062        table_names: HashSet<TableName>,
1063        columns: Vec<String>,
1064        plan_columns: Vec<String>,
1065        definition: String,
1066    ) -> Result<()> {
1067        let new_view_info_value = current_view_info_value.update(
1068            new_view_info,
1069            table_names,
1070            columns,
1071            plan_columns,
1072            definition,
1073        );
1074
1075        // Updates view info.
1076        let (update_view_info_txn, on_update_view_info_failure) = self
1077            .view_info_manager()
1078            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1079
1080        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1081
1082        // Checks whether metadata was already updated.
1083        if !r.succeeded {
1084            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1085            let remote_view_info = on_update_view_info_failure(&mut set)?
1086                .context(error::UnexpectedSnafu {
1087                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1088                })?
1089                .into_inner();
1090
1091            let op_name = "the updating view info";
1092            ensure_values!(remote_view_info, new_view_info_value, op_name);
1093        }
1094        Ok(())
1095    }
1096
    /// Returns the chunk size to use when splitting input for
    /// `batch_update_table_info_values`; it is the kv backend's `max_txn_ops()`.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1100
1101    pub async fn batch_update_table_info_values(
1102        &self,
1103        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1104    ) -> Result<()> {
1105        let len = table_info_value_pairs.len();
1106        let mut txns = Vec::with_capacity(len);
1107        struct OnFailure<F, R>
1108        where
1109            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1110        {
1111            table_info_value: TableInfoValue,
1112            on_update_table_info_failure: F,
1113        }
1114        let mut on_failures = Vec::with_capacity(len);
1115
1116        for (table_info_value, new_table_info) in table_info_value_pairs {
1117            let table_id = table_info_value.table_info.ident.table_id;
1118
1119            let new_table_info_value = table_info_value.update(new_table_info);
1120
1121            let (update_table_info_txn, on_update_table_info_failure) =
1122                self.table_info_manager().build_update_txn(
1123                    table_id,
1124                    &table_info_value,
1125                    &new_table_info_value,
1126                )?;
1127
1128            txns.push(update_table_info_txn);
1129
1130            on_failures.push(OnFailure {
1131                table_info_value: new_table_info_value,
1132                on_update_table_info_failure,
1133            });
1134        }
1135
1136        let txn = Txn::merge_all(txns);
1137        let mut r = self.kv_backend.txn(txn).await?;
1138
1139        if !r.succeeded {
1140            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1141            for on_failure in on_failures {
1142                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1143                    .context(error::UnexpectedSnafu {
1144                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1145                    })?
1146                    .into_inner();
1147
1148                let op_name = "the batch updating table info";
1149                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1150            }
1151        }
1152
1153        Ok(())
1154    }
1155
1156    pub async fn update_table_route(
1157        &self,
1158        table_id: TableId,
1159        region_info: RegionInfo,
1160        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1161        new_region_routes: Vec<RegionRoute>,
1162        new_region_options: &HashMap<String, String>,
1163        new_region_wal_options: &HashMap<RegionNumber, String>,
1164    ) -> Result<()> {
1165        // Updates the datanode table key value pairs.
1166        let current_region_distribution =
1167            region_distribution(current_table_route_value.region_routes()?);
1168        let new_region_distribution = region_distribution(&new_region_routes);
1169
1170        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1171            table_id,
1172            region_info,
1173            current_region_distribution,
1174            new_region_distribution,
1175            new_region_options,
1176            new_region_wal_options,
1177        )?;
1178
1179        // Updates the table_route.
1180        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1181
1182        let (update_table_route_txn, on_update_table_route_failure) = self
1183            .table_route_manager()
1184            .table_route_storage()
1185            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1186
1187        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1188
1189        let mut r = self.kv_backend.txn(txn).await?;
1190
1191        // Checks whether metadata was already updated.
1192        if !r.succeeded {
1193            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1194            let remote_table_route = on_update_table_route_failure(&mut set)?
1195                .context(error::UnexpectedSnafu {
1196                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1197                })?
1198                .into_inner();
1199
1200            let op_name = "the updating table route";
1201            ensure_values!(remote_table_route, new_table_route_value, op_name);
1202        }
1203
1204        Ok(())
1205    }
1206
1207    /// Updates the leader status of the [RegionRoute].
1208    pub async fn update_leader_region_status<F>(
1209        &self,
1210        table_id: TableId,
1211        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1212        next_region_route_status: F,
1213    ) -> Result<()>
1214    where
1215        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1216    {
1217        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1218
1219        let mut updated = 0;
1220        for route in &mut new_region_routes {
1221            if let Some(state) = next_region_route_status(route) {
1222                if route.set_leader_state(state) {
1223                    updated += 1;
1224                }
1225            }
1226        }
1227
1228        if updated == 0 {
1229            warn!("No leader status updated");
1230            return Ok(());
1231        }
1232
1233        // Updates the table_route.
1234        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1235
1236        let (update_table_route_txn, on_update_table_route_failure) = self
1237            .table_route_manager()
1238            .table_route_storage()
1239            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1240
1241        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1242
1243        // Checks whether metadata was already updated.
1244        if !r.succeeded {
1245            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1246            let remote_table_route = on_update_table_route_failure(&mut set)?
1247                .context(error::UnexpectedSnafu {
1248                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1249                })?
1250                .into_inner();
1251
1252            let op_name = "the updating leader region status";
1253            ensure_values!(remote_table_route, new_table_route_value, op_name);
1254        }
1255
1256        Ok(())
1257    }
1258}
1259
/// Implements `$crate::key::MetadataValue` for each listed type by encoding and decoding
/// the value as JSON with `serde_json`, mapping serde errors via `SerdeJsonSnafu`.
///
/// NOTE: the expansion names `Result` and `SerdeJsonSnafu` unqualified and calls
/// `.context(...)` (presumably snafu's `ResultExt`), so those names must be in scope
/// wherever this macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its raw JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1276
/// Implements `MetadataKeyGetTxnOp` for each listed key type.
///
/// The generated `build_get_op` returns a `TxnOp::Get` on the key's raw bytes and a
/// filter closure (from `TxnOpGetResponseSet::filter`) that extracts the value stored
/// under those same bytes from a transaction's get responses.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    // The same raw key is used both for the Get op and for the filter.
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1301
// Key types that can be read inside a transaction through `build_get_op`.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1309
/// Adds inherent `try_from_raw_value` / `try_as_raw_value` methods to each listed type,
/// encoding and decoding the value as JSON with `serde_json`.
///
/// Unlike `impl_metadata_value!`, decoding yields `Result<Option<Self>>`; presumably this
/// lets a stored JSON `null` decode to `None` — confirm against callers.
///
/// NOTE: like `impl_metadata_value!`, the expansion relies on `Result`, `SerdeJsonSnafu`
/// and `.context(...)` being in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its raw JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1326
// Value types stored in the metadata store as JSON via the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue
}
1341
// Value types that also get the `Option`-returning inherent decoders.
// NOTE: `SchemaNameValue` is listed here and in `impl_metadata_value!`; with method-call
// syntax (`SchemaNameValue::try_from_raw_value` / `value.try_as_raw_value()`) the inherent
// methods generated here take precedence over the trait methods.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1346
1347#[cfg(test)]
1348mod tests {
1349    use std::collections::{BTreeMap, HashMap, HashSet};
1350    use std::sync::Arc;
1351
1352    use bytes::Bytes;
1353    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1354    use common_time::util::current_time_millis;
1355    use common_wal::options::{KafkaWalOptions, WalOptions};
1356    use futures::TryStreamExt;
1357    use store_api::storage::{RegionId, RegionNumber};
1358    use table::metadata::{RawTableInfo, TableInfo};
1359    use table::table_name::TableName;
1360
1361    use super::datanode_table::DatanodeTableKey;
1362    use super::test_utils;
1363    use crate::ddl::test_util::create_table::test_create_table_task;
1364    use crate::ddl::utils::region_storage_path;
1365    use crate::error::Result;
1366    use crate::key::datanode_table::RegionInfo;
1367    use crate::key::table_info::TableInfoValue;
1368    use crate::key::table_name::TableNameKey;
1369    use crate::key::table_route::TableRouteValue;
1370    use crate::key::{
1371        DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
1372    };
1373    use crate::kv_backend::memory::MemoryKvBackend;
1374    use crate::kv_backend::KvBackend;
1375    use crate::peer::Peer;
1376    use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1377    use crate::rpc::store::RangeRequest;
1378    use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1379
1380    #[test]
1381    fn test_deserialized_value_with_bytes() {
1382        let region_route = new_test_region_route();
1383        let region_routes = vec![region_route.clone()];
1384
1385        let expected_region_routes =
1386            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1387        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1388
1389        // Serialize behaviors:
1390        // The inner field will be ignored.
1391        let value = DeserializedValueWithBytes {
1392            // ignored
1393            inner: TableRouteValue::physical(region_routes.clone()),
1394            bytes: Bytes::from(expected.clone()),
1395        };
1396
1397        let encoded = serde_json::to_vec(&value).unwrap();
1398
1399        // Deserialize behaviors:
1400        // The inner field will be deserialized from the bytes field.
1401        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1402            serde_json::from_slice(&encoded).unwrap();
1403
1404        assert_eq!(decoded.inner, expected_region_routes);
1405        assert_eq!(decoded.bytes, expected);
1406    }
1407
1408    fn new_test_region_route() -> RegionRoute {
1409        new_region_route(1, 2)
1410    }
1411
1412    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1413        RegionRoute {
1414            region: Region {
1415                id: region_id.into(),
1416                name: "r1".to_string(),
1417                partition: None,
1418                attrs: BTreeMap::new(),
1419            },
1420            leader_peer: Some(Peer::new(datanode, "a2")),
1421            follower_peers: vec![],
1422            leader_state: None,
1423            leader_down_since: None,
1424        }
1425    }
1426
1427    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1428        test_utils::new_test_table_info(10, region_numbers)
1429    }
1430
1431    fn new_test_table_names() -> HashSet<TableName> {
1432        let mut set = HashSet::new();
1433        set.insert(TableName {
1434            catalog_name: "greptime".to_string(),
1435            schema_name: "public".to_string(),
1436            table_name: "a_table".to_string(),
1437        });
1438        set.insert(TableName {
1439            catalog_name: "greptime".to_string(),
1440            schema_name: "public".to_string(),
1441            table_name: "b_table".to_string(),
1442        });
1443        set
1444    }
1445
1446    async fn create_physical_table_metadata(
1447        table_metadata_manager: &TableMetadataManager,
1448        table_info: RawTableInfo,
1449        region_routes: Vec<RegionRoute>,
1450        region_wal_options: HashMap<RegionNumber, String>,
1451    ) -> Result<()> {
1452        table_metadata_manager
1453            .create_table_metadata(
1454                table_info,
1455                TableRouteValue::physical(region_routes),
1456                region_wal_options,
1457            )
1458            .await
1459    }
1460
1461    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1462        let topics = (0..2)
1463            .map(|i| format!("greptimedb_topic{}", i))
1464            .collect::<Vec<_>>();
1465        let wal_options = topics
1466            .iter()
1467            .map(|topic| {
1468                WalOptions::Kafka(KafkaWalOptions {
1469                    topic: topic.clone(),
1470                })
1471            })
1472            .collect::<Vec<_>>();
1473
1474        (0..16)
1475            .enumerate()
1476            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1477            .collect()
1478    }
1479
1480    #[tokio::test]
1481    async fn test_raft_engine_topic_region_map() {
1482        let mem_kv = Arc::new(MemoryKvBackend::default());
1483        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1484        let region_route = new_test_region_route();
1485        let region_routes = &vec![region_route.clone()];
1486        let table_info: RawTableInfo =
1487            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1488        let wal_allocator = WalOptionsAllocator::RaftEngine;
1489        let regions = (0..16).collect();
1490        let region_wal_options =
1491            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1492        create_physical_table_metadata(
1493            &table_metadata_manager,
1494            table_info.clone(),
1495            region_routes.clone(),
1496            region_wal_options.clone(),
1497        )
1498        .await
1499        .unwrap();
1500
1501        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1502        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1503        let resp = mem_kv.range(range_req).await.unwrap();
1504        // Should be empty because the topic region map is empty for raft engine.
1505        assert!(resp.kvs.is_empty());
1506    }
1507
1508    #[tokio::test]
1509    async fn test_create_table_metadata() {
1510        let mem_kv = Arc::new(MemoryKvBackend::default());
1511        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1512        let region_route = new_test_region_route();
1513        let region_routes = &vec![region_route.clone()];
1514        let table_info: RawTableInfo =
1515            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1516        let region_wal_options = create_mock_region_wal_options()
1517            .into_iter()
1518            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1519            .collect::<HashMap<_, _>>();
1520
1521        // creates metadata.
1522        create_physical_table_metadata(
1523            &table_metadata_manager,
1524            table_info.clone(),
1525            region_routes.clone(),
1526            region_wal_options.clone(),
1527        )
1528        .await
1529        .unwrap();
1530
1531        // if metadata was already created, it should be ok.
1532        assert!(create_physical_table_metadata(
1533            &table_metadata_manager,
1534            table_info.clone(),
1535            region_routes.clone(),
1536            region_wal_options.clone(),
1537        )
1538        .await
1539        .is_ok());
1540
1541        let mut modified_region_routes = region_routes.clone();
1542        modified_region_routes.push(region_route.clone());
1543        // if remote metadata was exists, it should return an error.
1544        assert!(create_physical_table_metadata(
1545            &table_metadata_manager,
1546            table_info.clone(),
1547            modified_region_routes,
1548            region_wal_options.clone(),
1549        )
1550        .await
1551        .is_err());
1552
1553        let (remote_table_info, remote_table_route) = table_metadata_manager
1554            .get_full_table_info(10)
1555            .await
1556            .unwrap();
1557
1558        assert_eq!(
1559            remote_table_info.unwrap().into_inner().table_info,
1560            table_info
1561        );
1562        assert_eq!(
1563            remote_table_route
1564                .unwrap()
1565                .into_inner()
1566                .region_routes()
1567                .unwrap(),
1568            region_routes
1569        );
1570
1571        for i in 0..2 {
1572            let region_number = i as u32;
1573            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1574            let topic = format!("greptimedb_topic{}", i);
1575            let regions = table_metadata_manager
1576                .topic_region_manager
1577                .regions(&topic)
1578                .await
1579                .unwrap();
1580            assert_eq!(regions.len(), 8);
1581            assert_eq!(regions[0], region_id);
1582        }
1583    }
1584
1585    #[tokio::test]
1586    async fn test_create_logic_tables_metadata() {
1587        let mem_kv = Arc::new(MemoryKvBackend::default());
1588        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1589        let region_route = new_test_region_route();
1590        let region_routes = vec![region_route.clone()];
1591        let table_info: RawTableInfo =
1592            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1593        let table_id = table_info.ident.table_id;
1594        let table_route_value = TableRouteValue::physical(region_routes.clone());
1595
1596        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1597        // creates metadata.
1598        table_metadata_manager
1599            .create_logical_tables_metadata(tables_data.clone())
1600            .await
1601            .unwrap();
1602
1603        // if metadata was already created, it should be ok.
1604        assert!(table_metadata_manager
1605            .create_logical_tables_metadata(tables_data)
1606            .await
1607            .is_ok());
1608
1609        let mut modified_region_routes = region_routes.clone();
1610        modified_region_routes.push(new_region_route(2, 3));
1611        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1612        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1613        // if remote metadata was exists, it should return an error.
1614        assert!(table_metadata_manager
1615            .create_logical_tables_metadata(modified_tables_data)
1616            .await
1617            .is_err());
1618
1619        let (remote_table_info, remote_table_route) = table_metadata_manager
1620            .get_full_table_info(table_id)
1621            .await
1622            .unwrap();
1623
1624        assert_eq!(
1625            remote_table_info.unwrap().into_inner().table_info,
1626            table_info
1627        );
1628        assert_eq!(
1629            remote_table_route
1630                .unwrap()
1631                .into_inner()
1632                .region_routes()
1633                .unwrap(),
1634            &region_routes
1635        );
1636    }
1637
1638    #[tokio::test]
1639    async fn test_create_many_logical_tables_metadata() {
1640        let kv_backend = Arc::new(MemoryKvBackend::default());
1641        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1642
1643        let mut tables_data = vec![];
1644        for i in 0..128 {
1645            let table_id = i + 1;
1646            let regin_number = table_id * 3;
1647            let region_id = RegionId::new(table_id, regin_number);
1648            let region_route = new_region_route(region_id.as_u64(), 2);
1649            let region_routes = vec![region_route.clone()];
1650            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1651                table_id,
1652                &format!("my_table_{}", table_id),
1653                region_routes.iter().map(|r| r.region.id.region_number()),
1654            )
1655            .into();
1656            let table_route_value = TableRouteValue::physical(region_routes.clone());
1657
1658            tables_data.push((table_info, table_route_value));
1659        }
1660
1661        // creates metadata.
1662        table_metadata_manager
1663            .create_logical_tables_metadata(tables_data)
1664            .await
1665            .unwrap();
1666    }
1667
1668    #[tokio::test]
1669    async fn test_delete_table_metadata() {
1670        let mem_kv = Arc::new(MemoryKvBackend::default());
1671        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1672        let region_route = new_test_region_route();
1673        let region_routes = &vec![region_route.clone()];
1674        let table_info: RawTableInfo =
1675            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1676        let table_id = table_info.ident.table_id;
1677        let datanode_id = 2;
1678        let region_wal_options = create_mock_region_wal_options();
1679        let serialized_region_wal_options = region_wal_options
1680            .iter()
1681            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1682            .collect::<HashMap<_, _>>();
1683
1684        // creates metadata.
1685        create_physical_table_metadata(
1686            &table_metadata_manager,
1687            table_info.clone(),
1688            region_routes.clone(),
1689            serialized_region_wal_options,
1690        )
1691        .await
1692        .unwrap();
1693
1694        let table_name = TableName::new(
1695            table_info.catalog_name,
1696            table_info.schema_name,
1697            table_info.name,
1698        );
1699        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1700        // deletes metadata.
1701        table_metadata_manager
1702            .delete_table_metadata(
1703                table_id,
1704                &table_name,
1705                table_route_value,
1706                &region_wal_options,
1707            )
1708            .await
1709            .unwrap();
1710        // Should be ignored.
1711        table_metadata_manager
1712            .delete_table_metadata(
1713                table_id,
1714                &table_name,
1715                table_route_value,
1716                &region_wal_options,
1717            )
1718            .await
1719            .unwrap();
1720        assert!(table_metadata_manager
1721            .table_info_manager()
1722            .get(table_id)
1723            .await
1724            .unwrap()
1725            .is_none());
1726        assert!(table_metadata_manager
1727            .table_route_manager()
1728            .table_route_storage()
1729            .get(table_id)
1730            .await
1731            .unwrap()
1732            .is_none());
1733        assert!(table_metadata_manager
1734            .datanode_table_manager()
1735            .tables(datanode_id)
1736            .try_collect::<Vec<_>>()
1737            .await
1738            .unwrap()
1739            .is_empty());
1740        // Checks removed values
1741        let table_info = table_metadata_manager
1742            .table_info_manager()
1743            .get(table_id)
1744            .await
1745            .unwrap();
1746        assert!(table_info.is_none());
1747        let table_route = table_metadata_manager
1748            .table_route_manager()
1749            .table_route_storage()
1750            .get(table_id)
1751            .await
1752            .unwrap();
1753        assert!(table_route.is_none());
1754        // Logical delete removes the topic region mapping as well.
1755        let regions = table_metadata_manager
1756            .topic_region_manager
1757            .regions("greptimedb_topic0")
1758            .await
1759            .unwrap();
1760        assert_eq!(regions.len(), 0);
1761        let regions = table_metadata_manager
1762            .topic_region_manager
1763            .regions("greptimedb_topic1")
1764            .await
1765            .unwrap();
1766        assert_eq!(regions.len(), 0);
1767    }
1768
1769    #[tokio::test]
1770    async fn test_rename_table() {
1771        let mem_kv = Arc::new(MemoryKvBackend::default());
1772        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1773        let region_route = new_test_region_route();
1774        let region_routes = vec![region_route.clone()];
1775        let table_info: RawTableInfo =
1776            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1777        let table_id = table_info.ident.table_id;
1778        // creates metadata.
1779        create_physical_table_metadata(
1780            &table_metadata_manager,
1781            table_info.clone(),
1782            region_routes.clone(),
1783            HashMap::new(),
1784        )
1785        .await
1786        .unwrap();
1787
1788        let new_table_name = "another_name".to_string();
1789        let table_info_value =
1790            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1791
1792        table_metadata_manager
1793            .rename_table(&table_info_value, new_table_name.clone())
1794            .await
1795            .unwrap();
1796        // if remote metadata was updated, it should be ok.
1797        table_metadata_manager
1798            .rename_table(&table_info_value, new_table_name.clone())
1799            .await
1800            .unwrap();
1801        let mut modified_table_info = table_info.clone();
1802        modified_table_info.name = "hi".to_string();
1803        let modified_table_info_value =
1804            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1805        // if the table_info_value is wrong, it should return an error.
1806        // The ABA problem.
1807        assert!(table_metadata_manager
1808            .rename_table(&modified_table_info_value, new_table_name.clone())
1809            .await
1810            .is_err());
1811
1812        let old_table_name = TableNameKey::new(
1813            &table_info.catalog_name,
1814            &table_info.schema_name,
1815            &table_info.name,
1816        );
1817        let new_table_name = TableNameKey::new(
1818            &table_info.catalog_name,
1819            &table_info.schema_name,
1820            &new_table_name,
1821        );
1822
1823        assert!(table_metadata_manager
1824            .table_name_manager()
1825            .get(old_table_name)
1826            .await
1827            .unwrap()
1828            .is_none());
1829
1830        assert_eq!(
1831            table_metadata_manager
1832                .table_name_manager()
1833                .get(new_table_name)
1834                .await
1835                .unwrap()
1836                .unwrap()
1837                .table_id(),
1838            table_id
1839        );
1840    }
1841
1842    #[tokio::test]
1843    async fn test_update_table_info() {
1844        let mem_kv = Arc::new(MemoryKvBackend::default());
1845        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1846        let region_route = new_test_region_route();
1847        let region_routes = vec![region_route.clone()];
1848        let table_info: RawTableInfo =
1849            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1850        let table_id = table_info.ident.table_id;
1851        // creates metadata.
1852        create_physical_table_metadata(
1853            &table_metadata_manager,
1854            table_info.clone(),
1855            region_routes.clone(),
1856            HashMap::new(),
1857        )
1858        .await
1859        .unwrap();
1860
1861        let mut new_table_info = table_info.clone();
1862        new_table_info.name = "hi".to_string();
1863        let current_table_info_value =
1864            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1865        // should be ok.
1866        table_metadata_manager
1867            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1868            .await
1869            .unwrap();
1870        // if table info was updated, it should be ok.
1871        table_metadata_manager
1872            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1873            .await
1874            .unwrap();
1875
1876        // updated table_info should equal the `new_table_info`
1877        let updated_table_info = table_metadata_manager
1878            .table_info_manager()
1879            .get(table_id)
1880            .await
1881            .unwrap()
1882            .unwrap()
1883            .into_inner();
1884        assert_eq!(updated_table_info.table_info, new_table_info);
1885
1886        let mut wrong_table_info = table_info.clone();
1887        wrong_table_info.name = "wrong".to_string();
1888        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1889            current_table_info_value.update(wrong_table_info),
1890        );
1891        // if the current_table_info_value is wrong, it should return an error.
1892        // The ABA problem.
1893        assert!(table_metadata_manager
1894            .update_table_info(&wrong_table_info_value, None, new_table_info)
1895            .await
1896            .is_err())
1897    }
1898
1899    #[tokio::test]
1900    async fn test_update_table_leader_region_status() {
1901        let mem_kv = Arc::new(MemoryKvBackend::default());
1902        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1903        let datanode = 1;
1904        let region_routes = vec![
1905            RegionRoute {
1906                region: Region {
1907                    id: 1.into(),
1908                    name: "r1".to_string(),
1909                    partition: None,
1910                    attrs: BTreeMap::new(),
1911                },
1912                leader_peer: Some(Peer::new(datanode, "a2")),
1913                leader_state: Some(LeaderState::Downgrading),
1914                follower_peers: vec![],
1915                leader_down_since: Some(current_time_millis()),
1916            },
1917            RegionRoute {
1918                region: Region {
1919                    id: 2.into(),
1920                    name: "r2".to_string(),
1921                    partition: None,
1922                    attrs: BTreeMap::new(),
1923                },
1924                leader_peer: Some(Peer::new(datanode, "a1")),
1925                leader_state: None,
1926                follower_peers: vec![],
1927                leader_down_since: None,
1928            },
1929        ];
1930        let table_info: RawTableInfo =
1931            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1932        let table_id = table_info.ident.table_id;
1933        let current_table_route_value = DeserializedValueWithBytes::from_inner(
1934            TableRouteValue::physical(region_routes.clone()),
1935        );
1936
1937        // creates metadata.
1938        create_physical_table_metadata(
1939            &table_metadata_manager,
1940            table_info.clone(),
1941            region_routes.clone(),
1942            HashMap::new(),
1943        )
1944        .await
1945        .unwrap();
1946
1947        table_metadata_manager
1948            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
1949                if region_route.leader_state.is_some() {
1950                    None
1951                } else {
1952                    Some(Some(LeaderState::Downgrading))
1953                }
1954            })
1955            .await
1956            .unwrap();
1957
1958        let updated_route_value = table_metadata_manager
1959            .table_route_manager()
1960            .table_route_storage()
1961            .get(table_id)
1962            .await
1963            .unwrap()
1964            .unwrap();
1965
1966        assert_eq!(
1967            updated_route_value.region_routes().unwrap()[0].leader_state,
1968            Some(LeaderState::Downgrading)
1969        );
1970
1971        assert!(updated_route_value.region_routes().unwrap()[0]
1972            .leader_down_since
1973            .is_some());
1974
1975        assert_eq!(
1976            updated_route_value.region_routes().unwrap()[1].leader_state,
1977            Some(LeaderState::Downgrading)
1978        );
1979        assert!(updated_route_value.region_routes().unwrap()[1]
1980            .leader_down_since
1981            .is_some());
1982    }
1983
1984    async fn assert_datanode_table(
1985        table_metadata_manager: &TableMetadataManager,
1986        table_id: u32,
1987        region_routes: &[RegionRoute],
1988    ) {
1989        let region_distribution = region_distribution(region_routes);
1990        for (datanode, regions) in region_distribution {
1991            let got = table_metadata_manager
1992                .datanode_table_manager()
1993                .get(&DatanodeTableKey::new(datanode, table_id))
1994                .await
1995                .unwrap()
1996                .unwrap();
1997
1998            assert_eq!(got.regions, regions)
1999        }
2000    }
2001
2002    #[tokio::test]
2003    async fn test_update_table_route() {
2004        let mem_kv = Arc::new(MemoryKvBackend::default());
2005        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2006        let region_route = new_test_region_route();
2007        let region_routes = vec![region_route.clone()];
2008        let table_info: RawTableInfo =
2009            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2010        let table_id = table_info.ident.table_id;
2011        let engine = table_info.meta.engine.as_str();
2012        let region_storage_path =
2013            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2014        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2015            TableRouteValue::physical(region_routes.clone()),
2016        );
2017
2018        // creates metadata.
2019        create_physical_table_metadata(
2020            &table_metadata_manager,
2021            table_info.clone(),
2022            region_routes.clone(),
2023            HashMap::new(),
2024        )
2025        .await
2026        .unwrap();
2027
2028        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2029        let new_region_routes = vec![
2030            new_region_route(1, 1),
2031            new_region_route(2, 2),
2032            new_region_route(3, 3),
2033        ];
2034        // it should be ok.
2035        table_metadata_manager
2036            .update_table_route(
2037                table_id,
2038                RegionInfo {
2039                    engine: engine.to_string(),
2040                    region_storage_path: region_storage_path.to_string(),
2041                    region_options: HashMap::new(),
2042                    region_wal_options: HashMap::new(),
2043                },
2044                &current_table_route_value,
2045                new_region_routes.clone(),
2046                &HashMap::new(),
2047                &HashMap::new(),
2048            )
2049            .await
2050            .unwrap();
2051        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2052
2053        // if the table route was updated. it should be ok.
2054        table_metadata_manager
2055            .update_table_route(
2056                table_id,
2057                RegionInfo {
2058                    engine: engine.to_string(),
2059                    region_storage_path: region_storage_path.to_string(),
2060                    region_options: HashMap::new(),
2061                    region_wal_options: HashMap::new(),
2062                },
2063                &current_table_route_value,
2064                new_region_routes.clone(),
2065                &HashMap::new(),
2066                &HashMap::new(),
2067            )
2068            .await
2069            .unwrap();
2070
2071        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2072            current_table_route_value
2073                .inner
2074                .update(new_region_routes.clone())
2075                .unwrap(),
2076        );
2077        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2078        // it should be ok.
2079        table_metadata_manager
2080            .update_table_route(
2081                table_id,
2082                RegionInfo {
2083                    engine: engine.to_string(),
2084                    region_storage_path: region_storage_path.to_string(),
2085                    region_options: HashMap::new(),
2086                    region_wal_options: HashMap::new(),
2087                },
2088                &current_table_route_value,
2089                new_region_routes.clone(),
2090                &HashMap::new(),
2091                &HashMap::new(),
2092            )
2093            .await
2094            .unwrap();
2095        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2096
2097        // if the current_table_route_value is wrong, it should return an error.
2098        // The ABA problem.
2099        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2100            current_table_route_value
2101                .update(vec![
2102                    new_region_route(1, 1),
2103                    new_region_route(2, 2),
2104                    new_region_route(3, 3),
2105                    new_region_route(4, 4),
2106                ])
2107                .unwrap(),
2108        );
2109        assert!(table_metadata_manager
2110            .update_table_route(
2111                table_id,
2112                RegionInfo {
2113                    engine: engine.to_string(),
2114                    region_storage_path: region_storage_path.to_string(),
2115                    region_options: HashMap::new(),
2116                    region_wal_options: HashMap::new(),
2117                },
2118                &wrong_table_route_value,
2119                new_region_routes,
2120                &HashMap::new(),
2121                &HashMap::new(),
2122            )
2123            .await
2124            .is_err());
2125    }
2126
2127    #[tokio::test]
2128    async fn test_destroy_table_metadata() {
2129        let mem_kv = Arc::new(MemoryKvBackend::default());
2130        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2131        let table_id = 1025;
2132        let table_name = "foo";
2133        let task = test_create_table_task(table_name, table_id);
2134        let options = create_mock_region_wal_options();
2135        let serialized_options = options
2136            .iter()
2137            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2138            .collect::<HashMap<_, _>>();
2139        table_metadata_manager
2140            .create_table_metadata(
2141                task.table_info,
2142                TableRouteValue::physical(vec![
2143                    RegionRoute {
2144                        region: Region::new_test(RegionId::new(table_id, 1)),
2145                        leader_peer: Some(Peer::empty(1)),
2146                        follower_peers: vec![Peer::empty(5)],
2147                        leader_state: None,
2148                        leader_down_since: None,
2149                    },
2150                    RegionRoute {
2151                        region: Region::new_test(RegionId::new(table_id, 2)),
2152                        leader_peer: Some(Peer::empty(2)),
2153                        follower_peers: vec![Peer::empty(4)],
2154                        leader_state: None,
2155                        leader_down_since: None,
2156                    },
2157                    RegionRoute {
2158                        region: Region::new_test(RegionId::new(table_id, 3)),
2159                        leader_peer: Some(Peer::empty(3)),
2160                        follower_peers: vec![],
2161                        leader_state: None,
2162                        leader_down_since: None,
2163                    },
2164                ]),
2165                serialized_options,
2166            )
2167            .await
2168            .unwrap();
2169        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2170        let table_route_value = table_metadata_manager
2171            .table_route_manager
2172            .table_route_storage()
2173            .get_with_raw_bytes(table_id)
2174            .await
2175            .unwrap()
2176            .unwrap();
2177        table_metadata_manager
2178            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2179            .await
2180            .unwrap();
2181        assert!(mem_kv.is_empty());
2182    }
2183
2184    #[tokio::test]
2185    async fn test_restore_table_metadata() {
2186        let mem_kv = Arc::new(MemoryKvBackend::default());
2187        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2188        let table_id = 1025;
2189        let table_name = "foo";
2190        let task = test_create_table_task(table_name, table_id);
2191        let options = create_mock_region_wal_options();
2192        let serialized_options = options
2193            .iter()
2194            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2195            .collect::<HashMap<_, _>>();
2196        table_metadata_manager
2197            .create_table_metadata(
2198                task.table_info,
2199                TableRouteValue::physical(vec![
2200                    RegionRoute {
2201                        region: Region::new_test(RegionId::new(table_id, 1)),
2202                        leader_peer: Some(Peer::empty(1)),
2203                        follower_peers: vec![Peer::empty(5)],
2204                        leader_state: None,
2205                        leader_down_since: None,
2206                    },
2207                    RegionRoute {
2208                        region: Region::new_test(RegionId::new(table_id, 2)),
2209                        leader_peer: Some(Peer::empty(2)),
2210                        follower_peers: vec![Peer::empty(4)],
2211                        leader_state: None,
2212                        leader_down_since: None,
2213                    },
2214                    RegionRoute {
2215                        region: Region::new_test(RegionId::new(table_id, 3)),
2216                        leader_peer: Some(Peer::empty(3)),
2217                        follower_peers: vec![],
2218                        leader_state: None,
2219                        leader_down_since: None,
2220                    },
2221                ]),
2222                serialized_options,
2223            )
2224            .await
2225            .unwrap();
2226        let expected_result = mem_kv.dump();
2227        let table_route_value = table_metadata_manager
2228            .table_route_manager
2229            .table_route_storage()
2230            .get_with_raw_bytes(table_id)
2231            .await
2232            .unwrap()
2233            .unwrap();
2234        let region_routes = table_route_value.region_routes().unwrap();
2235        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2236        let table_route_value = TableRouteValue::physical(region_routes.clone());
2237        table_metadata_manager
2238            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2239            .await
2240            .unwrap();
2241        table_metadata_manager
2242            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2243            .await
2244            .unwrap();
2245        let kvs = mem_kv.dump();
2246        assert_eq!(kvs, expected_result);
2247        // Should be ignored.
2248        table_metadata_manager
2249            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2250            .await
2251            .unwrap();
2252        let kvs = mem_kv.dump();
2253        assert_eq!(kvs, expected_result);
2254    }
2255
2256    #[tokio::test]
2257    async fn test_create_update_view_info() {
2258        let mem_kv = Arc::new(MemoryKvBackend::default());
2259        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2260
2261        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2262
2263        let view_id = view_info.ident.table_id;
2264
2265        let logical_plan: Vec<u8> = vec![1, 2, 3];
2266        let columns = vec!["a".to_string()];
2267        let plan_columns = vec!["number".to_string()];
2268        let table_names = new_test_table_names();
2269        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2270
2271        // Create metadata
2272        table_metadata_manager
2273            .create_view_metadata(
2274                view_info.clone(),
2275                logical_plan.clone(),
2276                table_names.clone(),
2277                columns.clone(),
2278                plan_columns.clone(),
2279                definition.to_string(),
2280            )
2281            .await
2282            .unwrap();
2283
2284        {
2285            // assert view info
2286            let current_view_info = table_metadata_manager
2287                .view_info_manager()
2288                .get(view_id)
2289                .await
2290                .unwrap()
2291                .unwrap()
2292                .into_inner();
2293            assert_eq!(current_view_info.view_info, logical_plan);
2294            assert_eq!(current_view_info.table_names, table_names);
2295            assert_eq!(current_view_info.definition, definition);
2296            assert_eq!(current_view_info.columns, columns);
2297            assert_eq!(current_view_info.plan_columns, plan_columns);
2298            // assert table info
2299            let current_table_info = table_metadata_manager
2300                .table_info_manager()
2301                .get(view_id)
2302                .await
2303                .unwrap()
2304                .unwrap()
2305                .into_inner();
2306            assert_eq!(current_table_info.table_info, view_info);
2307        }
2308
2309        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2310        let new_table_names = {
2311            let mut set = HashSet::new();
2312            set.insert(TableName {
2313                catalog_name: "greptime".to_string(),
2314                schema_name: "public".to_string(),
2315                table_name: "b_table".to_string(),
2316            });
2317            set.insert(TableName {
2318                catalog_name: "greptime".to_string(),
2319                schema_name: "public".to_string(),
2320                table_name: "c_table".to_string(),
2321            });
2322            set
2323        };
2324        let new_columns = vec!["b".to_string()];
2325        let new_plan_columns = vec!["number2".to_string()];
2326        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2327
2328        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2329            logical_plan.clone(),
2330            table_names,
2331            columns,
2332            plan_columns,
2333            definition.to_string(),
2334        ));
2335        // should be ok.
2336        table_metadata_manager
2337            .update_view_info(
2338                view_id,
2339                &current_view_info_value,
2340                new_logical_plan.clone(),
2341                new_table_names.clone(),
2342                new_columns.clone(),
2343                new_plan_columns.clone(),
2344                new_definition.to_string(),
2345            )
2346            .await
2347            .unwrap();
2348        // if table info was updated, it should be ok.
2349        table_metadata_manager
2350            .update_view_info(
2351                view_id,
2352                &current_view_info_value,
2353                new_logical_plan.clone(),
2354                new_table_names.clone(),
2355                new_columns.clone(),
2356                new_plan_columns.clone(),
2357                new_definition.to_string(),
2358            )
2359            .await
2360            .unwrap();
2361
2362        // updated view_info should equal the `new_logical_plan`
2363        let updated_view_info = table_metadata_manager
2364            .view_info_manager()
2365            .get(view_id)
2366            .await
2367            .unwrap()
2368            .unwrap()
2369            .into_inner();
2370        assert_eq!(updated_view_info.view_info, new_logical_plan);
2371        assert_eq!(updated_view_info.table_names, new_table_names);
2372        assert_eq!(updated_view_info.definition, new_definition);
2373        assert_eq!(updated_view_info.columns, new_columns);
2374        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2375
2376        let wrong_view_info = logical_plan.clone();
2377        let wrong_definition = "wrong_definition";
2378        let wrong_view_info_value =
2379            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2380                wrong_view_info,
2381                new_table_names.clone(),
2382                new_columns.clone(),
2383                new_plan_columns.clone(),
2384                wrong_definition.to_string(),
2385            ));
2386        // if the current_view_info_value is wrong, it should return an error.
2387        // The ABA problem.
2388        assert!(table_metadata_manager
2389            .update_view_info(
2390                view_id,
2391                &wrong_view_info_value,
2392                new_logical_plan.clone(),
2393                new_table_names.clone(),
2394                vec!["c".to_string()],
2395                vec!["number3".to_string()],
2396                wrong_definition.to_string(),
2397            )
2398            .await
2399            .is_err());
2400
2401        // The view_info is not changed.
2402        let current_view_info = table_metadata_manager
2403            .view_info_manager()
2404            .get(view_id)
2405            .await
2406            .unwrap()
2407            .unwrap()
2408            .into_inner();
2409        assert_eq!(current_view_info.view_info, new_logical_plan);
2410        assert_eq!(current_view_info.table_names, new_table_names);
2411        assert_eq!(current_view_info.definition, new_definition);
2412        assert_eq!(current_view_info.columns, new_columns);
2413        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2414    }
2415}