1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{TableId, TableInfo};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex fragment for a topic name. It is compiled on its own as
/// `TOPIC_NAME_PATTERN_REGEX` and embedded into `TOPIC_REGION_PATTERN`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// NOTE(review): judging by the name, the maintenance switch key used before
/// the `__switches/` keys existed — confirm against its readers.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys: `{prefix}/<digits>/<digits>`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys: `{prefix}/<digits>`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys: `{prefix}/<digits>`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys: `{prefix}/<name>/<name>/<name>`.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys: `{prefix}/<name>`.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys: `{prefix}/<name>/<name>`.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys: `{prefix}/<digits>`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table repartition keys: `{prefix}/<digits>`.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node address keys: `{prefix}/<digits>/<digits>`.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic name keys: `{prefix}/<anything>`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// NOTE(review): presumably the prefix older versions stored created Kafka
/// topics under — confirm against the topic name manager.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys: `{prefix}/<topic name>/<digits>`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// NOTE(review): presumably the key the metasrv leader election is held on —
/// confirm against the election implementation.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// NOTE(review): presumably the root under which election candidates are
/// registered — confirm against the election implementation.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
194
/// The table name, catalog name, schema name, table route and node address
/// key prefixes, grouped into one array.
///
/// NOTE(review): presumably the prefixes of keys that are served from a
/// cache — confirm against the users of this constant.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// The regions of one node, split by role.
///
/// Serialization is derived; deserialization is hand-written so that a bare
/// list of region numbers is also accepted (read as leaders only).
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Regions in the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Regions in the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215 where
216 D: serde::Deserializer<'de>,
217 {
218 #[derive(Deserialize)]
219 #[serde(untagged)]
220 enum RegionRoleSetOrLeaderOnly {
221 Full {
222 leader_regions: Vec<RegionNumber>,
223 follower_regions: Vec<RegionNumber>,
224 },
225 LeaderOnly(Vec<RegionNumber>),
226 }
227 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228 RegionRoleSetOrLeaderOnly::Full {
229 leader_regions,
230 follower_regions,
231 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233 Ok(RegionRoleSet::new(leader_regions, vec![]))
234 }
235 }
236 }
237}
238
239impl RegionRoleSet {
240 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242 Self {
243 leader_regions,
244 follower_regions,
245 }
246 }
247
248 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250 self.leader_regions.push(region_number);
251 }
252
253 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255 self.follower_regions.push(region_number);
256 }
257
258 pub fn sort(&mut self) {
260 self.follower_regions.sort();
261 self.leader_regions.sort();
262 }
263}
264
/// Maps a datanode id to the regions on that datanode, split by role.
/// Being a `BTreeMap`, it iterates in ascending datanode id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Numeric identifier of a flow.
pub type FlowId = u32;
/// Numeric identifier of a flow partition.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280 static ref TABLE_INFO_KEY_PATTERN: Regex =
281 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285 static ref VIEW_INFO_KEY_PATTERN: Regex =
286 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315 ))
316 .unwrap();
317}
318
319lazy_static! {
320 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323 ))
324 .unwrap();
325}
326
327lazy_static! {
328 static ref NODE_ADDRESS_PATTERN: Regex =
329 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340 ))
341 .unwrap();
342}
343
/// Conversion between a metadata key type and its byte representation.
///
/// `'a` is the lifetime of the slice handed to [`MetadataKey::from_bytes`];
/// `T` is the type that parsing produces.
pub trait MetadataKey<'a, T> {
    /// Encodes the key into an owned byte buffer.
    fn to_bytes(&self) -> Vec<u8>;

    /// Parses `bytes` into a `T`, or returns an error.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// A thin wrapper around an owned byte buffer.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Takes ownership of `raw` and wraps it unchanged.
    fn from(raw: Vec<u8>) -> Self {
        BytesAdapter(raw)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361 fn to_bytes(&self) -> Vec<u8> {
362 self.0.clone()
363 }
364
365 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366 Ok(BytesAdapter(bytes.to_vec()))
367 }
368}
369
/// Implemented by keys that can be fetched as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a [`TxnOp`] together with a closure that, given the
    /// [`TxnOpGetResponseSet`] of an executed transaction, yields optional
    /// raw bytes.
    ///
    /// NOTE(review): presumably the op is a `get` for this key and the
    /// closure picks this key's value out of the responses — confirm against
    /// the implementors.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// Conversion between a metadata value type and its raw byte representation.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, or returns an error.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into an owned byte buffer, or returns an error.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Bundles the per-kind metadata managers together with the key-value
/// backend they are all constructed from.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Also used directly for transactions and batch deletes.
    kv_backend: KvBackendRef,
}
403
/// Ensures `$got == $expected_value`; otherwise fails with
/// `error::UnexpectedSnafu` whose message names both values and `$name`.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` unqualified,
/// so both must be in scope at the call site. `$got` and `$expected_value`
/// each appear twice in the expansion (comparison and message), so pass plain
/// variables or places rather than expressions with side effects.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A decoded value kept together with raw bytes.
///
/// When built with `from_inner_bytes`, `inner` is decoded from `bytes`.
/// Mutating `inner` through `DerefMut` does not touch `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    /// The raw bytes; returned by `get_raw_bytes` and written out by the
    /// `Serialize` impl.
    bytes: Bytes,
    /// The decoded value; exposed through `Deref`/`DerefMut`.
    inner: T,
}
434
/// Lets the wrapper be used wherever a `&T` is expected.
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Borrows the decoded value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
442
/// Gives mutable access to the decoded value; the stored bytes are left
/// as they are.
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Mutably borrows the decoded value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(
452 f,
453 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454 self.inner, self.bytes
455 )
456 }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464 where
465 S: serde::Serializer,
466 {
467 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470 }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474 for DeserializedValueWithBytes<T>
475{
476 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480 where
481 D: serde::Deserializer<'de>,
482 {
483 let buf = String::deserialize(deserializer)?;
484 let bytes = Bytes::from(buf);
485
486 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489 Ok(value)
490 }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494 fn clone(&self) -> Self {
495 Self {
496 bytes: self.bytes.clone(),
497 inner: self.inner.clone(),
498 }
499 }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506 let inner = T::try_from_raw_value(&bytes)?;
507 Ok(Self { bytes, inner })
508 }
509
510 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514 }
515
516 pub fn into_inner(self) -> T {
517 self.inner
518 }
519
520 pub fn get_inner_ref(&self) -> &T {
521 &self.inner
522 }
523
524 pub fn get_raw_bytes(&self) -> Vec<u8> {
526 self.bytes.to_vec()
527 }
528
529 #[cfg(any(test, feature = "testing"))]
530 pub fn from_inner(inner: T) -> Self {
531 let bytes = serde_json::to_vec(&inner).unwrap();
532
533 Self {
534 bytes: Bytes::from(bytes),
535 inner,
536 }
537 }
538}
539
540impl TableMetadataManager {
541 pub fn new(kv_backend: KvBackendRef) -> Self {
542 TableMetadataManager {
543 table_name_manager: TableNameManager::new(kv_backend.clone()),
544 table_info_manager: TableInfoManager::new(kv_backend.clone()),
545 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547 catalog_manager: CatalogManager::new(kv_backend.clone()),
548 schema_manager: SchemaManager::new(kv_backend.clone()),
549 table_route_manager: TableRouteManager::new(kv_backend.clone()),
550 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554 kv_backend,
555 }
556 }
557
558 pub fn new_with_custom_tombstone_prefix(
560 kv_backend: KvBackendRef,
561 tombstone_prefix: &str,
562 ) -> Self {
563 Self {
564 table_name_manager: TableNameManager::new(kv_backend.clone()),
565 table_info_manager: TableInfoManager::new(kv_backend.clone()),
566 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568 catalog_manager: CatalogManager::new(kv_backend.clone()),
569 schema_manager: SchemaManager::new(kv_backend.clone()),
570 table_route_manager: TableRouteManager::new(kv_backend.clone()),
571 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572 tombstone_manager: TombstoneManager::new_with_prefix(
573 kv_backend.clone(),
574 tombstone_prefix,
575 ),
576 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578 kv_backend,
579 }
580 }
581
582 pub async fn init(&self) -> Result<()> {
583 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585 self.catalog_manager().create(catalog_name, true).await?;
586
587 let internal_schemas = [
588 DEFAULT_SCHEMA_NAME,
589 INFORMATION_SCHEMA_NAME,
590 DEFAULT_PRIVATE_SCHEMA_NAME,
591 ];
592
593 for schema_name in internal_schemas {
594 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596 self.schema_manager().create(schema_key, None, true).await?;
597 }
598
599 Ok(())
600 }
601
    /// Returns the table name manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the table info manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the view info manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the datanode table manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the table route manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the table repartition manager.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the topic name manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the topic region manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the key-value backend this manager was built from.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646 pub async fn get_full_table_info(
647 &self,
648 table_id: TableId,
649 ) -> Result<(
650 Option<DeserializedValueWithBytes<TableInfoValue>>,
651 Option<DeserializedValueWithBytes<TableRouteValue>>,
652 )> {
653 let table_info_key = TableInfoKey::new(table_id);
654 let table_route_key = TableRouteKey::new(table_id);
655 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659 let mut res = self.kv_backend.txn(txn).await?;
660 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663 Ok((table_info_value, table_route_value))
664 }
665
666 pub async fn create_view_metadata(
676 &self,
677 view_info: TableInfo,
678 raw_logical_plan: Vec<u8>,
679 table_names: HashSet<TableName>,
680 columns: Vec<String>,
681 plan_columns: Vec<String>,
682 definition: String,
683 ) -> Result<()> {
684 let view_id = view_info.ident.table_id;
685
686 let view_name = TableNameKey::new(
688 &view_info.catalog_name,
689 &view_info.schema_name,
690 &view_info.name,
691 );
692 let create_table_name_txn = self
693 .table_name_manager()
694 .build_create_txn(&view_name, view_id)?;
695
696 let table_info_value = TableInfoValue::new(view_info);
698
699 let (create_table_info_txn, on_create_table_info_failure) = self
700 .table_info_manager()
701 .build_create_txn(view_id, &table_info_value)?;
702
703 let view_info_value = ViewInfoValue::new(
705 raw_logical_plan,
706 table_names,
707 columns,
708 plan_columns,
709 definition,
710 );
711 let (create_view_info_txn, on_create_view_info_failure) = self
712 .view_info_manager()
713 .build_create_txn(view_id, &view_info_value)?;
714
715 let txn = Txn::merge_all(vec![
716 create_table_name_txn,
717 create_table_info_txn,
718 create_view_info_txn,
719 ]);
720
721 let mut r = self.kv_backend.txn(txn).await?;
722
723 if !r.succeeded {
725 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726 let remote_table_info = on_create_table_info_failure(&mut set)?
727 .context(error::UnexpectedSnafu {
728 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729 })?
730 .into_inner();
731
732 let remote_view_info = on_create_view_info_failure(&mut set)?
733 .context(error::UnexpectedSnafu {
734 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735 })?
736 .into_inner();
737
738 let op_name = "the creating view metadata";
739 ensure_values!(remote_table_info, table_info_value, op_name);
740 ensure_values!(remote_view_info, view_info_value, op_name);
741 }
742
743 Ok(())
744 }
745
746 pub async fn create_table_metadata(
749 &self,
750 table_info: TableInfo,
751 table_route_value: TableRouteValue,
752 region_wal_options: HashMap<RegionNumber, String>,
753 ) -> Result<()> {
754 let table_id = table_info.ident.table_id;
755 let engine = table_info.meta.engine.clone();
756
757 let table_name = TableNameKey::new(
759 &table_info.catalog_name,
760 &table_info.schema_name,
761 &table_info.name,
762 );
763 let create_table_name_txn = self
764 .table_name_manager()
765 .build_create_txn(&table_name, table_id)?;
766
767 let region_options = table_info.to_region_options();
768 let table_info_value = TableInfoValue::new(table_info);
770 let (create_table_info_txn, on_create_table_info_failure) = self
771 .table_info_manager()
772 .build_create_txn(table_id, &table_info_value)?;
773
774 let (create_table_route_txn, on_create_table_route_failure) = self
775 .table_route_manager()
776 .table_route_storage()
777 .build_create_txn(table_id, &table_route_value)?;
778
779 let create_topic_region_txn = self
780 .topic_region_manager
781 .build_create_txn(table_id, ®ion_wal_options)?;
782
783 let mut txn = Txn::merge_all(vec![
784 create_table_name_txn,
785 create_table_info_txn,
786 create_table_route_txn,
787 create_topic_region_txn,
788 ]);
789
790 if let TableRouteValue::Physical(x) = &table_route_value {
791 let region_storage_path = table_info_value.region_storage_path();
792 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793 table_id,
794 &engine,
795 ®ion_storage_path,
796 region_options,
797 region_wal_options,
798 region_distribution(&x.region_routes),
799 )?;
800 txn = txn.merge(create_datanode_table_txn);
801 }
802
803 let mut r = self.kv_backend.txn(txn).await?;
804
805 if !r.succeeded {
807 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808 let remote_table_info = on_create_table_info_failure(&mut set)?
809 .context(error::UnexpectedSnafu {
810 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811 })?
812 .into_inner();
813
814 let remote_table_route = on_create_table_route_failure(&mut set)?
815 .context(error::UnexpectedSnafu {
816 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817 })?
818 .into_inner();
819
820 let op_name = "the creating table metadata";
821 ensure_values!(remote_table_info, table_info_value, op_name);
822 ensure_values!(remote_table_route, table_route_value, op_name);
823 }
824
825 Ok(())
826 }
827
828 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
829 self.kv_backend.max_txn_ops() / 3
832 }
833
834 pub async fn create_logical_tables_metadata(
836 &self,
837 tables_data: Vec<(TableInfo, TableRouteValue)>,
838 ) -> Result<()> {
839 let len = tables_data.len();
840 let mut txns = Vec::with_capacity(3 * len);
841 struct OnFailure<F1, R1, F2, R2>
842 where
843 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
844 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
845 {
846 table_info_value: TableInfoValue,
847 on_create_table_info_failure: F1,
848 table_route_value: TableRouteValue,
849 on_create_table_route_failure: F2,
850 }
851 let mut on_failures = Vec::with_capacity(len);
852 for (table_info, table_route_value) in tables_data {
853 let table_id = table_info.ident.table_id;
854
855 let table_name = TableNameKey::new(
857 &table_info.catalog_name,
858 &table_info.schema_name,
859 &table_info.name,
860 );
861 let create_table_name_txn = self
862 .table_name_manager()
863 .build_create_txn(&table_name, table_id)?;
864 txns.push(create_table_name_txn);
865
866 let table_info_value = TableInfoValue::new(table_info);
868 let (create_table_info_txn, on_create_table_info_failure) =
869 self.table_info_manager()
870 .build_create_txn(table_id, &table_info_value)?;
871 txns.push(create_table_info_txn);
872
873 let (create_table_route_txn, on_create_table_route_failure) = self
874 .table_route_manager()
875 .table_route_storage()
876 .build_create_txn(table_id, &table_route_value)?;
877 txns.push(create_table_route_txn);
878
879 on_failures.push(OnFailure {
880 table_info_value,
881 on_create_table_info_failure,
882 table_route_value,
883 on_create_table_route_failure,
884 });
885 }
886
887 let txn = Txn::merge_all(txns);
888 let mut r = self.kv_backend.txn(txn).await?;
889
890 if !r.succeeded {
892 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
893 for on_failure in on_failures {
894 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
895 .context(error::UnexpectedSnafu {
896 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
897 })?
898 .into_inner();
899
900 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
901 .context(error::UnexpectedSnafu {
902 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
903 })?
904 .into_inner();
905
906 let op_name = "the creating logical tables metadata";
907 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
908 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
909 }
910 }
911
912 Ok(())
913 }
914
915 fn table_metadata_keys(
916 &self,
917 table_id: TableId,
918 table_name: &TableName,
919 table_route_value: &TableRouteValue,
920 region_wal_options: &HashMap<RegionNumber, WalOptions>,
921 ) -> Result<Vec<Vec<u8>>> {
922 let datanode_ids = if table_route_value.is_physical() {
924 region_distribution(table_route_value.region_routes()?)
925 .into_keys()
926 .collect()
927 } else {
928 vec![]
929 };
930 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931 let table_name = TableNameKey::new(
932 &table_name.catalog_name,
933 &table_name.schema_name,
934 &table_name.table_name,
935 );
936 let table_info_key = TableInfoKey::new(table_id);
937 let table_route_key = TableRouteKey::new(table_id);
938 let table_repart_key = TableRepartKey::new(table_id);
939 let datanode_table_keys = datanode_ids
940 .into_iter()
941 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942 .collect::<HashSet<_>>();
943 let topic_region_map = self
944 .topic_region_manager
945 .get_topic_region_mapping(table_id, region_wal_options);
946 let topic_region_keys = topic_region_map
947 .iter()
948 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949 .collect::<Vec<_>>();
950 keys.push(table_name.to_bytes());
951 keys.push(table_info_key.to_bytes());
952 keys.push(table_route_key.to_bytes());
953 keys.push(table_repart_key.to_bytes());
954 for key in &datanode_table_keys {
955 keys.push(key.to_bytes());
956 }
957 for key in topic_region_keys {
958 keys.push(key.to_bytes());
959 }
960 Ok(keys)
961 }
962
963 pub async fn delete_table_metadata(
966 &self,
967 table_id: TableId,
968 table_name: &TableName,
969 table_route_value: &TableRouteValue,
970 region_wal_options: &HashMap<RegionNumber, WalOptions>,
971 ) -> Result<()> {
972 let keys =
973 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974 self.tombstone_manager.create(keys).await.map(|_| ())
975 }
976
977 pub async fn delete_table_metadata_tombstone(
980 &self,
981 table_id: TableId,
982 table_name: &TableName,
983 table_route_value: &TableRouteValue,
984 region_wal_options: &HashMap<RegionNumber, WalOptions>,
985 ) -> Result<()> {
986 let table_metadata_keys =
987 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988 self.tombstone_manager
989 .delete(table_metadata_keys)
990 .await
991 .map(|_| ())
992 }
993
994 pub async fn restore_table_metadata(
997 &self,
998 table_id: TableId,
999 table_name: &TableName,
1000 table_route_value: &TableRouteValue,
1001 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002 ) -> Result<()> {
1003 let keys =
1004 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005 self.tombstone_manager.restore(keys).await.map(|_| ())
1006 }
1007
1008 pub async fn destroy_table_metadata(
1011 &self,
1012 table_id: TableId,
1013 table_name: &TableName,
1014 table_route_value: &TableRouteValue,
1015 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016 ) -> Result<()> {
1017 let keys =
1018 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019 let _ = self
1020 .kv_backend
1021 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022 .await?;
1023 Ok(())
1024 }
1025
1026 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027 let mut keys = Vec::with_capacity(3);
1028 let view_name = TableNameKey::new(
1029 &view_name.catalog_name,
1030 &view_name.schema_name,
1031 &view_name.table_name,
1032 );
1033 let table_info_key = TableInfoKey::new(view_id);
1034 let view_info_key = ViewInfoKey::new(view_id);
1035 keys.push(view_name.to_bytes());
1036 keys.push(table_info_key.to_bytes());
1037 keys.push(view_info_key.to_bytes());
1038
1039 Ok(keys)
1040 }
1041
1042 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045 let keys = self.view_info_keys(view_id, view_name)?;
1046 let _ = self
1047 .kv_backend
1048 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049 .await?;
1050 Ok(())
1051 }
1052
1053 pub async fn rename_table(
1057 &self,
1058 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059 new_table_name: String,
1060 ) -> Result<()> {
1061 let current_table_info = ¤t_table_info_value.table_info;
1062 let table_id = current_table_info.ident.table_id;
1063
1064 let table_name_key = TableNameKey::new(
1065 ¤t_table_info.catalog_name,
1066 ¤t_table_info.schema_name,
1067 ¤t_table_info.name,
1068 );
1069
1070 let new_table_name_key = TableNameKey::new(
1071 ¤t_table_info.catalog_name,
1072 ¤t_table_info.schema_name,
1073 &new_table_name,
1074 );
1075
1076 let update_table_name_txn = self.table_name_manager().build_update_txn(
1078 &table_name_key,
1079 &new_table_name_key,
1080 table_id,
1081 )?;
1082
1083 let new_table_info_value = current_table_info_value
1084 .inner
1085 .with_update(move |table_info| {
1086 table_info.name = new_table_name;
1087 });
1088
1089 let (update_table_info_txn, on_update_table_info_failure) = self
1091 .table_info_manager()
1092 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096 let mut r = self.kv_backend.txn(txn).await?;
1097
1098 if !r.succeeded {
1100 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101 let remote_table_info = on_update_table_info_failure(&mut set)?
1102 .context(error::UnexpectedSnafu {
1103 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104 })?
1105 .into_inner();
1106
1107 let op_name = "the renaming table metadata";
1108 ensure_values!(remote_table_info, new_table_info_value, op_name);
1109 }
1110
1111 Ok(())
1112 }
1113
1114 pub async fn update_table_info(
1118 &self,
1119 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120 region_distribution: Option<RegionDistribution>,
1121 new_table_info: TableInfo,
1122 ) -> Result<()> {
1123 let table_id = current_table_info_value.table_info.ident.table_id;
1124 let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126 let (update_table_info_txn, on_update_table_info_failure) = self
1128 .table_info_manager()
1129 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131 let txn = if let Some(region_distribution) = region_distribution {
1132 let new_region_options = new_table_info_value.table_info.to_region_options();
1134 let update_datanode_table_options_txn = self
1135 .datanode_table_manager
1136 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137 .await?;
1138 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139 } else {
1140 update_table_info_txn
1141 };
1142
1143 let mut r = self.kv_backend.txn(txn).await?;
1144 if !r.succeeded {
1146 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147 let remote_table_info = on_update_table_info_failure(&mut set)?
1148 .context(error::UnexpectedSnafu {
1149 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150 })?
1151 .into_inner();
1152
1153 let op_name = "the updating table info";
1154 ensure_values!(remote_table_info, new_table_info_value, op_name);
1155 }
1156 Ok(())
1157 }
1158
1159 #[allow(clippy::too_many_arguments)]
1170 pub async fn update_view_info(
1171 &self,
1172 view_id: TableId,
1173 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174 new_view_info: Vec<u8>,
1175 table_names: HashSet<TableName>,
1176 columns: Vec<String>,
1177 plan_columns: Vec<String>,
1178 definition: String,
1179 ) -> Result<()> {
1180 let new_view_info_value = current_view_info_value.update(
1181 new_view_info,
1182 table_names,
1183 columns,
1184 plan_columns,
1185 definition,
1186 );
1187
1188 let (update_view_info_txn, on_update_view_info_failure) = self
1190 .view_info_manager()
1191 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195 if !r.succeeded {
1197 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198 let remote_view_info = on_update_view_info_failure(&mut set)?
1199 .context(error::UnexpectedSnafu {
1200 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201 })?
1202 .into_inner();
1203
1204 let op_name = "the updating view info";
1205 ensure_values!(remote_view_info, new_view_info_value, op_name);
1206 }
1207 Ok(())
1208 }
1209
/// Returns the backend's `max_txn_ops()` value.
///
/// NOTE(review): judging by the name, callers are expected to use this as
/// the maximum number of pairs passed to one
/// `batch_update_table_info_values` call — confirm against the callers.
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
    self.kv_backend.max_txn_ops()
}
1213
1214 pub async fn batch_update_table_info_values(
1215 &self,
1216 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1217 ) -> Result<()> {
1218 let len = table_info_value_pairs.len();
1219 let mut txns = Vec::with_capacity(len);
1220 struct OnFailure<F, R>
1221 where
1222 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223 {
1224 table_info_value: TableInfoValue,
1225 on_update_table_info_failure: F,
1226 }
1227 let mut on_failures = Vec::with_capacity(len);
1228
1229 for (table_info_value, new_table_info) in table_info_value_pairs {
1230 let table_id = table_info_value.table_info.ident.table_id;
1231
1232 let new_table_info_value = table_info_value.update(new_table_info);
1233
1234 let (update_table_info_txn, on_update_table_info_failure) =
1235 self.table_info_manager().build_update_txn(
1236 table_id,
1237 &table_info_value,
1238 &new_table_info_value,
1239 )?;
1240
1241 txns.push(update_table_info_txn);
1242
1243 on_failures.push(OnFailure {
1244 table_info_value: new_table_info_value,
1245 on_update_table_info_failure,
1246 });
1247 }
1248
1249 let txn = Txn::merge_all(txns);
1250 let mut r = self.kv_backend.txn(txn).await?;
1251
1252 if !r.succeeded {
1253 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254 for on_failure in on_failures {
1255 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256 .context(error::UnexpectedSnafu {
1257 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258 })?
1259 .into_inner();
1260
1261 let op_name = "the batch updating table info";
1262 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268
1269 pub async fn update_table_route(
1270 &self,
1271 table_id: TableId,
1272 region_info: RegionInfo,
1273 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274 new_region_routes: Vec<RegionRoute>,
1275 new_region_options: &HashMap<String, String>,
1276 new_region_wal_options: &HashMap<RegionNumber, String>,
1277 ) -> Result<()> {
1278 let current_region_distribution =
1280 region_distribution(current_table_route_value.region_routes()?);
1281 let new_region_distribution = region_distribution(&new_region_routes);
1282
1283 let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1284 table_id,
1285 ®ion_info.region_wal_options,
1286 new_region_wal_options,
1287 )?;
1288 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1289 table_id,
1290 region_info,
1291 current_region_distribution,
1292 new_region_distribution,
1293 new_region_options,
1294 new_region_wal_options,
1295 )?;
1296
1297 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299 let (update_table_route_txn, on_update_table_route_failure) = self
1300 .table_route_manager()
1301 .table_route_storage()
1302 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1303
1304 let txn = Txn::merge_all(vec![
1305 update_datanode_table_txn,
1306 update_table_route_txn,
1307 update_topic_region_txn,
1308 ]);
1309
1310 let mut r = self.kv_backend.txn(txn).await?;
1311
1312 if !r.succeeded {
1314 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1315 let remote_table_route = on_update_table_route_failure(&mut set)?
1316 .context(error::UnexpectedSnafu {
1317 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1318 })?
1319 .into_inner();
1320
1321 let op_name = "the updating table route";
1322 ensure_values!(remote_table_route, new_table_route_value, op_name);
1323 }
1324
1325 Ok(())
1326 }
1327
1328 pub async fn update_leader_region_status<F>(
1330 &self,
1331 table_id: TableId,
1332 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1333 next_region_route_status: F,
1334 ) -> Result<()>
1335 where
1336 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1337 {
1338 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1339
1340 let mut updated = 0;
1341 for route in &mut new_region_routes {
1342 if let Some(state) = next_region_route_status(route)
1343 && route.set_leader_state(state)
1344 {
1345 updated += 1;
1346 }
1347 }
1348
1349 if updated == 0 {
1350 warn!("No leader status updated");
1351 return Ok(());
1352 }
1353
1354 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1356
1357 let (update_table_route_txn, on_update_table_route_failure) = self
1358 .table_route_manager()
1359 .table_route_storage()
1360 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1361
1362 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1363
1364 if !r.succeeded {
1366 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1367 let remote_table_route = on_update_table_route_failure(&mut set)?
1368 .context(error::UnexpectedSnafu {
1369 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1370 })?
1371 .into_inner();
1372
1373 let op_name = "the updating leader region status";
1374 ensure_values!(remote_table_route, new_table_route_value, op_name);
1375 }
1376
1377 Ok(())
1378 }
1379}
1380
/// Implements `$crate::key::MetadataValue` for every listed type by
/// encoding and decoding the value as JSON with `serde_json`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context`
/// method by their bare names, so all of them must be in scope where the
/// macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $value_type {
                fn try_from_raw_value(bytes: &[u8]) -> Result<Self> {
                    serde_json::from_slice(bytes).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1397
/// Implements `$crate::key::MetadataKeyGetTxnOp` for every listed key type.
///
/// `build_get_op` returns a `TxnOp::Get` for the key's raw bytes together
/// with the filter produced by `TxnOpGetResponseSet::filter` for those same
/// bytes.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_type: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_type {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let key_bytes = self.to_bytes();
                    let get_op = TxnOp::Get(key_bytes.clone());
                    let filter = TxnOpGetResponseSet::filter(key_bytes);
                    (get_op, filter)
                }
            }
        )*
    }
}
1422
// Key types that can build a `TxnOp::Get` plus a matching response filter.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1430
/// Adds inherent JSON helpers to every listed type.
///
/// Unlike `impl_metadata_value!`, `try_from_raw_value` decodes into
/// `Option<Self>`, so the raw bytes are deserialized as an optional value.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context`
/// method by their bare names, so all of them must be in scope where the
/// macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_type: ty), *) => {
        $(
            impl $value_type {
                pub fn try_from_raw_value(bytes: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(bytes).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1447
// Value types stored as JSON through the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}
1463
// Value types that additionally get inherent helpers decoding into `Option<Self>`.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1468
1469#[cfg(test)]
1470mod tests {
1471 use std::collections::{BTreeMap, HashMap, HashSet};
1472 use std::sync::Arc;
1473
1474 use bytes::Bytes;
1475 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1476 use common_time::util::current_time_millis;
1477 use common_wal::options::{KafkaWalOptions, WalOptions};
1478 use futures::TryStreamExt;
1479 use store_api::storage::{RegionId, RegionNumber};
1480 use table::metadata::TableInfo;
1481 use table::table_name::TableName;
1482
1483 use super::datanode_table::DatanodeTableKey;
1484 use super::test_utils;
1485 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1486 use crate::ddl::test_util::create_table::test_create_table_task;
1487 use crate::ddl::utils::region_storage_path;
1488 use crate::error::Result;
1489 use crate::key::datanode_table::RegionInfo;
1490 use crate::key::table_info::TableInfoValue;
1491 use crate::key::table_name::TableNameKey;
1492 use crate::key::table_route::TableRouteValue;
1493 use crate::key::topic_region::TopicRegionKey;
1494 use crate::key::{
1495 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1496 TableMetadataManager, ViewInfoValue,
1497 };
1498 use crate::kv_backend::KvBackend;
1499 use crate::kv_backend::memory::MemoryKvBackend;
1500 use crate::peer::Peer;
1501 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1502 use crate::rpc::store::RangeRequest;
1503 use crate::wal_provider::WalProvider;
1504
/// A `DeserializedValueWithBytes` round-trips through serde based on its
/// `bytes` field: `inner` and `bytes` are deliberately built from different
/// values, and the decoded `inner` must match what `bytes` encodes.
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // The value encoded in `bytes` has two routes ...
    let value_in_bytes = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let raw = serde_json::to_vec(&value_in_bytes).unwrap();

    // ... while `inner` only has one.
    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(raw.clone()),
    };

    let encoded = serde_json::to_vec(&value).unwrap();
    let decoded: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&encoded).unwrap();

    assert_eq!(decoded.inner, value_in_bytes);
    assert_eq!(decoded.bytes, raw);
}
1532
1533 fn new_test_region_route() -> RegionRoute {
1534 new_region_route(1, 2)
1535 }
1536
1537 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1538 RegionRoute {
1539 region: Region {
1540 id: region_id.into(),
1541 name: "r1".to_string(),
1542 partition: None,
1543 attrs: BTreeMap::new(),
1544 partition_expr: Default::default(),
1545 },
1546 leader_peer: Some(Peer::new(datanode, "a2")),
1547 follower_peers: vec![],
1548 leader_state: None,
1549 leader_down_since: None,
1550 }
1551 }
1552
1553 fn new_test_table_info() -> TableInfo {
1554 test_utils::new_test_table_info(10)
1555 }
1556
1557 fn new_test_table_names() -> HashSet<TableName> {
1558 let mut set = HashSet::new();
1559 set.insert(TableName {
1560 catalog_name: "greptime".to_string(),
1561 schema_name: "public".to_string(),
1562 table_name: "a_table".to_string(),
1563 });
1564 set.insert(TableName {
1565 catalog_name: "greptime".to_string(),
1566 schema_name: "public".to_string(),
1567 table_name: "b_table".to_string(),
1568 });
1569 set
1570 }
1571
1572 async fn create_physical_table_metadata(
1573 table_metadata_manager: &TableMetadataManager,
1574 table_info: TableInfo,
1575 region_routes: Vec<RegionRoute>,
1576 region_wal_options: HashMap<RegionNumber, String>,
1577 ) -> Result<()> {
1578 table_metadata_manager
1579 .create_table_metadata(
1580 table_info,
1581 TableRouteValue::physical(region_routes),
1582 region_wal_options,
1583 )
1584 .await
1585 }
1586
1587 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1588 let topics = (0..2)
1589 .map(|i| format!("greptimedb_topic{}", i))
1590 .collect::<Vec<_>>();
1591 let wal_options = topics
1592 .iter()
1593 .map(|topic| {
1594 WalOptions::Kafka(KafkaWalOptions {
1595 topic: topic.clone(),
1596 })
1597 })
1598 .collect::<Vec<_>>();
1599
1600 (0..16)
1601 .enumerate()
1602 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1603 .collect()
1604 }
1605
1606 #[tokio::test]
1607 async fn test_raft_engine_topic_region_map() {
1608 let mem_kv = Arc::new(MemoryKvBackend::default());
1609 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1610 let region_route = new_test_region_route();
1611 let region_routes = &vec![region_route.clone()];
1612 let table_info = new_test_table_info();
1613 let wal_provider = WalProvider::RaftEngine;
1614 let regions: Vec<_> = (0..16).collect();
1615 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1616 create_physical_table_metadata(
1617 &table_metadata_manager,
1618 table_info.clone(),
1619 region_routes.clone(),
1620 region_wal_options.clone(),
1621 )
1622 .await
1623 .unwrap();
1624
1625 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1626 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1627 let resp = mem_kv.range(range_req).await.unwrap();
1628 assert!(resp.kvs.is_empty());
1630 }
1631
1632 #[tokio::test]
1633 async fn test_create_table_metadata() {
1634 let mem_kv = Arc::new(MemoryKvBackend::default());
1635 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1636 let region_route = new_test_region_route();
1637 let region_routes = &vec![region_route.clone()];
1638 let table_info = new_test_table_info();
1639 let region_wal_options = create_mock_region_wal_options()
1640 .into_iter()
1641 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1642 .collect::<HashMap<_, _>>();
1643
1644 create_physical_table_metadata(
1646 &table_metadata_manager,
1647 table_info.clone(),
1648 region_routes.clone(),
1649 region_wal_options.clone(),
1650 )
1651 .await
1652 .unwrap();
1653
1654 assert!(
1656 create_physical_table_metadata(
1657 &table_metadata_manager,
1658 table_info.clone(),
1659 region_routes.clone(),
1660 region_wal_options.clone(),
1661 )
1662 .await
1663 .is_ok()
1664 );
1665
1666 let mut modified_region_routes = region_routes.clone();
1667 modified_region_routes.push(region_route.clone());
1668 assert!(
1670 create_physical_table_metadata(
1671 &table_metadata_manager,
1672 table_info.clone(),
1673 modified_region_routes,
1674 region_wal_options.clone(),
1675 )
1676 .await
1677 .is_err()
1678 );
1679
1680 let (remote_table_info, remote_table_route) = table_metadata_manager
1681 .get_full_table_info(10)
1682 .await
1683 .unwrap();
1684
1685 assert_eq!(
1686 remote_table_info.unwrap().into_inner().table_info,
1687 table_info
1688 );
1689 assert_eq!(
1690 remote_table_route
1691 .unwrap()
1692 .into_inner()
1693 .region_routes()
1694 .unwrap(),
1695 region_routes
1696 );
1697
1698 for i in 0..2 {
1699 let region_number = i as u32;
1700 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1701 let topic = format!("greptimedb_topic{}", i);
1702 let regions = table_metadata_manager
1703 .topic_region_manager
1704 .regions(&topic)
1705 .await
1706 .unwrap()
1707 .into_keys()
1708 .collect::<Vec<_>>();
1709 assert_eq!(regions.len(), 8);
1710 assert!(regions.contains(®ion_id));
1711 }
1712 }
1713
1714 #[tokio::test]
1715 async fn test_create_logic_tables_metadata() {
1716 let mem_kv = Arc::new(MemoryKvBackend::default());
1717 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1718 let region_route = new_test_region_route();
1719 let region_routes = vec![region_route.clone()];
1720 let table_info = new_test_table_info();
1721 let table_id = table_info.ident.table_id;
1722 let table_route_value = TableRouteValue::physical(region_routes.clone());
1723
1724 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1725 table_metadata_manager
1727 .create_logical_tables_metadata(tables_data.clone())
1728 .await
1729 .unwrap();
1730
1731 assert!(
1733 table_metadata_manager
1734 .create_logical_tables_metadata(tables_data)
1735 .await
1736 .is_ok()
1737 );
1738
1739 let mut modified_region_routes = region_routes.clone();
1740 modified_region_routes.push(new_region_route(2, 3));
1741 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1742 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1743 assert!(
1745 table_metadata_manager
1746 .create_logical_tables_metadata(modified_tables_data)
1747 .await
1748 .is_err()
1749 );
1750
1751 let (remote_table_info, remote_table_route) = table_metadata_manager
1752 .get_full_table_info(table_id)
1753 .await
1754 .unwrap();
1755
1756 assert_eq!(
1757 remote_table_info.unwrap().into_inner().table_info,
1758 table_info
1759 );
1760 assert_eq!(
1761 remote_table_route
1762 .unwrap()
1763 .into_inner()
1764 .region_routes()
1765 .unwrap(),
1766 ®ion_routes
1767 );
1768 }
1769
1770 #[tokio::test]
1771 async fn test_create_many_logical_tables_metadata() {
1772 let kv_backend = Arc::new(MemoryKvBackend::default());
1773 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1774
1775 let mut tables_data = vec![];
1776 for i in 0..128 {
1777 let table_id = i + 1;
1778 let regin_number = table_id * 3;
1779 let region_id = RegionId::new(table_id, regin_number);
1780 let region_route = new_region_route(region_id.as_u64(), 2);
1781 let region_routes = vec![region_route.clone()];
1782 let table_info = test_utils::new_test_table_info_with_name(
1783 table_id,
1784 &format!("my_table_{}", table_id),
1785 );
1786 let table_route_value = TableRouteValue::physical(region_routes.clone());
1787
1788 tables_data.push((table_info, table_route_value));
1789 }
1790
1791 table_metadata_manager
1793 .create_logical_tables_metadata(tables_data)
1794 .await
1795 .unwrap();
1796 }
1797
1798 #[tokio::test]
1799 async fn test_delete_table_metadata() {
1800 let mem_kv = Arc::new(MemoryKvBackend::default());
1801 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1802 let region_route = new_test_region_route();
1803 let region_routes = &vec![region_route.clone()];
1804 let table_info = new_test_table_info();
1805 let table_id = table_info.ident.table_id;
1806 let datanode_id = 2;
1807 let region_wal_options = create_mock_region_wal_options();
1808 let serialized_region_wal_options = region_wal_options
1809 .iter()
1810 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1811 .collect::<HashMap<_, _>>();
1812
1813 create_physical_table_metadata(
1815 &table_metadata_manager,
1816 table_info.clone(),
1817 region_routes.clone(),
1818 serialized_region_wal_options,
1819 )
1820 .await
1821 .unwrap();
1822
1823 let table_name = TableName::new(
1824 table_info.catalog_name,
1825 table_info.schema_name,
1826 table_info.name,
1827 );
1828 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1829 table_metadata_manager
1831 .delete_table_metadata(
1832 table_id,
1833 &table_name,
1834 table_route_value,
1835 ®ion_wal_options,
1836 )
1837 .await
1838 .unwrap();
1839 table_metadata_manager
1841 .delete_table_metadata(
1842 table_id,
1843 &table_name,
1844 table_route_value,
1845 ®ion_wal_options,
1846 )
1847 .await
1848 .unwrap();
1849 assert!(
1850 table_metadata_manager
1851 .table_info_manager()
1852 .get(table_id)
1853 .await
1854 .unwrap()
1855 .is_none()
1856 );
1857 assert!(
1858 table_metadata_manager
1859 .table_route_manager()
1860 .table_route_storage()
1861 .get(table_id)
1862 .await
1863 .unwrap()
1864 .is_none()
1865 );
1866 assert!(
1867 table_metadata_manager
1868 .datanode_table_manager()
1869 .tables(datanode_id)
1870 .try_collect::<Vec<_>>()
1871 .await
1872 .unwrap()
1873 .is_empty()
1874 );
1875 let table_info = table_metadata_manager
1877 .table_info_manager()
1878 .get(table_id)
1879 .await
1880 .unwrap();
1881 assert!(table_info.is_none());
1882 let table_route = table_metadata_manager
1883 .table_route_manager()
1884 .table_route_storage()
1885 .get(table_id)
1886 .await
1887 .unwrap();
1888 assert!(table_route.is_none());
1889 let regions = table_metadata_manager
1891 .topic_region_manager
1892 .regions("greptimedb_topic0")
1893 .await
1894 .unwrap();
1895 assert_eq!(regions.len(), 0);
1896 let regions = table_metadata_manager
1897 .topic_region_manager
1898 .regions("greptimedb_topic1")
1899 .await
1900 .unwrap();
1901 assert_eq!(regions.len(), 0);
1902 }
1903
1904 #[tokio::test]
1905 async fn test_rename_table() {
1906 let mem_kv = Arc::new(MemoryKvBackend::default());
1907 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1908 let region_route = new_test_region_route();
1909 let region_routes = vec![region_route.clone()];
1910 let table_info = new_test_table_info();
1911 let table_id = table_info.ident.table_id;
1912 create_physical_table_metadata(
1914 &table_metadata_manager,
1915 table_info.clone(),
1916 region_routes.clone(),
1917 HashMap::new(),
1918 )
1919 .await
1920 .unwrap();
1921
1922 let new_table_name = "another_name".to_string();
1923 let table_info_value =
1924 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1925
1926 table_metadata_manager
1927 .rename_table(&table_info_value, new_table_name.clone())
1928 .await
1929 .unwrap();
1930 table_metadata_manager
1932 .rename_table(&table_info_value, new_table_name.clone())
1933 .await
1934 .unwrap();
1935 let mut modified_table_info = table_info.clone();
1936 modified_table_info.name = "hi".to_string();
1937 let modified_table_info_value =
1938 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1939 assert!(
1942 table_metadata_manager
1943 .rename_table(&modified_table_info_value, new_table_name.clone())
1944 .await
1945 .is_err()
1946 );
1947
1948 let old_table_name = TableNameKey::new(
1949 &table_info.catalog_name,
1950 &table_info.schema_name,
1951 &table_info.name,
1952 );
1953 let new_table_name = TableNameKey::new(
1954 &table_info.catalog_name,
1955 &table_info.schema_name,
1956 &new_table_name,
1957 );
1958
1959 assert!(
1960 table_metadata_manager
1961 .table_name_manager()
1962 .get(old_table_name)
1963 .await
1964 .unwrap()
1965 .is_none()
1966 );
1967
1968 assert_eq!(
1969 table_metadata_manager
1970 .table_name_manager()
1971 .get(new_table_name)
1972 .await
1973 .unwrap()
1974 .unwrap()
1975 .table_id(),
1976 table_id
1977 );
1978 }
1979
1980 #[tokio::test]
1981 async fn test_update_table_info() {
1982 let mem_kv = Arc::new(MemoryKvBackend::default());
1983 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1984 let region_route = new_test_region_route();
1985 let region_routes = vec![region_route.clone()];
1986 let table_info = new_test_table_info();
1987 let table_id = table_info.ident.table_id;
1988 create_physical_table_metadata(
1990 &table_metadata_manager,
1991 table_info.clone(),
1992 region_routes.clone(),
1993 HashMap::new(),
1994 )
1995 .await
1996 .unwrap();
1997
1998 let mut new_table_info = table_info.clone();
1999 new_table_info.name = "hi".to_string();
2000 let current_table_info_value =
2001 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2002 table_metadata_manager
2004 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2005 .await
2006 .unwrap();
2007 table_metadata_manager
2009 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2010 .await
2011 .unwrap();
2012
2013 let updated_table_info = table_metadata_manager
2015 .table_info_manager()
2016 .get(table_id)
2017 .await
2018 .unwrap()
2019 .unwrap()
2020 .into_inner();
2021 assert_eq!(updated_table_info.table_info, new_table_info);
2022
2023 let mut wrong_table_info = table_info.clone();
2024 wrong_table_info.name = "wrong".to_string();
2025 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2026 current_table_info_value.update(wrong_table_info),
2027 );
2028 assert!(
2031 table_metadata_manager
2032 .update_table_info(&wrong_table_info_value, None, new_table_info)
2033 .await
2034 .is_err()
2035 )
2036 }
2037
2038 #[tokio::test]
2039 async fn test_update_table_leader_region_status() {
2040 let mem_kv = Arc::new(MemoryKvBackend::default());
2041 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2042 let datanode = 1;
2043 let region_routes = vec![
2044 RegionRoute {
2045 region: Region {
2046 id: 1.into(),
2047 name: "r1".to_string(),
2048 partition: None,
2049 attrs: BTreeMap::new(),
2050 partition_expr: Default::default(),
2051 },
2052 leader_peer: Some(Peer::new(datanode, "a2")),
2053 leader_state: Some(LeaderState::Downgrading),
2054 follower_peers: vec![],
2055 leader_down_since: Some(current_time_millis()),
2056 },
2057 RegionRoute {
2058 region: Region {
2059 id: 2.into(),
2060 name: "r2".to_string(),
2061 partition: None,
2062 attrs: BTreeMap::new(),
2063 partition_expr: Default::default(),
2064 },
2065 leader_peer: Some(Peer::new(datanode, "a1")),
2066 leader_state: None,
2067 follower_peers: vec![],
2068 leader_down_since: None,
2069 },
2070 ];
2071 let table_info = new_test_table_info();
2072 let table_id = table_info.ident.table_id;
2073 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2074 TableRouteValue::physical(region_routes.clone()),
2075 );
2076
2077 create_physical_table_metadata(
2079 &table_metadata_manager,
2080 table_info.clone(),
2081 region_routes.clone(),
2082 HashMap::new(),
2083 )
2084 .await
2085 .unwrap();
2086
2087 table_metadata_manager
2088 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2089 if region_route.leader_state.is_some() {
2090 None
2091 } else {
2092 Some(Some(LeaderState::Downgrading))
2093 }
2094 })
2095 .await
2096 .unwrap();
2097
2098 let updated_route_value = table_metadata_manager
2099 .table_route_manager()
2100 .table_route_storage()
2101 .get(table_id)
2102 .await
2103 .unwrap()
2104 .unwrap();
2105
2106 assert_eq!(
2107 updated_route_value.region_routes().unwrap()[0].leader_state,
2108 Some(LeaderState::Downgrading)
2109 );
2110
2111 assert!(
2112 updated_route_value.region_routes().unwrap()[0]
2113 .leader_down_since
2114 .is_some()
2115 );
2116
2117 assert_eq!(
2118 updated_route_value.region_routes().unwrap()[1].leader_state,
2119 Some(LeaderState::Downgrading)
2120 );
2121 assert!(
2122 updated_route_value.region_routes().unwrap()[1]
2123 .leader_down_since
2124 .is_some()
2125 );
2126 }
2127
2128 async fn assert_datanode_table(
2129 table_metadata_manager: &TableMetadataManager,
2130 table_id: u32,
2131 region_routes: &[RegionRoute],
2132 ) {
2133 let region_distribution = region_distribution(region_routes);
2134 for (datanode, regions) in region_distribution {
2135 let got = table_metadata_manager
2136 .datanode_table_manager()
2137 .get(&DatanodeTableKey::new(datanode, table_id))
2138 .await
2139 .unwrap()
2140 .unwrap();
2141
2142 assert_eq!(got.regions, regions.leader_regions);
2143 assert_eq!(got.follower_regions, regions.follower_regions);
2144 }
2145 }
2146
2147 #[tokio::test]
2148 async fn test_update_table_route() {
2149 let mem_kv = Arc::new(MemoryKvBackend::default());
2150 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2151 let region_route = new_test_region_route();
2152 let region_routes = vec![region_route.clone()];
2153 let table_info = new_test_table_info();
2154 let table_id = table_info.ident.table_id;
2155 let engine = table_info.meta.engine.as_str();
2156 let region_storage_path =
2157 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2158 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2159 TableRouteValue::physical(region_routes.clone()),
2160 );
2161
2162 create_physical_table_metadata(
2164 &table_metadata_manager,
2165 table_info.clone(),
2166 region_routes.clone(),
2167 HashMap::new(),
2168 )
2169 .await
2170 .unwrap();
2171
2172 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2173 let new_region_routes = vec![
2174 new_region_route(1, 1),
2175 new_region_route(2, 2),
2176 new_region_route(3, 3),
2177 ];
2178 table_metadata_manager
2180 .update_table_route(
2181 table_id,
2182 RegionInfo {
2183 engine: engine.to_string(),
2184 region_storage_path: region_storage_path.clone(),
2185 region_options: HashMap::new(),
2186 region_wal_options: HashMap::new(),
2187 },
2188 ¤t_table_route_value,
2189 new_region_routes.clone(),
2190 &HashMap::new(),
2191 &HashMap::new(),
2192 )
2193 .await
2194 .unwrap();
2195 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2196
2197 table_metadata_manager
2199 .update_table_route(
2200 table_id,
2201 RegionInfo {
2202 engine: engine.to_string(),
2203 region_storage_path: region_storage_path.clone(),
2204 region_options: HashMap::new(),
2205 region_wal_options: HashMap::new(),
2206 },
2207 ¤t_table_route_value,
2208 new_region_routes.clone(),
2209 &HashMap::new(),
2210 &HashMap::new(),
2211 )
2212 .await
2213 .unwrap();
2214
2215 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2216 current_table_route_value
2217 .inner
2218 .update(new_region_routes.clone())
2219 .unwrap(),
2220 );
2221 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2222 table_metadata_manager
2224 .update_table_route(
2225 table_id,
2226 RegionInfo {
2227 engine: engine.to_string(),
2228 region_storage_path: region_storage_path.clone(),
2229 region_options: HashMap::new(),
2230 region_wal_options: HashMap::new(),
2231 },
2232 ¤t_table_route_value,
2233 new_region_routes.clone(),
2234 &HashMap::new(),
2235 &HashMap::new(),
2236 )
2237 .await
2238 .unwrap();
2239 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2240
2241 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2244 current_table_route_value
2245 .update(vec![
2246 new_region_route(1, 1),
2247 new_region_route(2, 2),
2248 new_region_route(3, 3),
2249 new_region_route(4, 4),
2250 ])
2251 .unwrap(),
2252 );
2253 assert!(
2254 table_metadata_manager
2255 .update_table_route(
2256 table_id,
2257 RegionInfo {
2258 engine: engine.to_string(),
2259 region_storage_path: region_storage_path.clone(),
2260 region_options: HashMap::new(),
2261 region_wal_options: HashMap::new(),
2262 },
2263 &wrong_table_route_value,
2264 new_region_routes,
2265 &HashMap::new(),
2266 &HashMap::new(),
2267 )
2268 .await
2269 .is_err()
2270 );
2271 }
2272
2273 #[tokio::test]
2274 async fn test_update_table_route_with_topic_region_mapping() {
2275 let mem_kv = Arc::new(MemoryKvBackend::default());
2276 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2277 let region_route = new_test_region_route();
2278 let region_routes = vec![region_route.clone()];
2279 let table_info = new_test_table_info();
2280 let table_id = table_info.ident.table_id;
2281 let engine = table_info.meta.engine.as_str();
2282 let region_storage_path =
2283 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2284
2285 let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2287 (
2288 1,
2289 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2290 topic: "topic_1".to_string(),
2291 }))
2292 .unwrap(),
2293 ),
2294 (
2295 2,
2296 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2297 topic: "topic_2".to_string(),
2298 }))
2299 .unwrap(),
2300 ),
2301 ]
2302 .into_iter()
2303 .collect();
2304
2305 create_physical_table_metadata(
2306 &table_metadata_manager,
2307 table_info.clone(),
2308 region_routes.clone(),
2309 old_region_wal_options.clone(),
2310 )
2311 .await
2312 .unwrap();
2313
2314 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2315 TableRouteValue::physical(region_routes.clone()),
2316 );
2317
2318 let region_id_1 = RegionId::new(table_id, 1);
2320 let region_id_2 = RegionId::new(table_id, 2);
2321 let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2322 let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2323 assert!(
2324 table_metadata_manager
2325 .topic_region_manager
2326 .get(topic_1_key.clone())
2327 .await
2328 .unwrap()
2329 .is_some()
2330 );
2331 assert!(
2332 table_metadata_manager
2333 .topic_region_manager
2334 .get(topic_2_key.clone())
2335 .await
2336 .unwrap()
2337 .is_some()
2338 );
2339
2340 let new_region_routes = vec![
2342 new_region_route(1, 1),
2343 new_region_route(2, 2),
2344 new_region_route(3, 3), ];
2346 let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2347 (
2348 1,
2349 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2350 topic: "topic_1".to_string(), }))
2352 .unwrap(),
2353 ),
2354 (
2355 2,
2356 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2357 topic: "topic_2".to_string(), }))
2359 .unwrap(),
2360 ),
2361 (
2362 3,
2363 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2364 topic: "topic_3".to_string(), }))
2366 .unwrap(),
2367 ),
2368 ]
2369 .into_iter()
2370 .collect();
2371 let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2372 current_table_route_value
2373 .inner
2374 .update(new_region_routes.clone())
2375 .unwrap(),
2376 );
2377 table_metadata_manager
2378 .update_table_route(
2379 table_id,
2380 RegionInfo {
2381 engine: engine.to_string(),
2382 region_storage_path: region_storage_path.clone(),
2383 region_options: HashMap::new(),
2384 region_wal_options: old_region_wal_options.clone(),
2385 },
2386 ¤t_table_route_value,
2387 new_region_routes.clone(),
2388 &HashMap::new(),
2389 &new_region_wal_options,
2390 )
2391 .await
2392 .unwrap();
2393 let region_id_3 = RegionId::new(table_id, 3);
2395 let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2396 assert!(
2397 table_metadata_manager
2398 .topic_region_manager
2399 .get(topic_3_key)
2400 .await
2401 .unwrap()
2402 .is_some()
2403 );
2404 let newer_region_routes = vec![
2406 new_region_route(1, 1),
2407 ];
2410 let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2411 (
2412 1,
2413 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2414 topic: "topic_1".to_string(), }))
2416 .unwrap(),
2417 ),
2418 (
2419 3,
2420 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2421 topic: "topic_3_new".to_string(), }))
2423 .unwrap(),
2424 ),
2425 ]
2426 .into_iter()
2427 .collect();
2428 table_metadata_manager
2429 .update_table_route(
2430 table_id,
2431 RegionInfo {
2432 engine: engine.to_string(),
2433 region_storage_path: region_storage_path.clone(),
2434 region_options: HashMap::new(),
2435 region_wal_options: new_region_wal_options.clone(),
2436 },
2437 ¤t_table_route_value_updated,
2438 newer_region_routes.clone(),
2439 &HashMap::new(),
2440 &newer_region_wal_options,
2441 )
2442 .await
2443 .unwrap();
2444 let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2446 assert!(
2447 table_metadata_manager
2448 .topic_region_manager
2449 .get(topic_2_key_new)
2450 .await
2451 .unwrap()
2452 .is_none()
2453 );
2454 let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2456 assert!(
2457 table_metadata_manager
2458 .topic_region_manager
2459 .get(topic_3_key_old)
2460 .await
2461 .unwrap()
2462 .is_none()
2463 );
2464 let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2466 assert!(
2467 table_metadata_manager
2468 .topic_region_manager
2469 .get(topic_3_key_new)
2470 .await
2471 .unwrap()
2472 .is_some()
2473 );
2474 assert!(
2476 table_metadata_manager
2477 .topic_region_manager
2478 .get(topic_1_key)
2479 .await
2480 .unwrap()
2481 .is_some()
2482 );
2483 }
2484
2485 #[tokio::test]
2486 async fn test_destroy_table_metadata() {
2487 let mem_kv = Arc::new(MemoryKvBackend::default());
2488 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2489 let table_id = 1025;
2490 let table_name = "foo";
2491 let task = test_create_table_task(table_name, table_id);
2492 let options = create_mock_region_wal_options();
2493 let serialized_options = options
2494 .iter()
2495 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2496 .collect::<HashMap<_, _>>();
2497 table_metadata_manager
2498 .create_table_metadata(
2499 task.table_info,
2500 TableRouteValue::physical(vec![
2501 RegionRoute {
2502 region: Region::new_test(RegionId::new(table_id, 1)),
2503 leader_peer: Some(Peer::empty(1)),
2504 follower_peers: vec![Peer::empty(5)],
2505 leader_state: None,
2506 leader_down_since: None,
2507 },
2508 RegionRoute {
2509 region: Region::new_test(RegionId::new(table_id, 2)),
2510 leader_peer: Some(Peer::empty(2)),
2511 follower_peers: vec![Peer::empty(4)],
2512 leader_state: None,
2513 leader_down_since: None,
2514 },
2515 RegionRoute {
2516 region: Region::new_test(RegionId::new(table_id, 3)),
2517 leader_peer: Some(Peer::empty(3)),
2518 follower_peers: vec![],
2519 leader_state: None,
2520 leader_down_since: None,
2521 },
2522 ]),
2523 serialized_options,
2524 )
2525 .await
2526 .unwrap();
2527 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2528 let table_route_value = table_metadata_manager
2529 .table_route_manager
2530 .table_route_storage()
2531 .get_with_raw_bytes(table_id)
2532 .await
2533 .unwrap()
2534 .unwrap();
2535 table_metadata_manager
2536 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2537 .await
2538 .unwrap();
2539 assert!(mem_kv.is_empty());
2540 }
2541
2542 #[tokio::test]
2543 async fn test_restore_table_metadata() {
2544 let mem_kv = Arc::new(MemoryKvBackend::default());
2545 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2546 let table_id = 1025;
2547 let table_name = "foo";
2548 let task = test_create_table_task(table_name, table_id);
2549 let options = create_mock_region_wal_options();
2550 let serialized_options = options
2551 .iter()
2552 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2553 .collect::<HashMap<_, _>>();
2554 table_metadata_manager
2555 .create_table_metadata(
2556 task.table_info,
2557 TableRouteValue::physical(vec![
2558 RegionRoute {
2559 region: Region::new_test(RegionId::new(table_id, 1)),
2560 leader_peer: Some(Peer::empty(1)),
2561 follower_peers: vec![Peer::empty(5)],
2562 leader_state: None,
2563 leader_down_since: None,
2564 },
2565 RegionRoute {
2566 region: Region::new_test(RegionId::new(table_id, 2)),
2567 leader_peer: Some(Peer::empty(2)),
2568 follower_peers: vec![Peer::empty(4)],
2569 leader_state: None,
2570 leader_down_since: None,
2571 },
2572 RegionRoute {
2573 region: Region::new_test(RegionId::new(table_id, 3)),
2574 leader_peer: Some(Peer::empty(3)),
2575 follower_peers: vec![],
2576 leader_state: None,
2577 leader_down_since: None,
2578 },
2579 ]),
2580 serialized_options,
2581 )
2582 .await
2583 .unwrap();
2584 let expected_result = mem_kv.dump();
2585 let table_route_value = table_metadata_manager
2586 .table_route_manager
2587 .table_route_storage()
2588 .get_with_raw_bytes(table_id)
2589 .await
2590 .unwrap()
2591 .unwrap();
2592 let region_routes = table_route_value.region_routes().unwrap();
2593 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2594 let table_route_value = TableRouteValue::physical(region_routes.clone());
2595 table_metadata_manager
2596 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2597 .await
2598 .unwrap();
2599 table_metadata_manager
2600 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2601 .await
2602 .unwrap();
2603 let kvs = mem_kv.dump();
2604 assert_eq!(kvs, expected_result);
2605 table_metadata_manager
2607 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2608 .await
2609 .unwrap();
2610 let kvs = mem_kv.dump();
2611 assert_eq!(kvs, expected_result);
2612 }
2613
2614 #[tokio::test]
2615 async fn test_create_update_view_info() {
2616 let mem_kv = Arc::new(MemoryKvBackend::default());
2617 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2618
2619 let view_info = new_test_table_info();
2620
2621 let view_id = view_info.ident.table_id;
2622
2623 let logical_plan: Vec<u8> = vec![1, 2, 3];
2624 let columns = vec!["a".to_string()];
2625 let plan_columns = vec!["number".to_string()];
2626 let table_names = new_test_table_names();
2627 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2628
2629 table_metadata_manager
2631 .create_view_metadata(
2632 view_info.clone(),
2633 logical_plan.clone(),
2634 table_names.clone(),
2635 columns.clone(),
2636 plan_columns.clone(),
2637 definition.to_string(),
2638 )
2639 .await
2640 .unwrap();
2641
2642 {
2643 let current_view_info = table_metadata_manager
2645 .view_info_manager()
2646 .get(view_id)
2647 .await
2648 .unwrap()
2649 .unwrap()
2650 .into_inner();
2651 assert_eq!(current_view_info.view_info, logical_plan);
2652 assert_eq!(current_view_info.table_names, table_names);
2653 assert_eq!(current_view_info.definition, definition);
2654 assert_eq!(current_view_info.columns, columns);
2655 assert_eq!(current_view_info.plan_columns, plan_columns);
2656 let current_table_info = table_metadata_manager
2658 .table_info_manager()
2659 .get(view_id)
2660 .await
2661 .unwrap()
2662 .unwrap()
2663 .into_inner();
2664 assert_eq!(current_table_info.table_info, view_info);
2665 }
2666
2667 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2668 let new_table_names = {
2669 let mut set = HashSet::new();
2670 set.insert(TableName {
2671 catalog_name: "greptime".to_string(),
2672 schema_name: "public".to_string(),
2673 table_name: "b_table".to_string(),
2674 });
2675 set.insert(TableName {
2676 catalog_name: "greptime".to_string(),
2677 schema_name: "public".to_string(),
2678 table_name: "c_table".to_string(),
2679 });
2680 set
2681 };
2682 let new_columns = vec!["b".to_string()];
2683 let new_plan_columns = vec!["number2".to_string()];
2684 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2685
2686 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2687 logical_plan.clone(),
2688 table_names,
2689 columns,
2690 plan_columns,
2691 definition.to_string(),
2692 ));
2693 table_metadata_manager
2695 .update_view_info(
2696 view_id,
2697 ¤t_view_info_value,
2698 new_logical_plan.clone(),
2699 new_table_names.clone(),
2700 new_columns.clone(),
2701 new_plan_columns.clone(),
2702 new_definition.to_string(),
2703 )
2704 .await
2705 .unwrap();
2706 table_metadata_manager
2708 .update_view_info(
2709 view_id,
2710 ¤t_view_info_value,
2711 new_logical_plan.clone(),
2712 new_table_names.clone(),
2713 new_columns.clone(),
2714 new_plan_columns.clone(),
2715 new_definition.to_string(),
2716 )
2717 .await
2718 .unwrap();
2719
2720 let updated_view_info = table_metadata_manager
2722 .view_info_manager()
2723 .get(view_id)
2724 .await
2725 .unwrap()
2726 .unwrap()
2727 .into_inner();
2728 assert_eq!(updated_view_info.view_info, new_logical_plan);
2729 assert_eq!(updated_view_info.table_names, new_table_names);
2730 assert_eq!(updated_view_info.definition, new_definition);
2731 assert_eq!(updated_view_info.columns, new_columns);
2732 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2733
2734 let wrong_view_info = logical_plan.clone();
2735 let wrong_definition = "wrong_definition";
2736 let wrong_view_info_value =
2737 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2738 wrong_view_info,
2739 new_table_names.clone(),
2740 new_columns.clone(),
2741 new_plan_columns.clone(),
2742 wrong_definition.to_string(),
2743 ));
2744 assert!(
2747 table_metadata_manager
2748 .update_view_info(
2749 view_id,
2750 &wrong_view_info_value,
2751 new_logical_plan.clone(),
2752 new_table_names.clone(),
2753 vec!["c".to_string()],
2754 vec!["number3".to_string()],
2755 wrong_definition.to_string(),
2756 )
2757 .await
2758 .is_err()
2759 );
2760
2761 let current_view_info = table_metadata_manager
2763 .view_info_manager()
2764 .get(view_id)
2765 .await
2766 .unwrap()
2767 .unwrap()
2768 .into_inner();
2769 assert_eq!(current_view_info.view_info, new_logical_plan);
2770 assert_eq!(current_view_info.table_names, new_table_names);
2771 assert_eq!(current_view_info.definition, new_definition);
2772 assert_eq!(current_view_info.columns, new_columns);
2773 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2774 }
2775
/// `RegionRoleSet` must deserialize both from the object form with explicit
/// leader/follower lists and from a bare JSON array of region numbers.
#[test]
fn test_region_role_set_deserialize() {
    // Object form: both role lists are given explicitly.
    let object_form = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
    let from_object = serde_json::from_str::<RegionRoleSet>(object_form).unwrap();
    assert_eq!(from_object.leader_regions, vec![1, 2, 3]);
    assert_eq!(from_object.follower_regions, vec![4, 5, 6]);

    // Bare array form: the numbers end up as leaders, with no followers.
    let array_form = r#"[1, 2, 3]"#;
    let from_array = serde_json::from_str::<RegionRoleSet>(array_form).unwrap();
    assert_eq!(from_array.leader_regions, vec![1, 2, 3]);
    assert!(from_array.follower_regions.is_empty());
}
2788
/// A `RegionDistribution` may mix both per-node encodings in one document:
/// a bare array of region numbers and the object form with role lists.
#[test]
fn test_region_distribution_deserialize() {
    let payload = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution = serde_json::from_str::<RegionDistribution>(payload).unwrap();
    assert_eq!(distribution.len(), 2);

    // Entry "1" uses the bare array form: leaders only.
    let array_entry = &distribution[&1];
    assert_eq!(array_entry.leader_regions, vec![1, 2, 3]);
    assert!(array_entry.follower_regions.is_empty());

    // Entry "2" uses the object form with both role lists.
    let object_entry = &distribution[&2];
    assert_eq!(object_entry.leader_regions, vec![7, 8, 9]);
    assert_eq!(object_entry.follower_regions, vec![10, 11, 12]);
}
2799}