common_meta/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This mod defines all the keys used in the metadata store (Metasrv).
16//! Specifically, there are these kinds of keys:
17//!
18//! 1. Datanode table key: `__dn_table/{datanode_id}/{table_id}`
19//!     - The value is a [DatanodeTableValue] struct; it contains `table_id` and the regions that
20//!       belong to this Datanode.
//!     - This key is primarily used in the startup of Datanode, to let Datanode know which tables
22//!       and regions it should open.
23//!
24//! 2. Table info key: `__table_info/{table_id}`
25//!     - The value is a [TableInfoValue] struct; it contains the whole table info (like column
26//!       schemas).
27//!     - This key is mainly used in constructing the table in Datanode and Frontend.
28//!
29//! 3. Catalog name key: `__catalog_name/{catalog_name}`
//!     - Indexes all catalog names
31//!
32//! 4. Schema name key: `__schema_name/{catalog_name}/{schema_name}`
//!     - Indexes all schema names belonging to the {catalog_name}
34//!
35//! 5. Table name key: `__table_name/{catalog_name}/{schema_name}/{table_name}`
36//!     - The value is a [TableNameValue] struct; it contains the table id.
37//!     - Used in the table name to table id lookup.
38//!
39//! 6. Flow info key: `__flow/info/{flow_id}`
40//!     - Stores metadata of the flow.
41//!
42//! 7. Flow route key: `__flow/route/{flow_id}/{partition_id}`
43//!     - Stores route of the flow.
44//!
45//! 8. Flow name key: `__flow/name/{catalog}/{flow_name}`
46//!     - Mapping {catalog}/{flow_name} to {flow_id}
47//!
48//! 9. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}`
49//!     - Mapping {flownode_id} to {flow_id}
50//!
51//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
52//!     - Mapping source table's {table_id} to {flownode_id}
53//!     - Used in `Flownode` booting.
54//!
55//! 11. View info key: `__view_info/{view_id}`
56//!     - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
57//!     - This key is mainly used in constructing the view in Datanode and Frontend.
58//!
59//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
60//!     - The key is used to track existing topics in Kafka.
61//!     - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
62//!       the highest entry id that has been pruned from the remote WAL.
63//!     - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
64//!
65//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
66//!     - Mapping {topic_name} to {region_id}
67//!
68//! All keys have related managers. The managers take care of the serialization and deserialization
69//! of keys and values, and the interaction with the underlying KV store backend.
70//!
//! To simplify the managers used in struct fields and function parameters, we define two
//! "unified" managers: the table metadata manager [TableMetadataManager]
//! and the flow metadata manager [FlowMetadataManager](crate::key::flow::FlowMetadataManager).
//! Together they contain the managers described above. It's recommended to use only these unified managers.
75//!
76//! The whole picture of flow keys will be like this:
77//!
78//! __flow/
79//!   info/
80//!     {flow_id}
81//!   route/
82//!     {flow_id}/
83//!      {partition_id}
84//!
85//!    name/
86//!      {catalog_name}
87//!        {flow_name}
88//!
89//!    flownode/
90//!      {flownode_id}/
91//!        {flow_id}/
92//!          {partition_id}
93//!
94//!    source_table/
95//!      {table_id}/
96//!        {flownode_id}/
97//!          {flow_id}/
98//!            {partition_id}
99
100pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
/// Regex fragment describing a valid name. It is interpolated into the key patterns of
/// this module (table, catalog and schema name keys, and the topic-region key).
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
// NOTE(review): the three switch keys below are not used in this part of the file;
// presumably they are consumed by the `runtime_switch` module — confirm.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";

/// Prefix of datanode table keys: `__dn_table/{datanode_id}/{table_id}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `__table_info/{table_id}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `__view_info/{view_id}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `__table_name/{catalog_name}/{schema_name}/{table_name}`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `__catalog_name/{catalog_name}`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `__schema_name/{catalog_name}/{schema_name}`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys; the prefix is followed by one numeric segment.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys; the prefix is followed by two numeric segments.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-to-region keys: `__topic_region/{topic_name}/{region_id}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
192
/// A set of regions with the same role.
///
/// `Serialize` is derived, while `Deserialize` is implemented by hand so that a bare
/// list of region numbers is also accepted and treated as leader regions only.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Leader regions.
    pub leader_regions: Vec<RegionNumber>,
    /// Follower regions.
    pub follower_regions: Vec<RegionNumber>,
}
201
202impl<'de> Deserialize<'de> for RegionRoleSet {
203    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
204    where
205        D: serde::Deserializer<'de>,
206    {
207        #[derive(Deserialize)]
208        #[serde(untagged)]
209        enum RegionRoleSetOrLeaderOnly {
210            Full {
211                leader_regions: Vec<RegionNumber>,
212                follower_regions: Vec<RegionNumber>,
213            },
214            LeaderOnly(Vec<RegionNumber>),
215        }
216        match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
217            RegionRoleSetOrLeaderOnly::Full {
218                leader_regions,
219                follower_regions,
220            } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
221            RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
222                Ok(RegionRoleSet::new(leader_regions, vec![]))
223            }
224        }
225    }
226}
227
228impl RegionRoleSet {
229    /// Create a new region role set.
230    pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
231        Self {
232            leader_regions,
233            follower_regions,
234        }
235    }
236
237    /// Add a leader region to the set.
238    pub fn add_leader_region(&mut self, region_number: RegionNumber) {
239        self.leader_regions.push(region_number);
240    }
241
242    /// Add a follower region to the set.
243    pub fn add_follower_region(&mut self, region_number: RegionNumber) {
244        self.follower_regions.push(region_number);
245    }
246
247    /// Sort the regions.
248    pub fn sort(&mut self) {
249        self.follower_regions.sort();
250        self.leader_regions.sort();
251    }
252}
253
254/// The distribution of regions.
255///
256/// The key is the datanode id, the value is the region role set.
257pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
258
259/// The id of flow.
260pub type FlowId = u32;
261/// The partition of flow.
262pub type FlowPartitionId = u32;
263
264lazy_static! {
265    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
266}
267
268lazy_static! {
269    static ref TABLE_INFO_KEY_PATTERN: Regex =
270        Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
271}
272
273lazy_static! {
274    static ref VIEW_INFO_KEY_PATTERN: Regex =
275        Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
276}
277
278lazy_static! {
279    static ref TABLE_ROUTE_KEY_PATTERN: Regex =
280        Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
281}
282
283lazy_static! {
284    static ref DATANODE_TABLE_KEY_PATTERN: Regex =
285        Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
286}
287
288lazy_static! {
289    static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
290        "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
291    ))
292    .unwrap();
293}
294
295lazy_static! {
296    /// CATALOG_NAME_KEY: {CATALOG_NAME_KEY_PREFIX}/{catalog_name}
297    static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
298        "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
299    ))
300        .unwrap();
301}
302
303lazy_static! {
304    /// SCHEMA_NAME_KEY: {SCHEMA_NAME_KEY_PREFIX}/{catalog_name}/{schema_name}
305    static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
306        "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
307    ))
308        .unwrap();
309}
310
311lazy_static! {
312    static ref NODE_ADDRESS_PATTERN: Regex =
313        Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
314}
315
316lazy_static! {
317    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
318        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
319}
320
321lazy_static! {
322    pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
323        "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
324    ))
325    .unwrap();
326}
327
/// The key of metadata.
///
/// `'a` is the lifetime of the byte slice handed to [MetadataKey::from_bytes], and `T`
/// is the type produced by decoding.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into its byte representation.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a value of type `T` from `bytes`, or returns an error.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
334
335#[derive(Debug, Clone, PartialEq)]
336pub struct BytesAdapter(Vec<u8>);
337
338impl From<Vec<u8>> for BytesAdapter {
339    fn from(value: Vec<u8>) -> Self {
340        Self(value)
341    }
342}
343
344impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
345    fn to_bytes(&self) -> Vec<u8> {
346        self.0.clone()
347    }
348
349    fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
350        Ok(BytesAdapter(bytes.to_vec()))
351    }
352}
353
/// Implemented by keys that can be read as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a [TxnOp] together with a closure that takes a [TxnOpGetResponseSet] and
    /// yields optional raw bytes.
    ///
    /// NOTE(review): presumably the op is a get of this key and the closure picks this
    /// key's value out of the transaction responses — confirm against the implementations.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
362
/// A metadata value that can be converted from and to the raw bytes kept in the store.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, or returns an error.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes, or returns an error.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
370
371pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
372
/// Bundles the per-key managers for table-related metadata together with the KV backend
/// they are all constructed from.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Used directly to run the transactions that span several managers.
    kv_backend: KvBackendRef,
}
386
/// Returns early with an error built from `error::UnexpectedSnafu` unless
/// `$actual == $expected`; `$operation` names the operation in the error message.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` by those exact paths,
/// so both must be in scope at the call site.
#[macro_export]
macro_rules! ensure_values {
    ($actual:expr, $expected:expr, $operation:expr) => {
        ensure!(
            $actual == $expected,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $actual, $operation, $expected
                )
            }
        );
    };
}
401
/// A struct containing a deserialized value (`inner`) and the original bytes it was
/// decoded from.
///
/// - Serialize behaviors:
///
/// Only the `bytes` field is written; the `inner` field is ignored.
///
/// - Deserialize behaviors:
///
/// The `bytes` field is read first, then the `inner` field is decoded from it.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The original bytes of the inner.
    bytes: Bytes,
    // The value that was deserialized from the original bytes.
    inner: T,
}
417
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Gives read access to the deserialized value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Gives write access to the deserialized value. Only `inner` is exposed here, so
    /// changes made through this reference are not reflected in the stored bytes.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
431
432impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
433    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434        write!(
435            f,
436            "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
437            self.inner, self.bytes
438        )
439    }
440}
441
442impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
443    /// - Serialize behaviors:
444    ///
445    /// The `inner` field will be ignored.
446    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
447    where
448        S: serde::Serializer,
449    {
450        // Safety: The original bytes are always JSON encoded.
451        // It's more efficiently than `serialize_bytes`.
452        serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
453    }
454}
455
456impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
457    for DeserializedValueWithBytes<T>
458{
459    /// - Deserialize behaviors:
460    ///
461    /// The `inner` field will be deserialized from the `bytes` field.
462    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
463    where
464        D: serde::Deserializer<'de>,
465    {
466        let buf = String::deserialize(deserializer)?;
467        let bytes = Bytes::from(buf);
468
469        let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
470            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
471
472        Ok(value)
473    }
474}
475
476impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
477    fn clone(&self) -> Self {
478        Self {
479            bytes: self.bytes.clone(),
480            inner: self.inner.clone(),
481        }
482    }
483}
484
485impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
486    /// Returns a struct containing a deserialized value and an original `bytes`.
487    /// It accepts original bytes of inner.
488    pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
489        let inner = T::try_from_raw_value(&bytes)?;
490        Ok(Self { bytes, inner })
491    }
492
493    /// Returns a struct containing a deserialized value and an original `bytes`.
494    /// It accepts original bytes of inner.
495    pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
496        Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
497    }
498
499    pub fn into_inner(self) -> T {
500        self.inner
501    }
502
503    pub fn get_inner_ref(&self) -> &T {
504        &self.inner
505    }
506
507    /// Returns original `bytes`
508    pub fn get_raw_bytes(&self) -> Vec<u8> {
509        self.bytes.to_vec()
510    }
511
512    #[cfg(any(test, feature = "testing"))]
513    pub fn from_inner(inner: T) -> Self {
514        let bytes = serde_json::to_vec(&inner).unwrap();
515
516        Self {
517            bytes: Bytes::from(bytes),
518            inner,
519        }
520    }
521}
522
523impl TableMetadataManager {
524    pub fn new(kv_backend: KvBackendRef) -> Self {
525        TableMetadataManager {
526            table_name_manager: TableNameManager::new(kv_backend.clone()),
527            table_info_manager: TableInfoManager::new(kv_backend.clone()),
528            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
529            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
530            catalog_manager: CatalogManager::new(kv_backend.clone()),
531            schema_manager: SchemaManager::new(kv_backend.clone()),
532            table_route_manager: TableRouteManager::new(kv_backend.clone()),
533            tombstone_manager: TombstoneManager::new(kv_backend.clone()),
534            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
535            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
536            kv_backend,
537        }
538    }
539
540    /// Creates a new `TableMetadataManager` with a custom tombstone prefix.
541    pub fn new_with_custom_tombstone_prefix(
542        kv_backend: KvBackendRef,
543        tombstone_prefix: &str,
544    ) -> Self {
545        Self {
546            table_name_manager: TableNameManager::new(kv_backend.clone()),
547            table_info_manager: TableInfoManager::new(kv_backend.clone()),
548            view_info_manager: ViewInfoManager::new(kv_backend.clone()),
549            datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
550            catalog_manager: CatalogManager::new(kv_backend.clone()),
551            schema_manager: SchemaManager::new(kv_backend.clone()),
552            table_route_manager: TableRouteManager::new(kv_backend.clone()),
553            tombstone_manager: TombstoneManager::new_with_prefix(
554                kv_backend.clone(),
555                tombstone_prefix,
556            ),
557            topic_name_manager: TopicNameManager::new(kv_backend.clone()),
558            topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
559            kv_backend,
560        }
561    }
562
563    pub async fn init(&self) -> Result<()> {
564        let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
565
566        self.catalog_manager().create(catalog_name, true).await?;
567
568        let internal_schemas = [
569            DEFAULT_SCHEMA_NAME,
570            INFORMATION_SCHEMA_NAME,
571            DEFAULT_PRIVATE_SCHEMA_NAME,
572        ];
573
574        for schema_name in internal_schemas {
575            let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
576
577            self.schema_manager().create(schema_key, None, true).await?;
578        }
579
580        Ok(())
581    }
582
    /// Returns a reference to the [TableNameManager] owned by this manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }
586
    /// Returns a reference to the [TableInfoManager] owned by this manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }
590
    /// Returns a reference to the [ViewInfoManager] owned by this manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }
594
    /// Returns a reference to the [DatanodeTableManager] owned by this manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }
598
    /// Returns a reference to the [CatalogManager] owned by this manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }
602
    /// Returns a reference to the [SchemaManager] owned by this manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }
606
    /// Returns a reference to the [TableRouteManager] owned by this manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }
610
    /// Returns a reference to the [TopicNameManager] owned by this manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }
614
    /// Returns a reference to the [TopicRegionManager] owned by this manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }
618
619    #[cfg(feature = "testing")]
620    pub fn kv_backend(&self) -> &KvBackendRef {
621        &self.kv_backend
622    }
623
624    pub async fn get_full_table_info(
625        &self,
626        table_id: TableId,
627    ) -> Result<(
628        Option<DeserializedValueWithBytes<TableInfoValue>>,
629        Option<DeserializedValueWithBytes<TableRouteValue>>,
630    )> {
631        let table_info_key = TableInfoKey::new(table_id);
632        let table_route_key = TableRouteKey::new(table_id);
633        let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
634        let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
635
636        let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
637        let mut res = self.kv_backend.txn(txn).await?;
638        let mut set = TxnOpGetResponseSet::from(&mut res.responses);
639        let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
640        let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
641        Ok((table_info_value, table_route_value))
642    }
643
644    /// Creates metadata for view and returns an error if different metadata exists.
645    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
646    /// Parameters include:
647    /// - `view_info`: the encoded logical plan
648    /// - `table_names`: the resolved fully table names in logical plan
649    /// - `columns`: the view columns
650    /// - `plan_columns`: the original plan columns
651    /// - `definition`: The SQL to create the view
652    ///
653    pub async fn create_view_metadata(
654        &self,
655        view_info: RawTableInfo,
656        raw_logical_plan: Vec<u8>,
657        table_names: HashSet<TableName>,
658        columns: Vec<String>,
659        plan_columns: Vec<String>,
660        definition: String,
661    ) -> Result<()> {
662        let view_id = view_info.ident.table_id;
663
664        // Creates view name.
665        let view_name = TableNameKey::new(
666            &view_info.catalog_name,
667            &view_info.schema_name,
668            &view_info.name,
669        );
670        let create_table_name_txn = self
671            .table_name_manager()
672            .build_create_txn(&view_name, view_id)?;
673
674        // Creates table info.
675        let table_info_value = TableInfoValue::new(view_info);
676
677        let (create_table_info_txn, on_create_table_info_failure) = self
678            .table_info_manager()
679            .build_create_txn(view_id, &table_info_value)?;
680
681        // Creates view info
682        let view_info_value = ViewInfoValue::new(
683            raw_logical_plan,
684            table_names,
685            columns,
686            plan_columns,
687            definition,
688        );
689        let (create_view_info_txn, on_create_view_info_failure) = self
690            .view_info_manager()
691            .build_create_txn(view_id, &view_info_value)?;
692
693        let txn = Txn::merge_all(vec![
694            create_table_name_txn,
695            create_table_info_txn,
696            create_view_info_txn,
697        ]);
698
699        let mut r = self.kv_backend.txn(txn).await?;
700
701        // Checks whether metadata was already created.
702        if !r.succeeded {
703            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
704            let remote_table_info = on_create_table_info_failure(&mut set)?
705                .context(error::UnexpectedSnafu {
706                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
707                })?
708                .into_inner();
709
710            let remote_view_info = on_create_view_info_failure(&mut set)?
711                .context(error::UnexpectedSnafu {
712                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
713                })?
714                .into_inner();
715
716            let op_name = "the creating view metadata";
717            ensure_values!(remote_table_info, table_info_value, op_name);
718            ensure_values!(remote_view_info, view_info_value, op_name);
719        }
720
721        Ok(())
722    }
723
724    /// Creates metadata for table and returns an error if different metadata exists.
725    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
726    pub async fn create_table_metadata(
727        &self,
728        mut table_info: RawTableInfo,
729        table_route_value: TableRouteValue,
730        region_wal_options: HashMap<RegionNumber, String>,
731    ) -> Result<()> {
732        let region_numbers = table_route_value.region_numbers();
733        table_info.meta.region_numbers = region_numbers;
734        let table_id = table_info.ident.table_id;
735        let engine = table_info.meta.engine.clone();
736
737        // Creates table name.
738        let table_name = TableNameKey::new(
739            &table_info.catalog_name,
740            &table_info.schema_name,
741            &table_info.name,
742        );
743        let create_table_name_txn = self
744            .table_name_manager()
745            .build_create_txn(&table_name, table_id)?;
746
747        let region_options = table_info.to_region_options();
748        // Creates table info.
749        let table_info_value = TableInfoValue::new(table_info);
750        let (create_table_info_txn, on_create_table_info_failure) = self
751            .table_info_manager()
752            .build_create_txn(table_id, &table_info_value)?;
753
754        let (create_table_route_txn, on_create_table_route_failure) = self
755            .table_route_manager()
756            .table_route_storage()
757            .build_create_txn(table_id, &table_route_value)?;
758
759        let create_topic_region_txn = self
760            .topic_region_manager
761            .build_create_txn(table_id, &region_wal_options)?;
762
763        let mut txn = Txn::merge_all(vec![
764            create_table_name_txn,
765            create_table_info_txn,
766            create_table_route_txn,
767            create_topic_region_txn,
768        ]);
769
770        if let TableRouteValue::Physical(x) = &table_route_value {
771            let region_storage_path = table_info_value.region_storage_path();
772            let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
773                table_id,
774                &engine,
775                &region_storage_path,
776                region_options,
777                region_wal_options,
778                region_distribution(&x.region_routes),
779            )?;
780            txn = txn.merge(create_datanode_table_txn);
781        }
782
783        let mut r = self.kv_backend.txn(txn).await?;
784
785        // Checks whether metadata was already created.
786        if !r.succeeded {
787            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
788            let remote_table_info = on_create_table_info_failure(&mut set)?
789                .context(error::UnexpectedSnafu {
790                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
791                })?
792                .into_inner();
793
794            let remote_table_route = on_create_table_route_failure(&mut set)?
795                .context(error::UnexpectedSnafu {
796                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
797                })?
798                .into_inner();
799
800            let op_name = "the creating table metadata";
801            ensure_values!(remote_table_info, table_info_value, op_name);
802            ensure_values!(remote_table_route, table_route_value, op_name);
803        }
804
805        Ok(())
806    }
807
808    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
809        // The batch size is max_txn_size / 3 because the size of the `tables_data`
810        // is 3 times the size of the `tables_data`.
811        self.kv_backend.max_txn_ops() / 3
812    }
813
814    /// Creates metadata for multiple logical tables and return an error if different metadata exists.
815    pub async fn create_logical_tables_metadata(
816        &self,
817        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
818    ) -> Result<()> {
819        let len = tables_data.len();
820        let mut txns = Vec::with_capacity(3 * len);
821        struct OnFailure<F1, R1, F2, R2>
822        where
823            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
824            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
825        {
826            table_info_value: TableInfoValue,
827            on_create_table_info_failure: F1,
828            table_route_value: TableRouteValue,
829            on_create_table_route_failure: F2,
830        }
831        let mut on_failures = Vec::with_capacity(len);
832        for (mut table_info, table_route_value) in tables_data {
833            table_info.meta.region_numbers = table_route_value.region_numbers();
834            let table_id = table_info.ident.table_id;
835
836            // Creates table name.
837            let table_name = TableNameKey::new(
838                &table_info.catalog_name,
839                &table_info.schema_name,
840                &table_info.name,
841            );
842            let create_table_name_txn = self
843                .table_name_manager()
844                .build_create_txn(&table_name, table_id)?;
845            txns.push(create_table_name_txn);
846
847            // Creates table info.
848            let table_info_value = TableInfoValue::new(table_info);
849            let (create_table_info_txn, on_create_table_info_failure) =
850                self.table_info_manager()
851                    .build_create_txn(table_id, &table_info_value)?;
852            txns.push(create_table_info_txn);
853
854            let (create_table_route_txn, on_create_table_route_failure) = self
855                .table_route_manager()
856                .table_route_storage()
857                .build_create_txn(table_id, &table_route_value)?;
858            txns.push(create_table_route_txn);
859
860            on_failures.push(OnFailure {
861                table_info_value,
862                on_create_table_info_failure,
863                table_route_value,
864                on_create_table_route_failure,
865            });
866        }
867
868        let txn = Txn::merge_all(txns);
869        let mut r = self.kv_backend.txn(txn).await?;
870
871        // Checks whether metadata was already created.
872        if !r.succeeded {
873            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
874            for on_failure in on_failures {
875                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
876                    .context(error::UnexpectedSnafu {
877                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
878                    })?
879                    .into_inner();
880
881                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
882                    .context(error::UnexpectedSnafu {
883                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
884                    })?
885                    .into_inner();
886
887                let op_name = "the creating logical tables metadata";
888                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
889                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
890            }
891        }
892
893        Ok(())
894    }
895
896    fn table_metadata_keys(
897        &self,
898        table_id: TableId,
899        table_name: &TableName,
900        table_route_value: &TableRouteValue,
901        region_wal_options: &HashMap<RegionNumber, WalOptions>,
902    ) -> Result<Vec<Vec<u8>>> {
903        // Builds keys
904        let datanode_ids = if table_route_value.is_physical() {
905            region_distribution(table_route_value.region_routes()?)
906                .into_keys()
907                .collect()
908        } else {
909            vec![]
910        };
911        let mut keys = Vec::with_capacity(3 + datanode_ids.len());
912        let table_name = TableNameKey::new(
913            &table_name.catalog_name,
914            &table_name.schema_name,
915            &table_name.table_name,
916        );
917        let table_info_key = TableInfoKey::new(table_id);
918        let table_route_key = TableRouteKey::new(table_id);
919        let datanode_table_keys = datanode_ids
920            .into_iter()
921            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
922            .collect::<HashSet<_>>();
923        let topic_region_map = self
924            .topic_region_manager
925            .get_topic_region_mapping(table_id, region_wal_options);
926        let topic_region_keys = topic_region_map
927            .iter()
928            .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
929            .collect::<Vec<_>>();
930        keys.push(table_name.to_bytes());
931        keys.push(table_info_key.to_bytes());
932        keys.push(table_route_key.to_bytes());
933        for key in &datanode_table_keys {
934            keys.push(key.to_bytes());
935        }
936        for key in topic_region_keys {
937            keys.push(key.to_bytes());
938        }
939        Ok(keys)
940    }
941
942    /// Deletes metadata for table **logically**.
943    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
944    pub async fn delete_table_metadata(
945        &self,
946        table_id: TableId,
947        table_name: &TableName,
948        table_route_value: &TableRouteValue,
949        region_wal_options: &HashMap<RegionNumber, WalOptions>,
950    ) -> Result<()> {
951        let keys =
952            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
953        self.tombstone_manager.create(keys).await.map(|_| ())
954    }
955
956    /// Deletes metadata tombstone for table **permanently**.
957    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
958    pub async fn delete_table_metadata_tombstone(
959        &self,
960        table_id: TableId,
961        table_name: &TableName,
962        table_route_value: &TableRouteValue,
963        region_wal_options: &HashMap<RegionNumber, WalOptions>,
964    ) -> Result<()> {
965        let table_metadata_keys =
966            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
967        self.tombstone_manager
968            .delete(table_metadata_keys)
969            .await
970            .map(|_| ())
971    }
972
973    /// Restores metadata for table.
974    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
975    pub async fn restore_table_metadata(
976        &self,
977        table_id: TableId,
978        table_name: &TableName,
979        table_route_value: &TableRouteValue,
980        region_wal_options: &HashMap<RegionNumber, WalOptions>,
981    ) -> Result<()> {
982        let keys =
983            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
984        self.tombstone_manager.restore(keys).await.map(|_| ())
985    }
986
987    /// Deletes metadata for table **permanently**.
988    /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
989    pub async fn destroy_table_metadata(
990        &self,
991        table_id: TableId,
992        table_name: &TableName,
993        table_route_value: &TableRouteValue,
994        region_wal_options: &HashMap<RegionNumber, WalOptions>,
995    ) -> Result<()> {
996        let keys =
997            self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
998        let _ = self
999            .kv_backend
1000            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1001            .await?;
1002        Ok(())
1003    }
1004
1005    fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1006        let mut keys = Vec::with_capacity(3);
1007        let view_name = TableNameKey::new(
1008            &view_name.catalog_name,
1009            &view_name.schema_name,
1010            &view_name.table_name,
1011        );
1012        let table_info_key = TableInfoKey::new(view_id);
1013        let view_info_key = ViewInfoKey::new(view_id);
1014        keys.push(view_name.to_bytes());
1015        keys.push(table_info_key.to_bytes());
1016        keys.push(view_info_key.to_bytes());
1017
1018        Ok(keys)
1019    }
1020
1021    /// Deletes metadata for view **permanently**.
1022    /// The caller MUST ensure it has the exclusive access to `ViewNameKey`.
1023    pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1024        let keys = self.view_info_keys(view_id, view_name)?;
1025        let _ = self
1026            .kv_backend
1027            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1028            .await?;
1029        Ok(())
1030    }
1031
1032    /// Renames the table name and returns an error if different metadata exists.
1033    /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
1034    /// and the new `TableNameKey` MUST be empty.
1035    pub async fn rename_table(
1036        &self,
1037        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1038        new_table_name: String,
1039    ) -> Result<()> {
1040        let current_table_info = &current_table_info_value.table_info;
1041        let table_id = current_table_info.ident.table_id;
1042
1043        let table_name_key = TableNameKey::new(
1044            &current_table_info.catalog_name,
1045            &current_table_info.schema_name,
1046            &current_table_info.name,
1047        );
1048
1049        let new_table_name_key = TableNameKey::new(
1050            &current_table_info.catalog_name,
1051            &current_table_info.schema_name,
1052            &new_table_name,
1053        );
1054
1055        // Updates table name.
1056        let update_table_name_txn = self.table_name_manager().build_update_txn(
1057            &table_name_key,
1058            &new_table_name_key,
1059            table_id,
1060        )?;
1061
1062        let new_table_info_value = current_table_info_value
1063            .inner
1064            .with_update(move |table_info| {
1065                table_info.name = new_table_name;
1066            });
1067
1068        // Updates table info.
1069        let (update_table_info_txn, on_update_table_info_failure) = self
1070            .table_info_manager()
1071            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1072
1073        let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1074
1075        let mut r = self.kv_backend.txn(txn).await?;
1076
1077        // Checks whether metadata was already updated.
1078        if !r.succeeded {
1079            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1080            let remote_table_info = on_update_table_info_failure(&mut set)?
1081                .context(error::UnexpectedSnafu {
1082                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1083                })?
1084                .into_inner();
1085
1086            let op_name = "the renaming table metadata";
1087            ensure_values!(remote_table_info, new_table_info_value, op_name);
1088        }
1089
1090        Ok(())
1091    }
1092
1093    /// Updates table info and returns an error if different metadata exists.
1094    /// And cascade-ly update all redundant table options for each region
1095    /// if region_distribution is present.
1096    pub async fn update_table_info(
1097        &self,
1098        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1099        region_distribution: Option<RegionDistribution>,
1100        new_table_info: RawTableInfo,
1101    ) -> Result<()> {
1102        let table_id = current_table_info_value.table_info.ident.table_id;
1103        let new_table_info_value = current_table_info_value.update(new_table_info);
1104
1105        // Updates table info.
1106        let (update_table_info_txn, on_update_table_info_failure) = self
1107            .table_info_manager()
1108            .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1109
1110        let txn = if let Some(region_distribution) = region_distribution {
1111            // region options induced from table info.
1112            let new_region_options = new_table_info_value.table_info.to_region_options();
1113            let update_datanode_table_options_txn = self
1114                .datanode_table_manager
1115                .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1116                .await?;
1117            Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1118        } else {
1119            update_table_info_txn
1120        };
1121
1122        let mut r = self.kv_backend.txn(txn).await?;
1123        // Checks whether metadata was already updated.
1124        if !r.succeeded {
1125            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1126            let remote_table_info = on_update_table_info_failure(&mut set)?
1127                .context(error::UnexpectedSnafu {
1128                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
1129                })?
1130                .into_inner();
1131
1132            let op_name = "the updating table info";
1133            ensure_values!(remote_table_info, new_table_info_value, op_name);
1134        }
1135        Ok(())
1136    }
1137
1138    /// Updates view info and returns an error if different metadata exists.
1139    /// Parameters include:
1140    /// - `view_id`: the view id
1141    /// - `current_view_info_value`: the current view info for CAS checking
1142    /// - `new_view_info`: the encoded logical plan
1143    /// - `table_names`: the resolved fully table names in logical plan
1144    /// - `columns`: the view columns
1145    /// - `plan_columns`: the original plan columns
1146    /// - `definition`: The SQL to create the view
1147    ///
1148    #[allow(clippy::too_many_arguments)]
1149    pub async fn update_view_info(
1150        &self,
1151        view_id: TableId,
1152        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1153        new_view_info: Vec<u8>,
1154        table_names: HashSet<TableName>,
1155        columns: Vec<String>,
1156        plan_columns: Vec<String>,
1157        definition: String,
1158    ) -> Result<()> {
1159        let new_view_info_value = current_view_info_value.update(
1160            new_view_info,
1161            table_names,
1162            columns,
1163            plan_columns,
1164            definition,
1165        );
1166
1167        // Updates view info.
1168        let (update_view_info_txn, on_update_view_info_failure) = self
1169            .view_info_manager()
1170            .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1171
1172        let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1173
1174        // Checks whether metadata was already updated.
1175        if !r.succeeded {
1176            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1177            let remote_view_info = on_update_view_info_failure(&mut set)?
1178                .context(error::UnexpectedSnafu {
1179                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
1180                })?
1181                .into_inner();
1182
1183            let op_name = "the updating view info";
1184            ensure_values!(remote_view_info, new_view_info_value, op_name);
1185        }
1186        Ok(())
1187    }
1188
1189    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1190        self.kv_backend.max_txn_ops()
1191    }
1192
1193    pub async fn batch_update_table_info_values(
1194        &self,
1195        table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1196    ) -> Result<()> {
1197        let len = table_info_value_pairs.len();
1198        let mut txns = Vec::with_capacity(len);
1199        struct OnFailure<F, R>
1200        where
1201            F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1202        {
1203            table_info_value: TableInfoValue,
1204            on_update_table_info_failure: F,
1205        }
1206        let mut on_failures = Vec::with_capacity(len);
1207
1208        for (table_info_value, new_table_info) in table_info_value_pairs {
1209            let table_id = table_info_value.table_info.ident.table_id;
1210
1211            let new_table_info_value = table_info_value.update(new_table_info);
1212
1213            let (update_table_info_txn, on_update_table_info_failure) =
1214                self.table_info_manager().build_update_txn(
1215                    table_id,
1216                    &table_info_value,
1217                    &new_table_info_value,
1218                )?;
1219
1220            txns.push(update_table_info_txn);
1221
1222            on_failures.push(OnFailure {
1223                table_info_value: new_table_info_value,
1224                on_update_table_info_failure,
1225            });
1226        }
1227
1228        let txn = Txn::merge_all(txns);
1229        let mut r = self.kv_backend.txn(txn).await?;
1230
1231        if !r.succeeded {
1232            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1233            for on_failure in on_failures {
1234                let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1235                    .context(error::UnexpectedSnafu {
1236                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
1237                    })?
1238                    .into_inner();
1239
1240                let op_name = "the batch updating table info";
1241                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1242            }
1243        }
1244
1245        Ok(())
1246    }
1247
1248    pub async fn update_table_route(
1249        &self,
1250        table_id: TableId,
1251        region_info: RegionInfo,
1252        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1253        new_region_routes: Vec<RegionRoute>,
1254        new_region_options: &HashMap<String, String>,
1255        new_region_wal_options: &HashMap<RegionNumber, String>,
1256    ) -> Result<()> {
1257        // Updates the datanode table key value pairs.
1258        let current_region_distribution =
1259            region_distribution(current_table_route_value.region_routes()?);
1260        let new_region_distribution = region_distribution(&new_region_routes);
1261
1262        let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1263            table_id,
1264            region_info,
1265            current_region_distribution,
1266            new_region_distribution,
1267            new_region_options,
1268            new_region_wal_options,
1269        )?;
1270
1271        // Updates the table_route.
1272        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1273
1274        let (update_table_route_txn, on_update_table_route_failure) = self
1275            .table_route_manager()
1276            .table_route_storage()
1277            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1278
1279        let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1280
1281        let mut r = self.kv_backend.txn(txn).await?;
1282
1283        // Checks whether metadata was already updated.
1284        if !r.succeeded {
1285            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1286            let remote_table_route = on_update_table_route_failure(&mut set)?
1287                .context(error::UnexpectedSnafu {
1288                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
1289                })?
1290                .into_inner();
1291
1292            let op_name = "the updating table route";
1293            ensure_values!(remote_table_route, new_table_route_value, op_name);
1294        }
1295
1296        Ok(())
1297    }
1298
1299    /// Updates the leader status of the [RegionRoute].
1300    pub async fn update_leader_region_status<F>(
1301        &self,
1302        table_id: TableId,
1303        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1304        next_region_route_status: F,
1305    ) -> Result<()>
1306    where
1307        F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1308    {
1309        let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1310
1311        let mut updated = 0;
1312        for route in &mut new_region_routes {
1313            if let Some(state) = next_region_route_status(route) {
1314                if route.set_leader_state(state) {
1315                    updated += 1;
1316                }
1317            }
1318        }
1319
1320        if updated == 0 {
1321            warn!("No leader status updated");
1322            return Ok(());
1323        }
1324
1325        // Updates the table_route.
1326        let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1327
1328        let (update_table_route_txn, on_update_table_route_failure) = self
1329            .table_route_manager()
1330            .table_route_storage()
1331            .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1332
1333        let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1334
1335        // Checks whether metadata was already updated.
1336        if !r.succeeded {
1337            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1338            let remote_table_route = on_update_table_route_failure(&mut set)?
1339                .context(error::UnexpectedSnafu {
1340                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1341                })?
1342                .into_inner();
1343
1344            let op_name = "the updating leader region status";
1345            ensure_values!(remote_table_route, new_table_route_value, op_name);
1346        }
1347
1348        Ok(())
1349    }
1350}
1351
/// Implements `$crate::key::MetadataValue` for every listed type, using JSON
/// (`serde_json`) as the raw value encoding.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context` method
/// without a path, so these names must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1368
/// Implements `$crate::key::MetadataKeyGetTxnOp` for every listed key type.
///
/// The generated `build_get_op` serializes the key with `to_bytes` and uses the
/// same bytes both for the `TxnOp::Get` operation and for the response filter.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                /// Returns a [TxnOp] to retrieve the corresponding value
                /// and a filter to retrieve the value from the [TxnOpGetResponseSet]
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        // The key bytes are needed twice: once by the get operation
                        // and once by the filter, hence the clone.
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1393
// Key types for which a get operation and its response filter can be built.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1401
/// Adds inherent JSON (`serde_json`) conversion methods to every listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` decodes into
/// `Option<Self>` instead of `Self`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context` method
/// without a path, so these names must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its JSON bytes.
                // NOTE(review): presumably a JSON `null` yields `None` — confirm
                // against how these values are written.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1418
// Value types stored as JSON through the `MetadataValue` trait.
// NOTE(review): `SchemaNameValue` is also listed in `impl_optional_metadata_value!`,
// so it has inherent methods with the same names as the trait methods; confirm
// that call sites pick the intended one.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue
}
1433
// Value types that get inherent JSON conversions decoding into `Option<Self>`.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1438
1439#[cfg(test)]
1440mod tests {
1441    use std::collections::{BTreeMap, HashMap, HashSet};
1442    use std::sync::Arc;
1443
1444    use bytes::Bytes;
1445    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1446    use common_time::util::current_time_millis;
1447    use common_wal::options::{KafkaWalOptions, WalOptions};
1448    use futures::TryStreamExt;
1449    use store_api::storage::{RegionId, RegionNumber};
1450    use table::metadata::{RawTableInfo, TableInfo};
1451    use table::table_name::TableName;
1452
1453    use super::datanode_table::DatanodeTableKey;
1454    use super::test_utils;
1455    use crate::ddl::test_util::create_table::test_create_table_task;
1456    use crate::ddl::utils::region_storage_path;
1457    use crate::error::Result;
1458    use crate::key::datanode_table::RegionInfo;
1459    use crate::key::table_info::TableInfoValue;
1460    use crate::key::table_name::TableNameKey;
1461    use crate::key::table_route::TableRouteValue;
1462    use crate::key::{
1463        DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1464        ViewInfoValue, TOPIC_REGION_PREFIX,
1465    };
1466    use crate::kv_backend::memory::MemoryKvBackend;
1467    use crate::kv_backend::KvBackend;
1468    use crate::peer::Peer;
1469    use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1470    use crate::rpc::store::RangeRequest;
1471    use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1472
1473    #[test]
1474    fn test_deserialized_value_with_bytes() {
1475        let region_route = new_test_region_route();
1476        let region_routes = vec![region_route.clone()];
1477
1478        let expected_region_routes =
1479            TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1480        let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1481
1482        // Serialize behaviors:
1483        // The inner field will be ignored.
1484        let value = DeserializedValueWithBytes {
1485            // ignored
1486            inner: TableRouteValue::physical(region_routes.clone()),
1487            bytes: Bytes::from(expected.clone()),
1488        };
1489
1490        let encoded = serde_json::to_vec(&value).unwrap();
1491
1492        // Deserialize behaviors:
1493        // The inner field will be deserialized from the bytes field.
1494        let decoded: DeserializedValueWithBytes<TableRouteValue> =
1495            serde_json::from_slice(&encoded).unwrap();
1496
1497        assert_eq!(decoded.inner, expected_region_routes);
1498        assert_eq!(decoded.bytes, expected);
1499    }
1500
1501    fn new_test_region_route() -> RegionRoute {
1502        new_region_route(1, 2)
1503    }
1504
1505    fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1506        RegionRoute {
1507            region: Region {
1508                id: region_id.into(),
1509                name: "r1".to_string(),
1510                partition: None,
1511                attrs: BTreeMap::new(),
1512            },
1513            leader_peer: Some(Peer::new(datanode, "a2")),
1514            follower_peers: vec![],
1515            leader_state: None,
1516            leader_down_since: None,
1517        }
1518    }
1519
1520    fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1521        test_utils::new_test_table_info(10, region_numbers)
1522    }
1523
1524    fn new_test_table_names() -> HashSet<TableName> {
1525        let mut set = HashSet::new();
1526        set.insert(TableName {
1527            catalog_name: "greptime".to_string(),
1528            schema_name: "public".to_string(),
1529            table_name: "a_table".to_string(),
1530        });
1531        set.insert(TableName {
1532            catalog_name: "greptime".to_string(),
1533            schema_name: "public".to_string(),
1534            table_name: "b_table".to_string(),
1535        });
1536        set
1537    }
1538
1539    async fn create_physical_table_metadata(
1540        table_metadata_manager: &TableMetadataManager,
1541        table_info: RawTableInfo,
1542        region_routes: Vec<RegionRoute>,
1543        region_wal_options: HashMap<RegionNumber, String>,
1544    ) -> Result<()> {
1545        table_metadata_manager
1546            .create_table_metadata(
1547                table_info,
1548                TableRouteValue::physical(region_routes),
1549                region_wal_options,
1550            )
1551            .await
1552    }
1553
1554    fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1555        let topics = (0..2)
1556            .map(|i| format!("greptimedb_topic{}", i))
1557            .collect::<Vec<_>>();
1558        let wal_options = topics
1559            .iter()
1560            .map(|topic| {
1561                WalOptions::Kafka(KafkaWalOptions {
1562                    topic: topic.clone(),
1563                })
1564            })
1565            .collect::<Vec<_>>();
1566
1567        (0..16)
1568            .enumerate()
1569            .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1570            .collect()
1571    }
1572
1573    #[tokio::test]
1574    async fn test_raft_engine_topic_region_map() {
1575        let mem_kv = Arc::new(MemoryKvBackend::default());
1576        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1577        let region_route = new_test_region_route();
1578        let region_routes = &vec![region_route.clone()];
1579        let table_info: RawTableInfo =
1580            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1581        let wal_allocator = WalOptionsAllocator::RaftEngine;
1582        let regions = (0..16).collect();
1583        let region_wal_options =
1584            allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1585        create_physical_table_metadata(
1586            &table_metadata_manager,
1587            table_info.clone(),
1588            region_routes.clone(),
1589            region_wal_options.clone(),
1590        )
1591        .await
1592        .unwrap();
1593
1594        let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1595        let range_req = RangeRequest::new().with_prefix(topic_region_key);
1596        let resp = mem_kv.range(range_req).await.unwrap();
1597        // Should be empty because the topic region map is empty for raft engine.
1598        assert!(resp.kvs.is_empty());
1599    }
1600
1601    #[tokio::test]
1602    async fn test_create_table_metadata() {
1603        let mem_kv = Arc::new(MemoryKvBackend::default());
1604        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1605        let region_route = new_test_region_route();
1606        let region_routes = &vec![region_route.clone()];
1607        let table_info: RawTableInfo =
1608            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1609        let region_wal_options = create_mock_region_wal_options()
1610            .into_iter()
1611            .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1612            .collect::<HashMap<_, _>>();
1613
1614        // creates metadata.
1615        create_physical_table_metadata(
1616            &table_metadata_manager,
1617            table_info.clone(),
1618            region_routes.clone(),
1619            region_wal_options.clone(),
1620        )
1621        .await
1622        .unwrap();
1623
1624        // if metadata was already created, it should be ok.
1625        assert!(create_physical_table_metadata(
1626            &table_metadata_manager,
1627            table_info.clone(),
1628            region_routes.clone(),
1629            region_wal_options.clone(),
1630        )
1631        .await
1632        .is_ok());
1633
1634        let mut modified_region_routes = region_routes.clone();
1635        modified_region_routes.push(region_route.clone());
1636        // if remote metadata was exists, it should return an error.
1637        assert!(create_physical_table_metadata(
1638            &table_metadata_manager,
1639            table_info.clone(),
1640            modified_region_routes,
1641            region_wal_options.clone(),
1642        )
1643        .await
1644        .is_err());
1645
1646        let (remote_table_info, remote_table_route) = table_metadata_manager
1647            .get_full_table_info(10)
1648            .await
1649            .unwrap();
1650
1651        assert_eq!(
1652            remote_table_info.unwrap().into_inner().table_info,
1653            table_info
1654        );
1655        assert_eq!(
1656            remote_table_route
1657                .unwrap()
1658                .into_inner()
1659                .region_routes()
1660                .unwrap(),
1661            region_routes
1662        );
1663
1664        for i in 0..2 {
1665            let region_number = i as u32;
1666            let region_id = RegionId::new(table_info.ident.table_id, region_number);
1667            let topic = format!("greptimedb_topic{}", i);
1668            let regions = table_metadata_manager
1669                .topic_region_manager
1670                .regions(&topic)
1671                .await
1672                .unwrap();
1673            assert_eq!(regions.len(), 8);
1674            assert_eq!(regions[0], region_id);
1675        }
1676    }
1677
1678    #[tokio::test]
1679    async fn test_create_logic_tables_metadata() {
1680        let mem_kv = Arc::new(MemoryKvBackend::default());
1681        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1682        let region_route = new_test_region_route();
1683        let region_routes = vec![region_route.clone()];
1684        let table_info: RawTableInfo =
1685            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1686        let table_id = table_info.ident.table_id;
1687        let table_route_value = TableRouteValue::physical(region_routes.clone());
1688
1689        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1690        // creates metadata.
1691        table_metadata_manager
1692            .create_logical_tables_metadata(tables_data.clone())
1693            .await
1694            .unwrap();
1695
1696        // if metadata was already created, it should be ok.
1697        assert!(table_metadata_manager
1698            .create_logical_tables_metadata(tables_data)
1699            .await
1700            .is_ok());
1701
1702        let mut modified_region_routes = region_routes.clone();
1703        modified_region_routes.push(new_region_route(2, 3));
1704        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1705        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1706        // if remote metadata was exists, it should return an error.
1707        assert!(table_metadata_manager
1708            .create_logical_tables_metadata(modified_tables_data)
1709            .await
1710            .is_err());
1711
1712        let (remote_table_info, remote_table_route) = table_metadata_manager
1713            .get_full_table_info(table_id)
1714            .await
1715            .unwrap();
1716
1717        assert_eq!(
1718            remote_table_info.unwrap().into_inner().table_info,
1719            table_info
1720        );
1721        assert_eq!(
1722            remote_table_route
1723                .unwrap()
1724                .into_inner()
1725                .region_routes()
1726                .unwrap(),
1727            &region_routes
1728        );
1729    }
1730
1731    #[tokio::test]
1732    async fn test_create_many_logical_tables_metadata() {
1733        let kv_backend = Arc::new(MemoryKvBackend::default());
1734        let table_metadata_manager = TableMetadataManager::new(kv_backend);
1735
1736        let mut tables_data = vec![];
1737        for i in 0..128 {
1738            let table_id = i + 1;
1739            let regin_number = table_id * 3;
1740            let region_id = RegionId::new(table_id, regin_number);
1741            let region_route = new_region_route(region_id.as_u64(), 2);
1742            let region_routes = vec![region_route.clone()];
1743            let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1744                table_id,
1745                &format!("my_table_{}", table_id),
1746                region_routes.iter().map(|r| r.region.id.region_number()),
1747            )
1748            .into();
1749            let table_route_value = TableRouteValue::physical(region_routes.clone());
1750
1751            tables_data.push((table_info, table_route_value));
1752        }
1753
1754        // creates metadata.
1755        table_metadata_manager
1756            .create_logical_tables_metadata(tables_data)
1757            .await
1758            .unwrap();
1759    }
1760
1761    #[tokio::test]
1762    async fn test_delete_table_metadata() {
1763        let mem_kv = Arc::new(MemoryKvBackend::default());
1764        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1765        let region_route = new_test_region_route();
1766        let region_routes = &vec![region_route.clone()];
1767        let table_info: RawTableInfo =
1768            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1769        let table_id = table_info.ident.table_id;
1770        let datanode_id = 2;
1771        let region_wal_options = create_mock_region_wal_options();
1772        let serialized_region_wal_options = region_wal_options
1773            .iter()
1774            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1775            .collect::<HashMap<_, _>>();
1776
1777        // creates metadata.
1778        create_physical_table_metadata(
1779            &table_metadata_manager,
1780            table_info.clone(),
1781            region_routes.clone(),
1782            serialized_region_wal_options,
1783        )
1784        .await
1785        .unwrap();
1786
1787        let table_name = TableName::new(
1788            table_info.catalog_name,
1789            table_info.schema_name,
1790            table_info.name,
1791        );
1792        let table_route_value = &TableRouteValue::physical(region_routes.clone());
1793        // deletes metadata.
1794        table_metadata_manager
1795            .delete_table_metadata(
1796                table_id,
1797                &table_name,
1798                table_route_value,
1799                &region_wal_options,
1800            )
1801            .await
1802            .unwrap();
1803        // Should be ignored.
1804        table_metadata_manager
1805            .delete_table_metadata(
1806                table_id,
1807                &table_name,
1808                table_route_value,
1809                &region_wal_options,
1810            )
1811            .await
1812            .unwrap();
1813        assert!(table_metadata_manager
1814            .table_info_manager()
1815            .get(table_id)
1816            .await
1817            .unwrap()
1818            .is_none());
1819        assert!(table_metadata_manager
1820            .table_route_manager()
1821            .table_route_storage()
1822            .get(table_id)
1823            .await
1824            .unwrap()
1825            .is_none());
1826        assert!(table_metadata_manager
1827            .datanode_table_manager()
1828            .tables(datanode_id)
1829            .try_collect::<Vec<_>>()
1830            .await
1831            .unwrap()
1832            .is_empty());
1833        // Checks removed values
1834        let table_info = table_metadata_manager
1835            .table_info_manager()
1836            .get(table_id)
1837            .await
1838            .unwrap();
1839        assert!(table_info.is_none());
1840        let table_route = table_metadata_manager
1841            .table_route_manager()
1842            .table_route_storage()
1843            .get(table_id)
1844            .await
1845            .unwrap();
1846        assert!(table_route.is_none());
1847        // Logical delete removes the topic region mapping as well.
1848        let regions = table_metadata_manager
1849            .topic_region_manager
1850            .regions("greptimedb_topic0")
1851            .await
1852            .unwrap();
1853        assert_eq!(regions.len(), 0);
1854        let regions = table_metadata_manager
1855            .topic_region_manager
1856            .regions("greptimedb_topic1")
1857            .await
1858            .unwrap();
1859        assert_eq!(regions.len(), 0);
1860    }
1861
1862    #[tokio::test]
1863    async fn test_rename_table() {
1864        let mem_kv = Arc::new(MemoryKvBackend::default());
1865        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1866        let region_route = new_test_region_route();
1867        let region_routes = vec![region_route.clone()];
1868        let table_info: RawTableInfo =
1869            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1870        let table_id = table_info.ident.table_id;
1871        // creates metadata.
1872        create_physical_table_metadata(
1873            &table_metadata_manager,
1874            table_info.clone(),
1875            region_routes.clone(),
1876            HashMap::new(),
1877        )
1878        .await
1879        .unwrap();
1880
1881        let new_table_name = "another_name".to_string();
1882        let table_info_value =
1883            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1884
1885        table_metadata_manager
1886            .rename_table(&table_info_value, new_table_name.clone())
1887            .await
1888            .unwrap();
1889        // if remote metadata was updated, it should be ok.
1890        table_metadata_manager
1891            .rename_table(&table_info_value, new_table_name.clone())
1892            .await
1893            .unwrap();
1894        let mut modified_table_info = table_info.clone();
1895        modified_table_info.name = "hi".to_string();
1896        let modified_table_info_value =
1897            DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1898        // if the table_info_value is wrong, it should return an error.
1899        // The ABA problem.
1900        assert!(table_metadata_manager
1901            .rename_table(&modified_table_info_value, new_table_name.clone())
1902            .await
1903            .is_err());
1904
1905        let old_table_name = TableNameKey::new(
1906            &table_info.catalog_name,
1907            &table_info.schema_name,
1908            &table_info.name,
1909        );
1910        let new_table_name = TableNameKey::new(
1911            &table_info.catalog_name,
1912            &table_info.schema_name,
1913            &new_table_name,
1914        );
1915
1916        assert!(table_metadata_manager
1917            .table_name_manager()
1918            .get(old_table_name)
1919            .await
1920            .unwrap()
1921            .is_none());
1922
1923        assert_eq!(
1924            table_metadata_manager
1925                .table_name_manager()
1926                .get(new_table_name)
1927                .await
1928                .unwrap()
1929                .unwrap()
1930                .table_id(),
1931            table_id
1932        );
1933    }
1934
1935    #[tokio::test]
1936    async fn test_update_table_info() {
1937        let mem_kv = Arc::new(MemoryKvBackend::default());
1938        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1939        let region_route = new_test_region_route();
1940        let region_routes = vec![region_route.clone()];
1941        let table_info: RawTableInfo =
1942            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1943        let table_id = table_info.ident.table_id;
1944        // creates metadata.
1945        create_physical_table_metadata(
1946            &table_metadata_manager,
1947            table_info.clone(),
1948            region_routes.clone(),
1949            HashMap::new(),
1950        )
1951        .await
1952        .unwrap();
1953
1954        let mut new_table_info = table_info.clone();
1955        new_table_info.name = "hi".to_string();
1956        let current_table_info_value =
1957            DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1958        // should be ok.
1959        table_metadata_manager
1960            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1961            .await
1962            .unwrap();
1963        // if table info was updated, it should be ok.
1964        table_metadata_manager
1965            .update_table_info(&current_table_info_value, None, new_table_info.clone())
1966            .await
1967            .unwrap();
1968
1969        // updated table_info should equal the `new_table_info`
1970        let updated_table_info = table_metadata_manager
1971            .table_info_manager()
1972            .get(table_id)
1973            .await
1974            .unwrap()
1975            .unwrap()
1976            .into_inner();
1977        assert_eq!(updated_table_info.table_info, new_table_info);
1978
1979        let mut wrong_table_info = table_info.clone();
1980        wrong_table_info.name = "wrong".to_string();
1981        let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1982            current_table_info_value.update(wrong_table_info),
1983        );
1984        // if the current_table_info_value is wrong, it should return an error.
1985        // The ABA problem.
1986        assert!(table_metadata_manager
1987            .update_table_info(&wrong_table_info_value, None, new_table_info)
1988            .await
1989            .is_err())
1990    }
1991
1992    #[tokio::test]
1993    async fn test_update_table_leader_region_status() {
1994        let mem_kv = Arc::new(MemoryKvBackend::default());
1995        let table_metadata_manager = TableMetadataManager::new(mem_kv);
1996        let datanode = 1;
1997        let region_routes = vec![
1998            RegionRoute {
1999                region: Region {
2000                    id: 1.into(),
2001                    name: "r1".to_string(),
2002                    partition: None,
2003                    attrs: BTreeMap::new(),
2004                },
2005                leader_peer: Some(Peer::new(datanode, "a2")),
2006                leader_state: Some(LeaderState::Downgrading),
2007                follower_peers: vec![],
2008                leader_down_since: Some(current_time_millis()),
2009            },
2010            RegionRoute {
2011                region: Region {
2012                    id: 2.into(),
2013                    name: "r2".to_string(),
2014                    partition: None,
2015                    attrs: BTreeMap::new(),
2016                },
2017                leader_peer: Some(Peer::new(datanode, "a1")),
2018                leader_state: None,
2019                follower_peers: vec![],
2020                leader_down_since: None,
2021            },
2022        ];
2023        let table_info: RawTableInfo =
2024            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2025        let table_id = table_info.ident.table_id;
2026        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2027            TableRouteValue::physical(region_routes.clone()),
2028        );
2029
2030        // creates metadata.
2031        create_physical_table_metadata(
2032            &table_metadata_manager,
2033            table_info.clone(),
2034            region_routes.clone(),
2035            HashMap::new(),
2036        )
2037        .await
2038        .unwrap();
2039
2040        table_metadata_manager
2041            .update_leader_region_status(table_id, &current_table_route_value, |region_route| {
2042                if region_route.leader_state.is_some() {
2043                    None
2044                } else {
2045                    Some(Some(LeaderState::Downgrading))
2046                }
2047            })
2048            .await
2049            .unwrap();
2050
2051        let updated_route_value = table_metadata_manager
2052            .table_route_manager()
2053            .table_route_storage()
2054            .get(table_id)
2055            .await
2056            .unwrap()
2057            .unwrap();
2058
2059        assert_eq!(
2060            updated_route_value.region_routes().unwrap()[0].leader_state,
2061            Some(LeaderState::Downgrading)
2062        );
2063
2064        assert!(updated_route_value.region_routes().unwrap()[0]
2065            .leader_down_since
2066            .is_some());
2067
2068        assert_eq!(
2069            updated_route_value.region_routes().unwrap()[1].leader_state,
2070            Some(LeaderState::Downgrading)
2071        );
2072        assert!(updated_route_value.region_routes().unwrap()[1]
2073            .leader_down_since
2074            .is_some());
2075    }
2076
2077    async fn assert_datanode_table(
2078        table_metadata_manager: &TableMetadataManager,
2079        table_id: u32,
2080        region_routes: &[RegionRoute],
2081    ) {
2082        let region_distribution = region_distribution(region_routes);
2083        for (datanode, regions) in region_distribution {
2084            let got = table_metadata_manager
2085                .datanode_table_manager()
2086                .get(&DatanodeTableKey::new(datanode, table_id))
2087                .await
2088                .unwrap()
2089                .unwrap();
2090
2091            assert_eq!(got.regions, regions.leader_regions);
2092            assert_eq!(got.follower_regions, regions.follower_regions);
2093        }
2094    }
2095
2096    #[tokio::test]
2097    async fn test_update_table_route() {
2098        let mem_kv = Arc::new(MemoryKvBackend::default());
2099        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2100        let region_route = new_test_region_route();
2101        let region_routes = vec![region_route.clone()];
2102        let table_info: RawTableInfo =
2103            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2104        let table_id = table_info.ident.table_id;
2105        let engine = table_info.meta.engine.as_str();
2106        let region_storage_path =
2107            region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2108        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2109            TableRouteValue::physical(region_routes.clone()),
2110        );
2111
2112        // creates metadata.
2113        create_physical_table_metadata(
2114            &table_metadata_manager,
2115            table_info.clone(),
2116            region_routes.clone(),
2117            HashMap::new(),
2118        )
2119        .await
2120        .unwrap();
2121
2122        assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
2123        let new_region_routes = vec![
2124            new_region_route(1, 1),
2125            new_region_route(2, 2),
2126            new_region_route(3, 3),
2127        ];
2128        // it should be ok.
2129        table_metadata_manager
2130            .update_table_route(
2131                table_id,
2132                RegionInfo {
2133                    engine: engine.to_string(),
2134                    region_storage_path: region_storage_path.to_string(),
2135                    region_options: HashMap::new(),
2136                    region_wal_options: HashMap::new(),
2137                },
2138                &current_table_route_value,
2139                new_region_routes.clone(),
2140                &HashMap::new(),
2141                &HashMap::new(),
2142            )
2143            .await
2144            .unwrap();
2145        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2146
2147        // if the table route was updated. it should be ok.
2148        table_metadata_manager
2149            .update_table_route(
2150                table_id,
2151                RegionInfo {
2152                    engine: engine.to_string(),
2153                    region_storage_path: region_storage_path.to_string(),
2154                    region_options: HashMap::new(),
2155                    region_wal_options: HashMap::new(),
2156                },
2157                &current_table_route_value,
2158                new_region_routes.clone(),
2159                &HashMap::new(),
2160                &HashMap::new(),
2161            )
2162            .await
2163            .unwrap();
2164
2165        let current_table_route_value = DeserializedValueWithBytes::from_inner(
2166            current_table_route_value
2167                .inner
2168                .update(new_region_routes.clone())
2169                .unwrap(),
2170        );
2171        let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2172        // it should be ok.
2173        table_metadata_manager
2174            .update_table_route(
2175                table_id,
2176                RegionInfo {
2177                    engine: engine.to_string(),
2178                    region_storage_path: region_storage_path.to_string(),
2179                    region_options: HashMap::new(),
2180                    region_wal_options: HashMap::new(),
2181                },
2182                &current_table_route_value,
2183                new_region_routes.clone(),
2184                &HashMap::new(),
2185                &HashMap::new(),
2186            )
2187            .await
2188            .unwrap();
2189        assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2190
2191        // if the current_table_route_value is wrong, it should return an error.
2192        // The ABA problem.
2193        let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2194            current_table_route_value
2195                .update(vec![
2196                    new_region_route(1, 1),
2197                    new_region_route(2, 2),
2198                    new_region_route(3, 3),
2199                    new_region_route(4, 4),
2200                ])
2201                .unwrap(),
2202        );
2203        assert!(table_metadata_manager
2204            .update_table_route(
2205                table_id,
2206                RegionInfo {
2207                    engine: engine.to_string(),
2208                    region_storage_path: region_storage_path.to_string(),
2209                    region_options: HashMap::new(),
2210                    region_wal_options: HashMap::new(),
2211                },
2212                &wrong_table_route_value,
2213                new_region_routes,
2214                &HashMap::new(),
2215                &HashMap::new(),
2216            )
2217            .await
2218            .is_err());
2219    }
2220
2221    #[tokio::test]
2222    async fn test_destroy_table_metadata() {
2223        let mem_kv = Arc::new(MemoryKvBackend::default());
2224        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2225        let table_id = 1025;
2226        let table_name = "foo";
2227        let task = test_create_table_task(table_name, table_id);
2228        let options = create_mock_region_wal_options();
2229        let serialized_options = options
2230            .iter()
2231            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2232            .collect::<HashMap<_, _>>();
2233        table_metadata_manager
2234            .create_table_metadata(
2235                task.table_info,
2236                TableRouteValue::physical(vec![
2237                    RegionRoute {
2238                        region: Region::new_test(RegionId::new(table_id, 1)),
2239                        leader_peer: Some(Peer::empty(1)),
2240                        follower_peers: vec![Peer::empty(5)],
2241                        leader_state: None,
2242                        leader_down_since: None,
2243                    },
2244                    RegionRoute {
2245                        region: Region::new_test(RegionId::new(table_id, 2)),
2246                        leader_peer: Some(Peer::empty(2)),
2247                        follower_peers: vec![Peer::empty(4)],
2248                        leader_state: None,
2249                        leader_down_since: None,
2250                    },
2251                    RegionRoute {
2252                        region: Region::new_test(RegionId::new(table_id, 3)),
2253                        leader_peer: Some(Peer::empty(3)),
2254                        follower_peers: vec![],
2255                        leader_state: None,
2256                        leader_down_since: None,
2257                    },
2258                ]),
2259                serialized_options,
2260            )
2261            .await
2262            .unwrap();
2263        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2264        let table_route_value = table_metadata_manager
2265            .table_route_manager
2266            .table_route_storage()
2267            .get_with_raw_bytes(table_id)
2268            .await
2269            .unwrap()
2270            .unwrap();
2271        table_metadata_manager
2272            .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2273            .await
2274            .unwrap();
2275        assert!(mem_kv.is_empty());
2276    }
2277
2278    #[tokio::test]
2279    async fn test_restore_table_metadata() {
2280        let mem_kv = Arc::new(MemoryKvBackend::default());
2281        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2282        let table_id = 1025;
2283        let table_name = "foo";
2284        let task = test_create_table_task(table_name, table_id);
2285        let options = create_mock_region_wal_options();
2286        let serialized_options = options
2287            .iter()
2288            .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2289            .collect::<HashMap<_, _>>();
2290        table_metadata_manager
2291            .create_table_metadata(
2292                task.table_info,
2293                TableRouteValue::physical(vec![
2294                    RegionRoute {
2295                        region: Region::new_test(RegionId::new(table_id, 1)),
2296                        leader_peer: Some(Peer::empty(1)),
2297                        follower_peers: vec![Peer::empty(5)],
2298                        leader_state: None,
2299                        leader_down_since: None,
2300                    },
2301                    RegionRoute {
2302                        region: Region::new_test(RegionId::new(table_id, 2)),
2303                        leader_peer: Some(Peer::empty(2)),
2304                        follower_peers: vec![Peer::empty(4)],
2305                        leader_state: None,
2306                        leader_down_since: None,
2307                    },
2308                    RegionRoute {
2309                        region: Region::new_test(RegionId::new(table_id, 3)),
2310                        leader_peer: Some(Peer::empty(3)),
2311                        follower_peers: vec![],
2312                        leader_state: None,
2313                        leader_down_since: None,
2314                    },
2315                ]),
2316                serialized_options,
2317            )
2318            .await
2319            .unwrap();
2320        let expected_result = mem_kv.dump();
2321        let table_route_value = table_metadata_manager
2322            .table_route_manager
2323            .table_route_storage()
2324            .get_with_raw_bytes(table_id)
2325            .await
2326            .unwrap()
2327            .unwrap();
2328        let region_routes = table_route_value.region_routes().unwrap();
2329        let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2330        let table_route_value = TableRouteValue::physical(region_routes.clone());
2331        table_metadata_manager
2332            .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2333            .await
2334            .unwrap();
2335        table_metadata_manager
2336            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2337            .await
2338            .unwrap();
2339        let kvs = mem_kv.dump();
2340        assert_eq!(kvs, expected_result);
2341        // Should be ignored.
2342        table_metadata_manager
2343            .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2344            .await
2345            .unwrap();
2346        let kvs = mem_kv.dump();
2347        assert_eq!(kvs, expected_result);
2348    }
2349
2350    #[tokio::test]
2351    async fn test_create_update_view_info() {
2352        let mem_kv = Arc::new(MemoryKvBackend::default());
2353        let table_metadata_manager = TableMetadataManager::new(mem_kv);
2354
2355        let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2356
2357        let view_id = view_info.ident.table_id;
2358
2359        let logical_plan: Vec<u8> = vec![1, 2, 3];
2360        let columns = vec!["a".to_string()];
2361        let plan_columns = vec!["number".to_string()];
2362        let table_names = new_test_table_names();
2363        let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2364
2365        // Create metadata
2366        table_metadata_manager
2367            .create_view_metadata(
2368                view_info.clone(),
2369                logical_plan.clone(),
2370                table_names.clone(),
2371                columns.clone(),
2372                plan_columns.clone(),
2373                definition.to_string(),
2374            )
2375            .await
2376            .unwrap();
2377
2378        {
2379            // assert view info
2380            let current_view_info = table_metadata_manager
2381                .view_info_manager()
2382                .get(view_id)
2383                .await
2384                .unwrap()
2385                .unwrap()
2386                .into_inner();
2387            assert_eq!(current_view_info.view_info, logical_plan);
2388            assert_eq!(current_view_info.table_names, table_names);
2389            assert_eq!(current_view_info.definition, definition);
2390            assert_eq!(current_view_info.columns, columns);
2391            assert_eq!(current_view_info.plan_columns, plan_columns);
2392            // assert table info
2393            let current_table_info = table_metadata_manager
2394                .table_info_manager()
2395                .get(view_id)
2396                .await
2397                .unwrap()
2398                .unwrap()
2399                .into_inner();
2400            assert_eq!(current_table_info.table_info, view_info);
2401        }
2402
2403        let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2404        let new_table_names = {
2405            let mut set = HashSet::new();
2406            set.insert(TableName {
2407                catalog_name: "greptime".to_string(),
2408                schema_name: "public".to_string(),
2409                table_name: "b_table".to_string(),
2410            });
2411            set.insert(TableName {
2412                catalog_name: "greptime".to_string(),
2413                schema_name: "public".to_string(),
2414                table_name: "c_table".to_string(),
2415            });
2416            set
2417        };
2418        let new_columns = vec!["b".to_string()];
2419        let new_plan_columns = vec!["number2".to_string()];
2420        let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2421
2422        let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2423            logical_plan.clone(),
2424            table_names,
2425            columns,
2426            plan_columns,
2427            definition.to_string(),
2428        ));
2429        // should be ok.
2430        table_metadata_manager
2431            .update_view_info(
2432                view_id,
2433                &current_view_info_value,
2434                new_logical_plan.clone(),
2435                new_table_names.clone(),
2436                new_columns.clone(),
2437                new_plan_columns.clone(),
2438                new_definition.to_string(),
2439            )
2440            .await
2441            .unwrap();
2442        // if table info was updated, it should be ok.
2443        table_metadata_manager
2444            .update_view_info(
2445                view_id,
2446                &current_view_info_value,
2447                new_logical_plan.clone(),
2448                new_table_names.clone(),
2449                new_columns.clone(),
2450                new_plan_columns.clone(),
2451                new_definition.to_string(),
2452            )
2453            .await
2454            .unwrap();
2455
2456        // updated view_info should equal the `new_logical_plan`
2457        let updated_view_info = table_metadata_manager
2458            .view_info_manager()
2459            .get(view_id)
2460            .await
2461            .unwrap()
2462            .unwrap()
2463            .into_inner();
2464        assert_eq!(updated_view_info.view_info, new_logical_plan);
2465        assert_eq!(updated_view_info.table_names, new_table_names);
2466        assert_eq!(updated_view_info.definition, new_definition);
2467        assert_eq!(updated_view_info.columns, new_columns);
2468        assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2469
2470        let wrong_view_info = logical_plan.clone();
2471        let wrong_definition = "wrong_definition";
2472        let wrong_view_info_value =
2473            DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2474                wrong_view_info,
2475                new_table_names.clone(),
2476                new_columns.clone(),
2477                new_plan_columns.clone(),
2478                wrong_definition.to_string(),
2479            ));
2480        // if the current_view_info_value is wrong, it should return an error.
2481        // The ABA problem.
2482        assert!(table_metadata_manager
2483            .update_view_info(
2484                view_id,
2485                &wrong_view_info_value,
2486                new_logical_plan.clone(),
2487                new_table_names.clone(),
2488                vec!["c".to_string()],
2489                vec!["number3".to_string()],
2490                wrong_definition.to_string(),
2491            )
2492            .await
2493            .is_err());
2494
2495        // The view_info is not changed.
2496        let current_view_info = table_metadata_manager
2497            .view_info_manager()
2498            .get(view_id)
2499            .await
2500            .unwrap()
2501            .unwrap()
2502            .into_inner();
2503        assert_eq!(current_view_info.view_info, new_logical_plan);
2504        assert_eq!(current_view_info.table_names, new_table_names);
2505        assert_eq!(current_view_info.definition, new_definition);
2506        assert_eq!(current_view_info.columns, new_columns);
2507        assert_eq!(current_view_info.plan_columns, new_plan_columns);
2508    }
2509
2510    #[test]
2511    fn test_region_role_set_deserialize() {
2512        let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2513        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2514        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2515        assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2516
2517        let s = r#"[1, 2, 3]"#;
2518        let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2519        assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2520        assert!(region_role_set.follower_regions.is_empty());
2521    }
2522
2523    #[test]
2524    fn test_region_distribution_deserialize() {
2525        let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2526        let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2527        assert_eq!(region_distribution.len(), 2);
2528        assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2529        assert!(region_distribution[&1].follower_regions.is_empty());
2530        assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2531        assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2532    }
2533}