1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{RawTableInfo, TableId};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex source describing an accepted topic name. It is compiled into
/// `TOPIC_NAME_PATTERN_REGEX` and embedded in `TOPIC_REGION_PATTERN`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Older key of the maintenance switch.
/// NOTE(review): presumably kept only for backward compatibility — confirm.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch, under the `__switches/` namespace.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch, under the `__switches/` namespace.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch, under the `__switches/` namespace.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode-table keys (see `DATANODE_TABLE_KEY_PATTERN`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys (see `TABLE_INFO_KEY_PATTERN`).
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys (see `VIEW_INFO_KEY_PATTERN`).
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys (see `TABLE_NAME_KEY_PATTERN`).
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys (see `CATALOG_NAME_KEY_PATTERN`).
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys (see `SCHEMA_NAME_KEY_PATTERN`).
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys (see `TABLE_ROUTE_KEY_PATTERN`).
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table-repartition keys (see `TABLE_REPART_KEY_PATTERN`).
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node-address keys (see `NODE_ADDRESS_PATTERN`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of kafka topic-name keys (see `KAFKA_TOPIC_KEY_PATTERN`).
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Older prefix for kafka topic keys.
/// NOTE(review): presumably superseded by `KAFKA_TOPIC_KEY_PREFIX` — confirm.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys (see `TOPIC_REGION_PATTERN`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// Key related to the metasrv election.
/// NOTE(review): the consumer is not visible in this file — confirm usage.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root path for metasrv election candidates; note the trailing `/`.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The five key prefixes grouped under the "cache key" label.
/// NOTE(review): how these are consumed is not visible in this file — confirm.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// Region numbers grouped by role: leaders and followers.
///
/// Used as the value type of [`RegionDistribution`], which is keyed by
/// datanode id. Deserialization is hand-written (see the `Deserialize` impl)
/// so that a bare list of region numbers is also accepted.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Region numbers in the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Region numbers in the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215 where
216 D: serde::Deserializer<'de>,
217 {
218 #[derive(Deserialize)]
219 #[serde(untagged)]
220 enum RegionRoleSetOrLeaderOnly {
221 Full {
222 leader_regions: Vec<RegionNumber>,
223 follower_regions: Vec<RegionNumber>,
224 },
225 LeaderOnly(Vec<RegionNumber>),
226 }
227 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228 RegionRoleSetOrLeaderOnly::Full {
229 leader_regions,
230 follower_regions,
231 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233 Ok(RegionRoleSet::new(leader_regions, vec![]))
234 }
235 }
236 }
237}
238
239impl RegionRoleSet {
240 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242 Self {
243 leader_regions,
244 follower_regions,
245 }
246 }
247
248 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250 self.leader_regions.push(region_number);
251 }
252
253 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255 self.follower_regions.push(region_number);
256 }
257
258 pub fn sort(&mut self) {
260 self.follower_regions.sort();
261 self.leader_regions.sort();
262 }
263}
264
/// Maps a datanode id to the leader/follower regions recorded for it,
/// ordered by datanode id (it is a `BTreeMap`).
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280 static ref TABLE_INFO_KEY_PATTERN: Regex =
281 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285 static ref VIEW_INFO_KEY_PATTERN: Regex =
286 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315 ))
316 .unwrap();
317}
318
319lazy_static! {
320 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323 ))
324 .unwrap();
325}
326
327lazy_static! {
328 static ref NODE_ADDRESS_PATTERN: Regex =
329 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340 ))
341 .unwrap();
342}
343
/// Conversion between a typed metadata key and its raw byte representation.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` lets a parsed
/// key borrow from the input bytes.
pub trait MetadataKey<'a, T> {
    /// Encodes the key into its raw byte form.
    fn to_bytes(&self) -> Vec<u8>;

    /// Parses a key from raw bytes, returning an error when parsing fails.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// A metadata key that is nothing more than an opaque, owned byte string.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps the given bytes without copying them.
    fn from(raw: Vec<u8>) -> Self {
        BytesAdapter(raw)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361 fn to_bytes(&self) -> Vec<u8> {
362 self.0.clone()
363 }
364
365 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366 Ok(BytesAdapter(bytes.to_vec()))
367 }
368}
369
/// Implemented by keys that can be read as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a transaction operation for this key together with a filter
    /// closure that, given the transaction's `TxnOpGetResponseSet`, yields the
    /// raw value bytes (or `None`).
    /// NOTE(review): presumably the op is a "get" of this key and the filter
    /// selects this key's response — confirm against the implementors.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// Conversion between a typed metadata value and its raw stored bytes.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes, returning an error on failure.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into raw bytes, returning an error on failure.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Facade that bundles the per-kind metadata managers.
///
/// Every sub-manager is constructed from a clone of the same `kv_backend`
/// handle (see `TableMetadataManager::new`), and the multi-key operations on
/// this type submit their transactions through that backend.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // The backend handle the sub-managers above were built from.
    kv_backend: KvBackendRef,
}
403
/// Returns early with an `Unexpected` error unless `$got == $expected_value`.
///
/// `$name` is a description of the operation and is interpolated into the
/// error message. The expansion refers to `ensure!` and `error::UnexpectedSnafu`
/// by unqualified path, so both must be in scope at the call site.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A decoded value kept together with the raw bytes it is paired with.
///
/// - Serializing writes only `bytes` (as a string); `inner` is ignored.
/// - Deserializing reads the string back into `bytes` and re-decodes `inner`
///   from it.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The raw bytes paired with `inner`.
    bytes: Bytes,
    // The decoded value.
    inner: T,
}
434
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Gives read access to the decoded value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Gives mutable access to the decoded value.
    ///
    /// Mutating through this reference does not touch `bytes`, so the two can
    /// diverge afterwards.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(
452 f,
453 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454 self.inner, self.bytes
455 )
456 }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464 where
465 S: serde::Serializer,
466 {
467 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470 }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474 for DeserializedValueWithBytes<T>
475{
476 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480 where
481 D: serde::Deserializer<'de>,
482 {
483 let buf = String::deserialize(deserializer)?;
484 let bytes = Bytes::from(buf);
485
486 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489 Ok(value)
490 }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494 fn clone(&self) -> Self {
495 Self {
496 bytes: self.bytes.clone(),
497 inner: self.inner.clone(),
498 }
499 }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506 let inner = T::try_from_raw_value(&bytes)?;
507 Ok(Self { bytes, inner })
508 }
509
510 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514 }
515
516 pub fn into_inner(self) -> T {
517 self.inner
518 }
519
520 pub fn get_inner_ref(&self) -> &T {
521 &self.inner
522 }
523
524 pub fn get_raw_bytes(&self) -> Vec<u8> {
526 self.bytes.to_vec()
527 }
528
529 #[cfg(any(test, feature = "testing"))]
530 pub fn from_inner(inner: T) -> Self {
531 let bytes = serde_json::to_vec(&inner).unwrap();
532
533 Self {
534 bytes: Bytes::from(bytes),
535 inner,
536 }
537 }
538}
539
540impl TableMetadataManager {
541 pub fn new(kv_backend: KvBackendRef) -> Self {
542 TableMetadataManager {
543 table_name_manager: TableNameManager::new(kv_backend.clone()),
544 table_info_manager: TableInfoManager::new(kv_backend.clone()),
545 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547 catalog_manager: CatalogManager::new(kv_backend.clone()),
548 schema_manager: SchemaManager::new(kv_backend.clone()),
549 table_route_manager: TableRouteManager::new(kv_backend.clone()),
550 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554 kv_backend,
555 }
556 }
557
558 pub fn new_with_custom_tombstone_prefix(
560 kv_backend: KvBackendRef,
561 tombstone_prefix: &str,
562 ) -> Self {
563 Self {
564 table_name_manager: TableNameManager::new(kv_backend.clone()),
565 table_info_manager: TableInfoManager::new(kv_backend.clone()),
566 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568 catalog_manager: CatalogManager::new(kv_backend.clone()),
569 schema_manager: SchemaManager::new(kv_backend.clone()),
570 table_route_manager: TableRouteManager::new(kv_backend.clone()),
571 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572 tombstone_manager: TombstoneManager::new_with_prefix(
573 kv_backend.clone(),
574 tombstone_prefix,
575 ),
576 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578 kv_backend,
579 }
580 }
581
582 pub async fn init(&self) -> Result<()> {
583 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585 self.catalog_manager().create(catalog_name, true).await?;
586
587 let internal_schemas = [
588 DEFAULT_SCHEMA_NAME,
589 INFORMATION_SCHEMA_NAME,
590 DEFAULT_PRIVATE_SCHEMA_NAME,
591 ];
592
593 for schema_name in internal_schemas {
594 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596 self.schema_manager().create(schema_key, None, true).await?;
597 }
598
599 Ok(())
600 }
601
    /// Returns the table-name manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the table-info manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the view-info manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the datanode-table manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the table-route manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the table-repartition manager.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the topic-name manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the topic-region manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the key-value backend handle this manager was built with.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646 pub async fn get_full_table_info(
647 &self,
648 table_id: TableId,
649 ) -> Result<(
650 Option<DeserializedValueWithBytes<TableInfoValue>>,
651 Option<DeserializedValueWithBytes<TableRouteValue>>,
652 )> {
653 let table_info_key = TableInfoKey::new(table_id);
654 let table_route_key = TableRouteKey::new(table_id);
655 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659 let mut res = self.kv_backend.txn(txn).await?;
660 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663 Ok((table_info_value, table_route_value))
664 }
665
666 pub async fn create_view_metadata(
676 &self,
677 view_info: RawTableInfo,
678 raw_logical_plan: Vec<u8>,
679 table_names: HashSet<TableName>,
680 columns: Vec<String>,
681 plan_columns: Vec<String>,
682 definition: String,
683 ) -> Result<()> {
684 let view_id = view_info.ident.table_id;
685
686 let view_name = TableNameKey::new(
688 &view_info.catalog_name,
689 &view_info.schema_name,
690 &view_info.name,
691 );
692 let create_table_name_txn = self
693 .table_name_manager()
694 .build_create_txn(&view_name, view_id)?;
695
696 let table_info_value = TableInfoValue::new(view_info);
698
699 let (create_table_info_txn, on_create_table_info_failure) = self
700 .table_info_manager()
701 .build_create_txn(view_id, &table_info_value)?;
702
703 let view_info_value = ViewInfoValue::new(
705 raw_logical_plan,
706 table_names,
707 columns,
708 plan_columns,
709 definition,
710 );
711 let (create_view_info_txn, on_create_view_info_failure) = self
712 .view_info_manager()
713 .build_create_txn(view_id, &view_info_value)?;
714
715 let txn = Txn::merge_all(vec![
716 create_table_name_txn,
717 create_table_info_txn,
718 create_view_info_txn,
719 ]);
720
721 let mut r = self.kv_backend.txn(txn).await?;
722
723 if !r.succeeded {
725 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726 let remote_table_info = on_create_table_info_failure(&mut set)?
727 .context(error::UnexpectedSnafu {
728 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729 })?
730 .into_inner();
731
732 let remote_view_info = on_create_view_info_failure(&mut set)?
733 .context(error::UnexpectedSnafu {
734 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735 })?
736 .into_inner();
737
738 let op_name = "the creating view metadata";
739 ensure_values!(remote_table_info, table_info_value, op_name);
740 ensure_values!(remote_view_info, view_info_value, op_name);
741 }
742
743 Ok(())
744 }
745
746 pub async fn create_table_metadata(
749 &self,
750 table_info: RawTableInfo,
751 table_route_value: TableRouteValue,
752 region_wal_options: HashMap<RegionNumber, String>,
753 ) -> Result<()> {
754 let table_id = table_info.ident.table_id;
755 let engine = table_info.meta.engine.clone();
756
757 let table_name = TableNameKey::new(
759 &table_info.catalog_name,
760 &table_info.schema_name,
761 &table_info.name,
762 );
763 let create_table_name_txn = self
764 .table_name_manager()
765 .build_create_txn(&table_name, table_id)?;
766
767 let region_options = table_info.to_region_options();
768 let table_info_value = TableInfoValue::new(table_info);
770 let (create_table_info_txn, on_create_table_info_failure) = self
771 .table_info_manager()
772 .build_create_txn(table_id, &table_info_value)?;
773
774 let (create_table_route_txn, on_create_table_route_failure) = self
775 .table_route_manager()
776 .table_route_storage()
777 .build_create_txn(table_id, &table_route_value)?;
778
779 let create_topic_region_txn = self
780 .topic_region_manager
781 .build_create_txn(table_id, ®ion_wal_options)?;
782
783 let mut txn = Txn::merge_all(vec![
784 create_table_name_txn,
785 create_table_info_txn,
786 create_table_route_txn,
787 create_topic_region_txn,
788 ]);
789
790 if let TableRouteValue::Physical(x) = &table_route_value {
791 let region_storage_path = table_info_value.region_storage_path();
792 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793 table_id,
794 &engine,
795 ®ion_storage_path,
796 region_options,
797 region_wal_options,
798 region_distribution(&x.region_routes),
799 )?;
800 txn = txn.merge(create_datanode_table_txn);
801 }
802
803 let mut r = self.kv_backend.txn(txn).await?;
804
805 if !r.succeeded {
807 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808 let remote_table_info = on_create_table_info_failure(&mut set)?
809 .context(error::UnexpectedSnafu {
810 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811 })?
812 .into_inner();
813
814 let remote_table_route = on_create_table_route_failure(&mut set)?
815 .context(error::UnexpectedSnafu {
816 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817 })?
818 .into_inner();
819
820 let op_name = "the creating table metadata";
821 ensure_values!(remote_table_info, table_info_value, op_name);
822 ensure_values!(remote_table_route, table_route_value, op_name);
823 }
824
825 Ok(())
826 }
827
    /// Returns a third of the backend's `max_txn_ops()` (integer division).
    ///
    /// `create_logical_tables_metadata` builds three transactions per table
    /// (table name, table info, table route), hence the division by 3.
    pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops() / 3
    }
833
    /// Creates the metadata (table name, table info, table route) of several
    /// tables in one merged transaction.
    ///
    /// If the transaction does not succeed, every table's remote info and
    /// route are compared with the values this call tried to write; the call
    /// still returns `Ok(())` when all of them are equal.
    ///
    /// # Errors
    /// Fails when a sub-transaction cannot be built, when the backend fails,
    /// or when a remote value is missing or differs from the expected one.
    pub async fn create_logical_tables_metadata(
        &self,
        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
    ) -> Result<()> {
        let len = tables_data.len();
        // Three transactions are pushed per table below.
        let mut txns = Vec::with_capacity(3 * len);
        // Per-table state needed to verify the remote values on failure.
        struct OnFailure<F1, R1, F2, R2>
        where
            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
        {
            table_info_value: TableInfoValue,
            on_create_table_info_failure: F1,
            table_route_value: TableRouteValue,
            on_create_table_route_failure: F2,
        }
        let mut on_failures = Vec::with_capacity(len);
        for (table_info, table_route_value) in tables_data {
            let table_id = table_info.ident.table_id;

            // Table name -> table id.
            let table_name = TableNameKey::new(
                &table_info.catalog_name,
                &table_info.schema_name,
                &table_info.name,
            );
            let create_table_name_txn = self
                .table_name_manager()
                .build_create_txn(&table_name, table_id)?;
            txns.push(create_table_name_txn);

            // Table id -> table info.
            let table_info_value = TableInfoValue::new(table_info);
            let (create_table_info_txn, on_create_table_info_failure) =
                self.table_info_manager()
                    .build_create_txn(table_id, &table_info_value)?;
            txns.push(create_table_info_txn);

            // Table id -> table route.
            let (create_table_route_txn, on_create_table_route_failure) = self
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &table_route_value)?;
            txns.push(create_table_route_txn);

            on_failures.push(OnFailure {
                table_info_value,
                on_create_table_info_failure,
                table_route_value,
                on_create_table_route_failure,
            });
        }

        let txn = Txn::merge_all(txns);
        let mut r = self.kv_backend.txn(txn).await?;

        if !r.succeeded {
            // The transaction was rejected: check, table by table and in build
            // order, whether the same metadata is already in place.
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            for on_failure in on_failures {
                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let op_name = "the creating logical tables metadata";
                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
            }
        }

        Ok(())
    }
914
915 fn table_metadata_keys(
916 &self,
917 table_id: TableId,
918 table_name: &TableName,
919 table_route_value: &TableRouteValue,
920 region_wal_options: &HashMap<RegionNumber, WalOptions>,
921 ) -> Result<Vec<Vec<u8>>> {
922 let datanode_ids = if table_route_value.is_physical() {
924 region_distribution(table_route_value.region_routes()?)
925 .into_keys()
926 .collect()
927 } else {
928 vec![]
929 };
930 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931 let table_name = TableNameKey::new(
932 &table_name.catalog_name,
933 &table_name.schema_name,
934 &table_name.table_name,
935 );
936 let table_info_key = TableInfoKey::new(table_id);
937 let table_route_key = TableRouteKey::new(table_id);
938 let table_repart_key = TableRepartKey::new(table_id);
939 let datanode_table_keys = datanode_ids
940 .into_iter()
941 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942 .collect::<HashSet<_>>();
943 let topic_region_map = self
944 .topic_region_manager
945 .get_topic_region_mapping(table_id, region_wal_options);
946 let topic_region_keys = topic_region_map
947 .iter()
948 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949 .collect::<Vec<_>>();
950 keys.push(table_name.to_bytes());
951 keys.push(table_info_key.to_bytes());
952 keys.push(table_route_key.to_bytes());
953 keys.push(table_repart_key.to_bytes());
954 for key in &datanode_table_keys {
955 keys.push(key.to_bytes());
956 }
957 for key in topic_region_keys {
958 keys.push(key.to_bytes());
959 }
960 Ok(keys)
961 }
962
963 pub async fn delete_table_metadata(
966 &self,
967 table_id: TableId,
968 table_name: &TableName,
969 table_route_value: &TableRouteValue,
970 region_wal_options: &HashMap<RegionNumber, WalOptions>,
971 ) -> Result<()> {
972 let keys =
973 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974 self.tombstone_manager.create(keys).await.map(|_| ())
975 }
976
977 pub async fn delete_table_metadata_tombstone(
980 &self,
981 table_id: TableId,
982 table_name: &TableName,
983 table_route_value: &TableRouteValue,
984 region_wal_options: &HashMap<RegionNumber, WalOptions>,
985 ) -> Result<()> {
986 let table_metadata_keys =
987 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988 self.tombstone_manager
989 .delete(table_metadata_keys)
990 .await
991 .map(|_| ())
992 }
993
994 pub async fn restore_table_metadata(
997 &self,
998 table_id: TableId,
999 table_name: &TableName,
1000 table_route_value: &TableRouteValue,
1001 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002 ) -> Result<()> {
1003 let keys =
1004 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005 self.tombstone_manager.restore(keys).await.map(|_| ())
1006 }
1007
1008 pub async fn destroy_table_metadata(
1011 &self,
1012 table_id: TableId,
1013 table_name: &TableName,
1014 table_route_value: &TableRouteValue,
1015 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016 ) -> Result<()> {
1017 let keys =
1018 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019 let _ = self
1020 .kv_backend
1021 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022 .await?;
1023 Ok(())
1024 }
1025
1026 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027 let mut keys = Vec::with_capacity(3);
1028 let view_name = TableNameKey::new(
1029 &view_name.catalog_name,
1030 &view_name.schema_name,
1031 &view_name.table_name,
1032 );
1033 let table_info_key = TableInfoKey::new(view_id);
1034 let view_info_key = ViewInfoKey::new(view_id);
1035 keys.push(view_name.to_bytes());
1036 keys.push(table_info_key.to_bytes());
1037 keys.push(view_info_key.to_bytes());
1038
1039 Ok(keys)
1040 }
1041
1042 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045 let keys = self.view_info_keys(view_id, view_name)?;
1046 let _ = self
1047 .kv_backend
1048 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049 .await?;
1050 Ok(())
1051 }
1052
1053 pub async fn rename_table(
1057 &self,
1058 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059 new_table_name: String,
1060 ) -> Result<()> {
1061 let current_table_info = ¤t_table_info_value.table_info;
1062 let table_id = current_table_info.ident.table_id;
1063
1064 let table_name_key = TableNameKey::new(
1065 ¤t_table_info.catalog_name,
1066 ¤t_table_info.schema_name,
1067 ¤t_table_info.name,
1068 );
1069
1070 let new_table_name_key = TableNameKey::new(
1071 ¤t_table_info.catalog_name,
1072 ¤t_table_info.schema_name,
1073 &new_table_name,
1074 );
1075
1076 let update_table_name_txn = self.table_name_manager().build_update_txn(
1078 &table_name_key,
1079 &new_table_name_key,
1080 table_id,
1081 )?;
1082
1083 let new_table_info_value = current_table_info_value
1084 .inner
1085 .with_update(move |table_info| {
1086 table_info.name = new_table_name;
1087 });
1088
1089 let (update_table_info_txn, on_update_table_info_failure) = self
1091 .table_info_manager()
1092 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096 let mut r = self.kv_backend.txn(txn).await?;
1097
1098 if !r.succeeded {
1100 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101 let remote_table_info = on_update_table_info_failure(&mut set)?
1102 .context(error::UnexpectedSnafu {
1103 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104 })?
1105 .into_inner();
1106
1107 let op_name = "the renaming table metadata";
1108 ensure_values!(remote_table_info, new_table_info_value, op_name);
1109 }
1110
1111 Ok(())
1112 }
1113
1114 pub async fn update_table_info(
1118 &self,
1119 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120 region_distribution: Option<RegionDistribution>,
1121 new_table_info: RawTableInfo,
1122 ) -> Result<()> {
1123 let table_id = current_table_info_value.table_info.ident.table_id;
1124 let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126 let (update_table_info_txn, on_update_table_info_failure) = self
1128 .table_info_manager()
1129 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131 let txn = if let Some(region_distribution) = region_distribution {
1132 let new_region_options = new_table_info_value.table_info.to_region_options();
1134 let update_datanode_table_options_txn = self
1135 .datanode_table_manager
1136 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137 .await?;
1138 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139 } else {
1140 update_table_info_txn
1141 };
1142
1143 let mut r = self.kv_backend.txn(txn).await?;
1144 if !r.succeeded {
1146 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147 let remote_table_info = on_update_table_info_failure(&mut set)?
1148 .context(error::UnexpectedSnafu {
1149 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150 })?
1151 .into_inner();
1152
1153 let op_name = "the updating table info";
1154 ensure_values!(remote_table_info, new_table_info_value, op_name);
1155 }
1156 Ok(())
1157 }
1158
1159 #[allow(clippy::too_many_arguments)]
1170 pub async fn update_view_info(
1171 &self,
1172 view_id: TableId,
1173 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174 new_view_info: Vec<u8>,
1175 table_names: HashSet<TableName>,
1176 columns: Vec<String>,
1177 plan_columns: Vec<String>,
1178 definition: String,
1179 ) -> Result<()> {
1180 let new_view_info_value = current_view_info_value.update(
1181 new_view_info,
1182 table_names,
1183 columns,
1184 plan_columns,
1185 definition,
1186 );
1187
1188 let (update_view_info_txn, on_update_view_info_failure) = self
1190 .view_info_manager()
1191 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195 if !r.succeeded {
1197 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198 let remote_view_info = on_update_view_info_failure(&mut set)?
1199 .context(error::UnexpectedSnafu {
1200 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201 })?
1202 .into_inner();
1203
1204 let op_name = "the updating view info";
1205 ensure_values!(remote_view_info, new_view_info_value, op_name);
1206 }
1207 Ok(())
1208 }
1209
1210 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1211 self.kv_backend.max_txn_ops()
1212 }
1213
1214 pub async fn batch_update_table_info_values(
1215 &self,
1216 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1217 ) -> Result<()> {
1218 let len = table_info_value_pairs.len();
1219 let mut txns = Vec::with_capacity(len);
1220 struct OnFailure<F, R>
1221 where
1222 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223 {
1224 table_info_value: TableInfoValue,
1225 on_update_table_info_failure: F,
1226 }
1227 let mut on_failures = Vec::with_capacity(len);
1228
1229 for (table_info_value, new_table_info) in table_info_value_pairs {
1230 let table_id = table_info_value.table_info.ident.table_id;
1231
1232 let new_table_info_value = table_info_value.update(new_table_info);
1233
1234 let (update_table_info_txn, on_update_table_info_failure) =
1235 self.table_info_manager().build_update_txn(
1236 table_id,
1237 &table_info_value,
1238 &new_table_info_value,
1239 )?;
1240
1241 txns.push(update_table_info_txn);
1242
1243 on_failures.push(OnFailure {
1244 table_info_value: new_table_info_value,
1245 on_update_table_info_failure,
1246 });
1247 }
1248
1249 let txn = Txn::merge_all(txns);
1250 let mut r = self.kv_backend.txn(txn).await?;
1251
1252 if !r.succeeded {
1253 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254 for on_failure in on_failures {
1255 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256 .context(error::UnexpectedSnafu {
1257 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258 })?
1259 .into_inner();
1260
1261 let op_name = "the batch updating table info";
1262 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268
1269 pub async fn update_table_route(
1270 &self,
1271 table_id: TableId,
1272 region_info: RegionInfo,
1273 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274 new_region_routes: Vec<RegionRoute>,
1275 new_region_options: &HashMap<String, String>,
1276 new_region_wal_options: &HashMap<RegionNumber, String>,
1277 ) -> Result<()> {
1278 let current_region_distribution =
1280 region_distribution(current_table_route_value.region_routes()?);
1281 let new_region_distribution = region_distribution(&new_region_routes);
1282
1283 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1284 table_id,
1285 region_info,
1286 current_region_distribution,
1287 new_region_distribution,
1288 new_region_options,
1289 new_region_wal_options,
1290 )?;
1291
1292 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1294
1295 let (update_table_route_txn, on_update_table_route_failure) = self
1296 .table_route_manager()
1297 .table_route_storage()
1298 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1299
1300 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1301
1302 let mut r = self.kv_backend.txn(txn).await?;
1303
1304 if !r.succeeded {
1306 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1307 let remote_table_route = on_update_table_route_failure(&mut set)?
1308 .context(error::UnexpectedSnafu {
1309 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1310 })?
1311 .into_inner();
1312
1313 let op_name = "the updating table route";
1314 ensure_values!(remote_table_route, new_table_route_value, op_name);
1315 }
1316
1317 Ok(())
1318 }
1319
1320 pub async fn update_leader_region_status<F>(
1322 &self,
1323 table_id: TableId,
1324 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1325 next_region_route_status: F,
1326 ) -> Result<()>
1327 where
1328 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1329 {
1330 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1331
1332 let mut updated = 0;
1333 for route in &mut new_region_routes {
1334 if let Some(state) = next_region_route_status(route)
1335 && route.set_leader_state(state)
1336 {
1337 updated += 1;
1338 }
1339 }
1340
1341 if updated == 0 {
1342 warn!("No leader status updated");
1343 return Ok(());
1344 }
1345
1346 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1348
1349 let (update_table_route_txn, on_update_table_route_failure) = self
1350 .table_route_manager()
1351 .table_route_storage()
1352 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1353
1354 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1355
1356 if !r.succeeded {
1358 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1359 let remote_table_route = on_update_table_route_failure(&mut set)?
1360 .context(error::UnexpectedSnafu {
1361 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1362 })?
1363 .into_inner();
1364
1365 let op_name = "the updating leader region status";
1366 ensure_values!(remote_table_route, new_table_route_value, op_name);
1367 }
1368
1369 Ok(())
1370 }
1371}
1372
/// Implements `$crate::key::MetadataValue` for each listed type using JSON (`serde_json`).
///
/// The expansion refers to `Result`, `serde_json`, `SerdeJsonSnafu` and the `context`
/// extension method by their bare names, so they must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $value_ty {
                fn try_from_raw_value(bytes: &[u8]) -> Result<Self> {
                    serde_json::from_slice::<Self>(bytes).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1389
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// `build_get_op` returns a `TxnOp::Get` for the key's byte representation (`to_bytes()`)
/// together with the filter produced by `TxnOpGetResponseSet::filter` for the same bytes.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_ty: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_ty {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let key_bytes = self.to_bytes();
                    let get_op = TxnOp::Get(key_bytes.clone());
                    let response_filter = TxnOpGetResponseSet::filter(key_bytes);
                    (get_op, response_filter)
                }
            }
        )*
    }
}
1414
// Key types that get a `MetadataKeyGetTxnOp` implementation from the macro.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1422
/// Adds inherent JSON (`serde_json`) conversion functions to each listed type.
///
/// Unlike a `MetadataValue` trait impl, `try_from_raw_value` here decodes into
/// `Option<Self>`, so the raw bytes may represent an absent value.
///
/// The expansion refers to `Result`, `serde_json`, `SerdeJsonSnafu` and the `context`
/// extension method by their bare names, so they must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_ty: ty), *) => {
        $(
            impl $value_ty {
                pub fn try_from_raw_value(bytes: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice::<Option<Self>>(bytes).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1439
// Value types that get a JSON-based `MetadataValue` implementation from the macro.
// Note: `SchemaNameValue` is also listed in the `impl_optional_metadata_value!` invocation.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}
1455
// Value types that get inherent `try_from_raw_value` / `try_as_raw_value` functions whose
// decoding result is `Option<Self>`.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1460
1461#[cfg(test)]
1462mod tests {
1463 use std::collections::{BTreeMap, HashMap, HashSet};
1464 use std::sync::Arc;
1465
1466 use bytes::Bytes;
1467 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1468 use common_time::util::current_time_millis;
1469 use common_wal::options::{KafkaWalOptions, WalOptions};
1470 use futures::TryStreamExt;
1471 use store_api::storage::{RegionId, RegionNumber};
1472 use table::metadata::{RawTableInfo, TableInfo};
1473 use table::table_name::TableName;
1474
1475 use super::datanode_table::DatanodeTableKey;
1476 use super::test_utils;
1477 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1478 use crate::ddl::test_util::create_table::test_create_table_task;
1479 use crate::ddl::utils::region_storage_path;
1480 use crate::error::Result;
1481 use crate::key::datanode_table::RegionInfo;
1482 use crate::key::table_info::TableInfoValue;
1483 use crate::key::table_name::TableNameKey;
1484 use crate::key::table_route::TableRouteValue;
1485 use crate::key::{
1486 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1487 TableMetadataManager, ViewInfoValue,
1488 };
1489 use crate::kv_backend::KvBackend;
1490 use crate::kv_backend::memory::MemoryKvBackend;
1491 use crate::peer::Peer;
1492 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1493 use crate::rpc::store::RangeRequest;
1494 use crate::wal_provider::WalProvider;
1495
/// Round-trips a `DeserializedValueWithBytes` whose `inner` and `bytes` deliberately disagree
/// and expects the decoded value to reflect `bytes`.
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // `bytes` encodes two routes ...
    let expected_value = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let expected_bytes = serde_json::to_vec(&expected_value).unwrap();

    // ... while `inner` holds only one.
    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(expected_bytes.clone()),
    };

    let serialized = serde_json::to_vec(&value).unwrap();
    let roundtripped: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&serialized).unwrap();

    assert_eq!(roundtripped.inner, expected_value);
    assert_eq!(roundtripped.bytes, expected_bytes);
}
1523
1524 fn new_test_region_route() -> RegionRoute {
1525 new_region_route(1, 2)
1526 }
1527
1528 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1529 RegionRoute {
1530 region: Region {
1531 id: region_id.into(),
1532 name: "r1".to_string(),
1533 partition: None,
1534 attrs: BTreeMap::new(),
1535 partition_expr: Default::default(),
1536 },
1537 leader_peer: Some(Peer::new(datanode, "a2")),
1538 follower_peers: vec![],
1539 leader_state: None,
1540 leader_down_since: None,
1541 }
1542 }
1543
1544 fn new_test_table_info() -> TableInfo {
1545 test_utils::new_test_table_info(10)
1546 }
1547
1548 fn new_test_table_names() -> HashSet<TableName> {
1549 let mut set = HashSet::new();
1550 set.insert(TableName {
1551 catalog_name: "greptime".to_string(),
1552 schema_name: "public".to_string(),
1553 table_name: "a_table".to_string(),
1554 });
1555 set.insert(TableName {
1556 catalog_name: "greptime".to_string(),
1557 schema_name: "public".to_string(),
1558 table_name: "b_table".to_string(),
1559 });
1560 set
1561 }
1562
1563 async fn create_physical_table_metadata(
1564 table_metadata_manager: &TableMetadataManager,
1565 table_info: RawTableInfo,
1566 region_routes: Vec<RegionRoute>,
1567 region_wal_options: HashMap<RegionNumber, String>,
1568 ) -> Result<()> {
1569 table_metadata_manager
1570 .create_table_metadata(
1571 table_info,
1572 TableRouteValue::physical(region_routes),
1573 region_wal_options,
1574 )
1575 .await
1576 }
1577
1578 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1579 let topics = (0..2)
1580 .map(|i| format!("greptimedb_topic{}", i))
1581 .collect::<Vec<_>>();
1582 let wal_options = topics
1583 .iter()
1584 .map(|topic| {
1585 WalOptions::Kafka(KafkaWalOptions {
1586 topic: topic.clone(),
1587 })
1588 })
1589 .collect::<Vec<_>>();
1590
1591 (0..16)
1592 .enumerate()
1593 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1594 .collect()
1595 }
1596
1597 #[tokio::test]
1598 async fn test_raft_engine_topic_region_map() {
1599 let mem_kv = Arc::new(MemoryKvBackend::default());
1600 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1601 let region_route = new_test_region_route();
1602 let region_routes = &vec![region_route.clone()];
1603 let table_info: RawTableInfo = new_test_table_info().into();
1604 let wal_provider = WalProvider::RaftEngine;
1605 let regions: Vec<_> = (0..16).collect();
1606 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1607 create_physical_table_metadata(
1608 &table_metadata_manager,
1609 table_info.clone(),
1610 region_routes.clone(),
1611 region_wal_options.clone(),
1612 )
1613 .await
1614 .unwrap();
1615
1616 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1617 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1618 let resp = mem_kv.range(range_req).await.unwrap();
1619 assert!(resp.kvs.is_empty());
1621 }
1622
1623 #[tokio::test]
1624 async fn test_create_table_metadata() {
1625 let mem_kv = Arc::new(MemoryKvBackend::default());
1626 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1627 let region_route = new_test_region_route();
1628 let region_routes = &vec![region_route.clone()];
1629 let table_info: RawTableInfo = new_test_table_info().into();
1630 let region_wal_options = create_mock_region_wal_options()
1631 .into_iter()
1632 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1633 .collect::<HashMap<_, _>>();
1634
1635 create_physical_table_metadata(
1637 &table_metadata_manager,
1638 table_info.clone(),
1639 region_routes.clone(),
1640 region_wal_options.clone(),
1641 )
1642 .await
1643 .unwrap();
1644
1645 assert!(
1647 create_physical_table_metadata(
1648 &table_metadata_manager,
1649 table_info.clone(),
1650 region_routes.clone(),
1651 region_wal_options.clone(),
1652 )
1653 .await
1654 .is_ok()
1655 );
1656
1657 let mut modified_region_routes = region_routes.clone();
1658 modified_region_routes.push(region_route.clone());
1659 assert!(
1661 create_physical_table_metadata(
1662 &table_metadata_manager,
1663 table_info.clone(),
1664 modified_region_routes,
1665 region_wal_options.clone(),
1666 )
1667 .await
1668 .is_err()
1669 );
1670
1671 let (remote_table_info, remote_table_route) = table_metadata_manager
1672 .get_full_table_info(10)
1673 .await
1674 .unwrap();
1675
1676 assert_eq!(
1677 remote_table_info.unwrap().into_inner().table_info,
1678 table_info
1679 );
1680 assert_eq!(
1681 remote_table_route
1682 .unwrap()
1683 .into_inner()
1684 .region_routes()
1685 .unwrap(),
1686 region_routes
1687 );
1688
1689 for i in 0..2 {
1690 let region_number = i as u32;
1691 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1692 let topic = format!("greptimedb_topic{}", i);
1693 let regions = table_metadata_manager
1694 .topic_region_manager
1695 .regions(&topic)
1696 .await
1697 .unwrap()
1698 .into_keys()
1699 .collect::<Vec<_>>();
1700 assert_eq!(regions.len(), 8);
1701 assert!(regions.contains(®ion_id));
1702 }
1703 }
1704
1705 #[tokio::test]
1706 async fn test_create_logic_tables_metadata() {
1707 let mem_kv = Arc::new(MemoryKvBackend::default());
1708 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1709 let region_route = new_test_region_route();
1710 let region_routes = vec![region_route.clone()];
1711 let table_info: RawTableInfo = new_test_table_info().into();
1712 let table_id = table_info.ident.table_id;
1713 let table_route_value = TableRouteValue::physical(region_routes.clone());
1714
1715 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1716 table_metadata_manager
1718 .create_logical_tables_metadata(tables_data.clone())
1719 .await
1720 .unwrap();
1721
1722 assert!(
1724 table_metadata_manager
1725 .create_logical_tables_metadata(tables_data)
1726 .await
1727 .is_ok()
1728 );
1729
1730 let mut modified_region_routes = region_routes.clone();
1731 modified_region_routes.push(new_region_route(2, 3));
1732 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1733 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1734 assert!(
1736 table_metadata_manager
1737 .create_logical_tables_metadata(modified_tables_data)
1738 .await
1739 .is_err()
1740 );
1741
1742 let (remote_table_info, remote_table_route) = table_metadata_manager
1743 .get_full_table_info(table_id)
1744 .await
1745 .unwrap();
1746
1747 assert_eq!(
1748 remote_table_info.unwrap().into_inner().table_info,
1749 table_info
1750 );
1751 assert_eq!(
1752 remote_table_route
1753 .unwrap()
1754 .into_inner()
1755 .region_routes()
1756 .unwrap(),
1757 ®ion_routes
1758 );
1759 }
1760
1761 #[tokio::test]
1762 async fn test_create_many_logical_tables_metadata() {
1763 let kv_backend = Arc::new(MemoryKvBackend::default());
1764 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1765
1766 let mut tables_data = vec![];
1767 for i in 0..128 {
1768 let table_id = i + 1;
1769 let regin_number = table_id * 3;
1770 let region_id = RegionId::new(table_id, regin_number);
1771 let region_route = new_region_route(region_id.as_u64(), 2);
1772 let region_routes = vec![region_route.clone()];
1773 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1774 table_id,
1775 &format!("my_table_{}", table_id),
1776 )
1777 .into();
1778 let table_route_value = TableRouteValue::physical(region_routes.clone());
1779
1780 tables_data.push((table_info, table_route_value));
1781 }
1782
1783 table_metadata_manager
1785 .create_logical_tables_metadata(tables_data)
1786 .await
1787 .unwrap();
1788 }
1789
1790 #[tokio::test]
1791 async fn test_delete_table_metadata() {
1792 let mem_kv = Arc::new(MemoryKvBackend::default());
1793 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1794 let region_route = new_test_region_route();
1795 let region_routes = &vec![region_route.clone()];
1796 let table_info: RawTableInfo = new_test_table_info().into();
1797 let table_id = table_info.ident.table_id;
1798 let datanode_id = 2;
1799 let region_wal_options = create_mock_region_wal_options();
1800 let serialized_region_wal_options = region_wal_options
1801 .iter()
1802 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1803 .collect::<HashMap<_, _>>();
1804
1805 create_physical_table_metadata(
1807 &table_metadata_manager,
1808 table_info.clone(),
1809 region_routes.clone(),
1810 serialized_region_wal_options,
1811 )
1812 .await
1813 .unwrap();
1814
1815 let table_name = TableName::new(
1816 table_info.catalog_name,
1817 table_info.schema_name,
1818 table_info.name,
1819 );
1820 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1821 table_metadata_manager
1823 .delete_table_metadata(
1824 table_id,
1825 &table_name,
1826 table_route_value,
1827 ®ion_wal_options,
1828 )
1829 .await
1830 .unwrap();
1831 table_metadata_manager
1833 .delete_table_metadata(
1834 table_id,
1835 &table_name,
1836 table_route_value,
1837 ®ion_wal_options,
1838 )
1839 .await
1840 .unwrap();
1841 assert!(
1842 table_metadata_manager
1843 .table_info_manager()
1844 .get(table_id)
1845 .await
1846 .unwrap()
1847 .is_none()
1848 );
1849 assert!(
1850 table_metadata_manager
1851 .table_route_manager()
1852 .table_route_storage()
1853 .get(table_id)
1854 .await
1855 .unwrap()
1856 .is_none()
1857 );
1858 assert!(
1859 table_metadata_manager
1860 .datanode_table_manager()
1861 .tables(datanode_id)
1862 .try_collect::<Vec<_>>()
1863 .await
1864 .unwrap()
1865 .is_empty()
1866 );
1867 let table_info = table_metadata_manager
1869 .table_info_manager()
1870 .get(table_id)
1871 .await
1872 .unwrap();
1873 assert!(table_info.is_none());
1874 let table_route = table_metadata_manager
1875 .table_route_manager()
1876 .table_route_storage()
1877 .get(table_id)
1878 .await
1879 .unwrap();
1880 assert!(table_route.is_none());
1881 let regions = table_metadata_manager
1883 .topic_region_manager
1884 .regions("greptimedb_topic0")
1885 .await
1886 .unwrap();
1887 assert_eq!(regions.len(), 0);
1888 let regions = table_metadata_manager
1889 .topic_region_manager
1890 .regions("greptimedb_topic1")
1891 .await
1892 .unwrap();
1893 assert_eq!(regions.len(), 0);
1894 }
1895
1896 #[tokio::test]
1897 async fn test_rename_table() {
1898 let mem_kv = Arc::new(MemoryKvBackend::default());
1899 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1900 let region_route = new_test_region_route();
1901 let region_routes = vec![region_route.clone()];
1902 let table_info: RawTableInfo = new_test_table_info().into();
1903 let table_id = table_info.ident.table_id;
1904 create_physical_table_metadata(
1906 &table_metadata_manager,
1907 table_info.clone(),
1908 region_routes.clone(),
1909 HashMap::new(),
1910 )
1911 .await
1912 .unwrap();
1913
1914 let new_table_name = "another_name".to_string();
1915 let table_info_value =
1916 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1917
1918 table_metadata_manager
1919 .rename_table(&table_info_value, new_table_name.clone())
1920 .await
1921 .unwrap();
1922 table_metadata_manager
1924 .rename_table(&table_info_value, new_table_name.clone())
1925 .await
1926 .unwrap();
1927 let mut modified_table_info = table_info.clone();
1928 modified_table_info.name = "hi".to_string();
1929 let modified_table_info_value =
1930 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1931 assert!(
1934 table_metadata_manager
1935 .rename_table(&modified_table_info_value, new_table_name.clone())
1936 .await
1937 .is_err()
1938 );
1939
1940 let old_table_name = TableNameKey::new(
1941 &table_info.catalog_name,
1942 &table_info.schema_name,
1943 &table_info.name,
1944 );
1945 let new_table_name = TableNameKey::new(
1946 &table_info.catalog_name,
1947 &table_info.schema_name,
1948 &new_table_name,
1949 );
1950
1951 assert!(
1952 table_metadata_manager
1953 .table_name_manager()
1954 .get(old_table_name)
1955 .await
1956 .unwrap()
1957 .is_none()
1958 );
1959
1960 assert_eq!(
1961 table_metadata_manager
1962 .table_name_manager()
1963 .get(new_table_name)
1964 .await
1965 .unwrap()
1966 .unwrap()
1967 .table_id(),
1968 table_id
1969 );
1970 }
1971
1972 #[tokio::test]
1973 async fn test_update_table_info() {
1974 let mem_kv = Arc::new(MemoryKvBackend::default());
1975 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1976 let region_route = new_test_region_route();
1977 let region_routes = vec![region_route.clone()];
1978 let table_info: RawTableInfo = new_test_table_info().into();
1979 let table_id = table_info.ident.table_id;
1980 create_physical_table_metadata(
1982 &table_metadata_manager,
1983 table_info.clone(),
1984 region_routes.clone(),
1985 HashMap::new(),
1986 )
1987 .await
1988 .unwrap();
1989
1990 let mut new_table_info = table_info.clone();
1991 new_table_info.name = "hi".to_string();
1992 let current_table_info_value =
1993 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1994 table_metadata_manager
1996 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1997 .await
1998 .unwrap();
1999 table_metadata_manager
2001 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2002 .await
2003 .unwrap();
2004
2005 let updated_table_info = table_metadata_manager
2007 .table_info_manager()
2008 .get(table_id)
2009 .await
2010 .unwrap()
2011 .unwrap()
2012 .into_inner();
2013 assert_eq!(updated_table_info.table_info, new_table_info);
2014
2015 let mut wrong_table_info = table_info.clone();
2016 wrong_table_info.name = "wrong".to_string();
2017 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2018 current_table_info_value.update(wrong_table_info),
2019 );
2020 assert!(
2023 table_metadata_manager
2024 .update_table_info(&wrong_table_info_value, None, new_table_info)
2025 .await
2026 .is_err()
2027 )
2028 }
2029
2030 #[tokio::test]
2031 async fn test_update_table_leader_region_status() {
2032 let mem_kv = Arc::new(MemoryKvBackend::default());
2033 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2034 let datanode = 1;
2035 let region_routes = vec![
2036 RegionRoute {
2037 region: Region {
2038 id: 1.into(),
2039 name: "r1".to_string(),
2040 partition: None,
2041 attrs: BTreeMap::new(),
2042 partition_expr: Default::default(),
2043 },
2044 leader_peer: Some(Peer::new(datanode, "a2")),
2045 leader_state: Some(LeaderState::Downgrading),
2046 follower_peers: vec![],
2047 leader_down_since: Some(current_time_millis()),
2048 },
2049 RegionRoute {
2050 region: Region {
2051 id: 2.into(),
2052 name: "r2".to_string(),
2053 partition: None,
2054 attrs: BTreeMap::new(),
2055 partition_expr: Default::default(),
2056 },
2057 leader_peer: Some(Peer::new(datanode, "a1")),
2058 leader_state: None,
2059 follower_peers: vec![],
2060 leader_down_since: None,
2061 },
2062 ];
2063 let table_info: RawTableInfo = new_test_table_info().into();
2064 let table_id = table_info.ident.table_id;
2065 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2066 TableRouteValue::physical(region_routes.clone()),
2067 );
2068
2069 create_physical_table_metadata(
2071 &table_metadata_manager,
2072 table_info.clone(),
2073 region_routes.clone(),
2074 HashMap::new(),
2075 )
2076 .await
2077 .unwrap();
2078
2079 table_metadata_manager
2080 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2081 if region_route.leader_state.is_some() {
2082 None
2083 } else {
2084 Some(Some(LeaderState::Downgrading))
2085 }
2086 })
2087 .await
2088 .unwrap();
2089
2090 let updated_route_value = table_metadata_manager
2091 .table_route_manager()
2092 .table_route_storage()
2093 .get(table_id)
2094 .await
2095 .unwrap()
2096 .unwrap();
2097
2098 assert_eq!(
2099 updated_route_value.region_routes().unwrap()[0].leader_state,
2100 Some(LeaderState::Downgrading)
2101 );
2102
2103 assert!(
2104 updated_route_value.region_routes().unwrap()[0]
2105 .leader_down_since
2106 .is_some()
2107 );
2108
2109 assert_eq!(
2110 updated_route_value.region_routes().unwrap()[1].leader_state,
2111 Some(LeaderState::Downgrading)
2112 );
2113 assert!(
2114 updated_route_value.region_routes().unwrap()[1]
2115 .leader_down_since
2116 .is_some()
2117 );
2118 }
2119
2120 async fn assert_datanode_table(
2121 table_metadata_manager: &TableMetadataManager,
2122 table_id: u32,
2123 region_routes: &[RegionRoute],
2124 ) {
2125 let region_distribution = region_distribution(region_routes);
2126 for (datanode, regions) in region_distribution {
2127 let got = table_metadata_manager
2128 .datanode_table_manager()
2129 .get(&DatanodeTableKey::new(datanode, table_id))
2130 .await
2131 .unwrap()
2132 .unwrap();
2133
2134 assert_eq!(got.regions, regions.leader_regions);
2135 assert_eq!(got.follower_regions, regions.follower_regions);
2136 }
2137 }
2138
2139 #[tokio::test]
2140 async fn test_update_table_route() {
2141 let mem_kv = Arc::new(MemoryKvBackend::default());
2142 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2143 let region_route = new_test_region_route();
2144 let region_routes = vec![region_route.clone()];
2145 let table_info: RawTableInfo = new_test_table_info().into();
2146 let table_id = table_info.ident.table_id;
2147 let engine = table_info.meta.engine.as_str();
2148 let region_storage_path =
2149 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2150 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2151 TableRouteValue::physical(region_routes.clone()),
2152 );
2153
2154 create_physical_table_metadata(
2156 &table_metadata_manager,
2157 table_info.clone(),
2158 region_routes.clone(),
2159 HashMap::new(),
2160 )
2161 .await
2162 .unwrap();
2163
2164 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2165 let new_region_routes = vec![
2166 new_region_route(1, 1),
2167 new_region_route(2, 2),
2168 new_region_route(3, 3),
2169 ];
2170 table_metadata_manager
2172 .update_table_route(
2173 table_id,
2174 RegionInfo {
2175 engine: engine.to_string(),
2176 region_storage_path: region_storage_path.clone(),
2177 region_options: HashMap::new(),
2178 region_wal_options: HashMap::new(),
2179 },
2180 ¤t_table_route_value,
2181 new_region_routes.clone(),
2182 &HashMap::new(),
2183 &HashMap::new(),
2184 )
2185 .await
2186 .unwrap();
2187 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2188
2189 table_metadata_manager
2191 .update_table_route(
2192 table_id,
2193 RegionInfo {
2194 engine: engine.to_string(),
2195 region_storage_path: region_storage_path.clone(),
2196 region_options: HashMap::new(),
2197 region_wal_options: HashMap::new(),
2198 },
2199 ¤t_table_route_value,
2200 new_region_routes.clone(),
2201 &HashMap::new(),
2202 &HashMap::new(),
2203 )
2204 .await
2205 .unwrap();
2206
2207 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2208 current_table_route_value
2209 .inner
2210 .update(new_region_routes.clone())
2211 .unwrap(),
2212 );
2213 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2214 table_metadata_manager
2216 .update_table_route(
2217 table_id,
2218 RegionInfo {
2219 engine: engine.to_string(),
2220 region_storage_path: region_storage_path.clone(),
2221 region_options: HashMap::new(),
2222 region_wal_options: HashMap::new(),
2223 },
2224 ¤t_table_route_value,
2225 new_region_routes.clone(),
2226 &HashMap::new(),
2227 &HashMap::new(),
2228 )
2229 .await
2230 .unwrap();
2231 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2232
2233 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2236 current_table_route_value
2237 .update(vec![
2238 new_region_route(1, 1),
2239 new_region_route(2, 2),
2240 new_region_route(3, 3),
2241 new_region_route(4, 4),
2242 ])
2243 .unwrap(),
2244 );
2245 assert!(
2246 table_metadata_manager
2247 .update_table_route(
2248 table_id,
2249 RegionInfo {
2250 engine: engine.to_string(),
2251 region_storage_path: region_storage_path.clone(),
2252 region_options: HashMap::new(),
2253 region_wal_options: HashMap::new(),
2254 },
2255 &wrong_table_route_value,
2256 new_region_routes,
2257 &HashMap::new(),
2258 &HashMap::new(),
2259 )
2260 .await
2261 .is_err()
2262 );
2263 }
2264
2265 #[tokio::test]
2266 async fn test_destroy_table_metadata() {
2267 let mem_kv = Arc::new(MemoryKvBackend::default());
2268 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2269 let table_id = 1025;
2270 let table_name = "foo";
2271 let task = test_create_table_task(table_name, table_id);
2272 let options = create_mock_region_wal_options();
2273 let serialized_options = options
2274 .iter()
2275 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2276 .collect::<HashMap<_, _>>();
2277 table_metadata_manager
2278 .create_table_metadata(
2279 task.table_info,
2280 TableRouteValue::physical(vec![
2281 RegionRoute {
2282 region: Region::new_test(RegionId::new(table_id, 1)),
2283 leader_peer: Some(Peer::empty(1)),
2284 follower_peers: vec![Peer::empty(5)],
2285 leader_state: None,
2286 leader_down_since: None,
2287 },
2288 RegionRoute {
2289 region: Region::new_test(RegionId::new(table_id, 2)),
2290 leader_peer: Some(Peer::empty(2)),
2291 follower_peers: vec![Peer::empty(4)],
2292 leader_state: None,
2293 leader_down_since: None,
2294 },
2295 RegionRoute {
2296 region: Region::new_test(RegionId::new(table_id, 3)),
2297 leader_peer: Some(Peer::empty(3)),
2298 follower_peers: vec![],
2299 leader_state: None,
2300 leader_down_since: None,
2301 },
2302 ]),
2303 serialized_options,
2304 )
2305 .await
2306 .unwrap();
2307 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2308 let table_route_value = table_metadata_manager
2309 .table_route_manager
2310 .table_route_storage()
2311 .get_with_raw_bytes(table_id)
2312 .await
2313 .unwrap()
2314 .unwrap();
2315 table_metadata_manager
2316 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2317 .await
2318 .unwrap();
2319 assert!(mem_kv.is_empty());
2320 }
2321
2322 #[tokio::test]
2323 async fn test_restore_table_metadata() {
2324 let mem_kv = Arc::new(MemoryKvBackend::default());
2325 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2326 let table_id = 1025;
2327 let table_name = "foo";
2328 let task = test_create_table_task(table_name, table_id);
2329 let options = create_mock_region_wal_options();
2330 let serialized_options = options
2331 .iter()
2332 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2333 .collect::<HashMap<_, _>>();
2334 table_metadata_manager
2335 .create_table_metadata(
2336 task.table_info,
2337 TableRouteValue::physical(vec![
2338 RegionRoute {
2339 region: Region::new_test(RegionId::new(table_id, 1)),
2340 leader_peer: Some(Peer::empty(1)),
2341 follower_peers: vec![Peer::empty(5)],
2342 leader_state: None,
2343 leader_down_since: None,
2344 },
2345 RegionRoute {
2346 region: Region::new_test(RegionId::new(table_id, 2)),
2347 leader_peer: Some(Peer::empty(2)),
2348 follower_peers: vec![Peer::empty(4)],
2349 leader_state: None,
2350 leader_down_since: None,
2351 },
2352 RegionRoute {
2353 region: Region::new_test(RegionId::new(table_id, 3)),
2354 leader_peer: Some(Peer::empty(3)),
2355 follower_peers: vec![],
2356 leader_state: None,
2357 leader_down_since: None,
2358 },
2359 ]),
2360 serialized_options,
2361 )
2362 .await
2363 .unwrap();
2364 let expected_result = mem_kv.dump();
2365 let table_route_value = table_metadata_manager
2366 .table_route_manager
2367 .table_route_storage()
2368 .get_with_raw_bytes(table_id)
2369 .await
2370 .unwrap()
2371 .unwrap();
2372 let region_routes = table_route_value.region_routes().unwrap();
2373 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2374 let table_route_value = TableRouteValue::physical(region_routes.clone());
2375 table_metadata_manager
2376 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2377 .await
2378 .unwrap();
2379 table_metadata_manager
2380 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2381 .await
2382 .unwrap();
2383 let kvs = mem_kv.dump();
2384 assert_eq!(kvs, expected_result);
2385 table_metadata_manager
2387 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2388 .await
2389 .unwrap();
2390 let kvs = mem_kv.dump();
2391 assert_eq!(kvs, expected_result);
2392 }
2393
2394 #[tokio::test]
2395 async fn test_create_update_view_info() {
2396 let mem_kv = Arc::new(MemoryKvBackend::default());
2397 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2398
2399 let view_info: RawTableInfo = new_test_table_info().into();
2400
2401 let view_id = view_info.ident.table_id;
2402
2403 let logical_plan: Vec<u8> = vec![1, 2, 3];
2404 let columns = vec!["a".to_string()];
2405 let plan_columns = vec!["number".to_string()];
2406 let table_names = new_test_table_names();
2407 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2408
2409 table_metadata_manager
2411 .create_view_metadata(
2412 view_info.clone(),
2413 logical_plan.clone(),
2414 table_names.clone(),
2415 columns.clone(),
2416 plan_columns.clone(),
2417 definition.to_string(),
2418 )
2419 .await
2420 .unwrap();
2421
2422 {
2423 let current_view_info = table_metadata_manager
2425 .view_info_manager()
2426 .get(view_id)
2427 .await
2428 .unwrap()
2429 .unwrap()
2430 .into_inner();
2431 assert_eq!(current_view_info.view_info, logical_plan);
2432 assert_eq!(current_view_info.table_names, table_names);
2433 assert_eq!(current_view_info.definition, definition);
2434 assert_eq!(current_view_info.columns, columns);
2435 assert_eq!(current_view_info.plan_columns, plan_columns);
2436 let current_table_info = table_metadata_manager
2438 .table_info_manager()
2439 .get(view_id)
2440 .await
2441 .unwrap()
2442 .unwrap()
2443 .into_inner();
2444 assert_eq!(current_table_info.table_info, view_info);
2445 }
2446
2447 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2448 let new_table_names = {
2449 let mut set = HashSet::new();
2450 set.insert(TableName {
2451 catalog_name: "greptime".to_string(),
2452 schema_name: "public".to_string(),
2453 table_name: "b_table".to_string(),
2454 });
2455 set.insert(TableName {
2456 catalog_name: "greptime".to_string(),
2457 schema_name: "public".to_string(),
2458 table_name: "c_table".to_string(),
2459 });
2460 set
2461 };
2462 let new_columns = vec!["b".to_string()];
2463 let new_plan_columns = vec!["number2".to_string()];
2464 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2465
2466 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2467 logical_plan.clone(),
2468 table_names,
2469 columns,
2470 plan_columns,
2471 definition.to_string(),
2472 ));
2473 table_metadata_manager
2475 .update_view_info(
2476 view_id,
2477 ¤t_view_info_value,
2478 new_logical_plan.clone(),
2479 new_table_names.clone(),
2480 new_columns.clone(),
2481 new_plan_columns.clone(),
2482 new_definition.to_string(),
2483 )
2484 .await
2485 .unwrap();
2486 table_metadata_manager
2488 .update_view_info(
2489 view_id,
2490 ¤t_view_info_value,
2491 new_logical_plan.clone(),
2492 new_table_names.clone(),
2493 new_columns.clone(),
2494 new_plan_columns.clone(),
2495 new_definition.to_string(),
2496 )
2497 .await
2498 .unwrap();
2499
2500 let updated_view_info = table_metadata_manager
2502 .view_info_manager()
2503 .get(view_id)
2504 .await
2505 .unwrap()
2506 .unwrap()
2507 .into_inner();
2508 assert_eq!(updated_view_info.view_info, new_logical_plan);
2509 assert_eq!(updated_view_info.table_names, new_table_names);
2510 assert_eq!(updated_view_info.definition, new_definition);
2511 assert_eq!(updated_view_info.columns, new_columns);
2512 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2513
2514 let wrong_view_info = logical_plan.clone();
2515 let wrong_definition = "wrong_definition";
2516 let wrong_view_info_value =
2517 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2518 wrong_view_info,
2519 new_table_names.clone(),
2520 new_columns.clone(),
2521 new_plan_columns.clone(),
2522 wrong_definition.to_string(),
2523 ));
2524 assert!(
2527 table_metadata_manager
2528 .update_view_info(
2529 view_id,
2530 &wrong_view_info_value,
2531 new_logical_plan.clone(),
2532 new_table_names.clone(),
2533 vec!["c".to_string()],
2534 vec!["number3".to_string()],
2535 wrong_definition.to_string(),
2536 )
2537 .await
2538 .is_err()
2539 );
2540
2541 let current_view_info = table_metadata_manager
2543 .view_info_manager()
2544 .get(view_id)
2545 .await
2546 .unwrap()
2547 .unwrap()
2548 .into_inner();
2549 assert_eq!(current_view_info.view_info, new_logical_plan);
2550 assert_eq!(current_view_info.table_names, new_table_names);
2551 assert_eq!(current_view_info.definition, new_definition);
2552 assert_eq!(current_view_info.columns, new_columns);
2553 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2554 }
2555
/// `RegionRoleSet` must deserialize both from the object form with explicit
/// `leader_regions` / `follower_regions` keys and from a bare JSON array, in which case
/// the array becomes the leader regions and the follower list is empty.
#[test]
fn test_region_role_set_deserialize() {
    // Object form: both lists are given explicitly.
    {
        let object_json = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
        let parsed = serde_json::from_str::<RegionRoleSet>(object_json).unwrap();
        assert_eq!(parsed.leader_regions, vec![1, 2, 3]);
        assert_eq!(parsed.follower_regions, vec![4, 5, 6]);
    }

    // Bare array form: only leader regions are present.
    {
        let array_json = r#"[1, 2, 3]"#;
        let parsed = serde_json::from_str::<RegionRoleSet>(array_json).unwrap();
        assert_eq!(parsed.leader_regions, vec![1, 2, 3]);
        assert!(parsed.follower_regions.is_empty());
    }
}
2568
/// A `RegionDistribution` may mix both per-node encodings in one document: key `1` uses
/// the bare-array form, key `2` uses the object form with explicit leader and follower
/// lists.
#[test]
fn test_region_distribution_deserialize() {
    let json = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution = serde_json::from_str::<RegionDistribution>(json).unwrap();
    assert_eq!(distribution.len(), 2);

    // Entry given as a bare array: leaders only.
    let array_entry = &distribution[&1];
    assert_eq!(array_entry.leader_regions, vec![1, 2, 3]);
    assert!(array_entry.follower_regions.is_empty());

    // Entry given as an object: both lists populated.
    let object_entry = &distribution[&2];
    assert_eq!(object_entry.leader_regions, vec![7, 8, 9]);
    assert_eq!(object_entry.follower_regions, vec![10, 11, 12]);
}
2579}