1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{RawTableInfo, TableId};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regular-expression fragment for a topic name: one character from
/// `[a-zA-Z0-9_:-]` followed by any number of characters from
/// `[a-zA-Z0-9_:\-\.@#]`. Compiled on its own as `TOPIC_NAME_PATTERN_REGEX`
/// and embedded in `TOPIC_REGION_PATTERN`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
// NOTE(review): presumably the older location of the maintenance switch, kept
// for compatibility with `MAINTENANCE_KEY` — confirm against its users.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance entry under the `__switches/` namespace.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure entry under the `__switches/` namespace.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery entry under the `__switches/` namespace.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode-table keys; `DATANODE_TABLE_KEY_PATTERN` expects
/// `{prefix}/{digits}/{digits}`.
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys; `TABLE_INFO_KEY_PATTERN` expects `{prefix}/{digits}`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys; `VIEW_INFO_KEY_PATTERN` expects `{prefix}/{digits}`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys; `TABLE_NAME_KEY_PATTERN` expects the prefix
/// followed by three `/`-separated `NAME_PATTERN` components.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys; `CATALOG_NAME_KEY_PATTERN` expects the prefix
/// followed by one `NAME_PATTERN` component.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys; `SCHEMA_NAME_KEY_PATTERN` expects the prefix
/// followed by two `/`-separated `NAME_PATTERN` components.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys; `TABLE_ROUTE_KEY_PATTERN` expects `{prefix}/{digits}`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table-repart keys; `TABLE_REPART_KEY_PATTERN` expects `{prefix}/{digits}`.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node-address keys; `NODE_ADDRESS_PATTERN` expects
/// `{prefix}/{digits}/{digits}`.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic-name keys; `KAFKA_TOPIC_KEY_PATTERN` captures
/// everything after `{prefix}/`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// NOTE(review): presumably the prefix used before `KAFKA_TOPIC_KEY_PREFIX`
// was introduced — confirm against `topic_name`.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys; `TOPIC_REGION_PATTERN` expects
/// `{prefix}/{topic name}/{digits}`.
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

// NOTE(review): the two keys below are not used in the visible part of this
// file; presumably consumed by the metasrv election code — confirm.
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
194
/// The five key prefixes (table name, catalog name, schema name, table route,
/// node address) grouped under one constant.
// NOTE(review): the consumer is outside this view; presumably these are the
// prefixes whose entries are cached — confirm.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// Region numbers split by role.
///
/// `Serialize` is derived; `Deserialize` is implemented by hand for this type
/// so that more than one input shape is accepted.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Regions held in the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Regions held in the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215 where
216 D: serde::Deserializer<'de>,
217 {
218 #[derive(Deserialize)]
219 #[serde(untagged)]
220 enum RegionRoleSetOrLeaderOnly {
221 Full {
222 leader_regions: Vec<RegionNumber>,
223 follower_regions: Vec<RegionNumber>,
224 },
225 LeaderOnly(Vec<RegionNumber>),
226 }
227 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228 RegionRoleSetOrLeaderOnly::Full {
229 leader_regions,
230 follower_regions,
231 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233 Ok(RegionRoleSet::new(leader_regions, vec![]))
234 }
235 }
236 }
237}
238
239impl RegionRoleSet {
240 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242 Self {
243 leader_regions,
244 follower_regions,
245 }
246 }
247
248 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250 self.leader_regions.push(region_number);
251 }
252
253 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255 self.follower_regions.push(region_number);
256 }
257
258 pub fn sort(&mut self) {
260 self.follower_regions.sort();
261 self.leader_regions.sort();
262 }
263}
264
/// Maps a datanode id to the regions associated with it, split by role.
/// Being a `BTreeMap`, iteration is ordered by datanode id.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280 static ref TABLE_INFO_KEY_PATTERN: Regex =
281 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285 static ref VIEW_INFO_KEY_PATTERN: Regex =
286 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315 ))
316 .unwrap();
317}
318
319lazy_static! {
320 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323 ))
324 .unwrap();
325}
326
327lazy_static! {
328 static ref NODE_ADDRESS_PATTERN: Regex =
329 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340 ))
341 .unwrap();
342}
343
/// A metadata key that can be converted to and from its byte representation.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` is the
/// lifetime of the input bytes, which lets an implementation return a key
/// that borrows from them.
pub trait MetadataKey<'a, T> {
    /// Encodes this key as bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from `bytes`, returning an error when the implementation
    /// rejects the input.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// A key made of raw bytes that are stored and returned without interpretation.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `value`, taking ownership of the buffer.
    fn from(value: Vec<u8>) -> Self {
        BytesAdapter(value)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361 fn to_bytes(&self) -> Vec<u8> {
362 self.0.clone()
363 }
364
365 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366 Ok(BytesAdapter(bytes.to_vec()))
367 }
368}
369
/// A key that can describe how to read itself inside a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a pair of:
    ///
    /// * the [`TxnOp`] to put into a transaction, and
    /// * a closure that, given the transaction's [`TxnOpGetResponseSet`],
    ///   yields the raw value bytes or `None`.
    // NOTE(review): presumably `None` means the key was absent from the
    // response set — confirm against `txn_helper`.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// A metadata value that can be converted to and from raw bytes.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, returning an error when decoding fails.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value as raw bytes, returning an error when encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
388
/// Bundles the per-kind metadata managers together with the key-value backend
/// they are constructed from (see `TableMetadataManager::new`), and provides
/// operations that touch several kinds of metadata in one transaction.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Backend used directly for the multi-key transactions and batch deletes.
    kv_backend: KvBackendRef,
}
403
/// Returns early with `error::UnexpectedSnafu` unless `$got == $expected_value`.
///
/// `$name` is interpolated into the error message to identify the operation.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` by those
/// exact paths, so both must be in scope at the call site. `$got` and
/// `$expected_value` each appear twice in the expansion (comparison and
/// message), so pass plain bindings rather than expressions with side effects.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A decoded value kept together with the raw bytes it was decoded from.
///
/// The value is reachable through `Deref`/`DerefMut`; the bytes are what the
/// `Serialize` impl for this type writes out.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    /// The raw bytes the value was built from.
    bytes: Bytes,
    /// The decoded value.
    inner: T,
}
434
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Borrows the decoded value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
442
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Mutably borrows the decoded value. Only `inner` is exposed: changes
    /// made through this reference are not reflected in the stored `bytes`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(
452 f,
453 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454 self.inner, self.bytes
455 )
456 }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464 where
465 S: serde::Serializer,
466 {
467 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470 }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474 for DeserializedValueWithBytes<T>
475{
476 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480 where
481 D: serde::Deserializer<'de>,
482 {
483 let buf = String::deserialize(deserializer)?;
484 let bytes = Bytes::from(buf);
485
486 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489 Ok(value)
490 }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494 fn clone(&self) -> Self {
495 Self {
496 bytes: self.bytes.clone(),
497 inner: self.inner.clone(),
498 }
499 }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506 let inner = T::try_from_raw_value(&bytes)?;
507 Ok(Self { bytes, inner })
508 }
509
510 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514 }
515
516 pub fn into_inner(self) -> T {
517 self.inner
518 }
519
520 pub fn get_inner_ref(&self) -> &T {
521 &self.inner
522 }
523
524 pub fn get_raw_bytes(&self) -> Vec<u8> {
526 self.bytes.to_vec()
527 }
528
529 #[cfg(any(test, feature = "testing"))]
530 pub fn from_inner(inner: T) -> Self {
531 let bytes = serde_json::to_vec(&inner).unwrap();
532
533 Self {
534 bytes: Bytes::from(bytes),
535 inner,
536 }
537 }
538}
539
540impl TableMetadataManager {
541 pub fn new(kv_backend: KvBackendRef) -> Self {
542 TableMetadataManager {
543 table_name_manager: TableNameManager::new(kv_backend.clone()),
544 table_info_manager: TableInfoManager::new(kv_backend.clone()),
545 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547 catalog_manager: CatalogManager::new(kv_backend.clone()),
548 schema_manager: SchemaManager::new(kv_backend.clone()),
549 table_route_manager: TableRouteManager::new(kv_backend.clone()),
550 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554 kv_backend,
555 }
556 }
557
558 pub fn new_with_custom_tombstone_prefix(
560 kv_backend: KvBackendRef,
561 tombstone_prefix: &str,
562 ) -> Self {
563 Self {
564 table_name_manager: TableNameManager::new(kv_backend.clone()),
565 table_info_manager: TableInfoManager::new(kv_backend.clone()),
566 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568 catalog_manager: CatalogManager::new(kv_backend.clone()),
569 schema_manager: SchemaManager::new(kv_backend.clone()),
570 table_route_manager: TableRouteManager::new(kv_backend.clone()),
571 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572 tombstone_manager: TombstoneManager::new_with_prefix(
573 kv_backend.clone(),
574 tombstone_prefix,
575 ),
576 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578 kv_backend,
579 }
580 }
581
582 pub async fn init(&self) -> Result<()> {
583 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585 self.catalog_manager().create(catalog_name, true).await?;
586
587 let internal_schemas = [
588 DEFAULT_SCHEMA_NAME,
589 INFORMATION_SCHEMA_NAME,
590 DEFAULT_PRIVATE_SCHEMA_NAME,
591 ];
592
593 for schema_name in internal_schemas {
594 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596 self.schema_manager().create(schema_key, None, true).await?;
597 }
598
599 Ok(())
600 }
601
    /// Returns the [`TableNameManager`] owned by this manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }
605
    /// Returns the [`TableInfoManager`] owned by this manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }
609
    /// Returns the [`ViewInfoManager`] owned by this manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }
613
    /// Returns the [`DatanodeTableManager`] owned by this manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }
617
    /// Returns the [`CatalogManager`] owned by this manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }
621
    /// Returns the [`SchemaManager`] owned by this manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }
625
    /// Returns the [`TableRouteManager`] owned by this manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }
629
    /// Returns the [`TableRepartManager`] owned by this manager.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }
633
    /// Returns the [`TopicNameManager`] owned by this manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }
637
    /// Returns the [`TopicRegionManager`] owned by this manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }
641
    /// Returns the key-value backend this manager was constructed with.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646 pub async fn get_full_table_info(
647 &self,
648 table_id: TableId,
649 ) -> Result<(
650 Option<DeserializedValueWithBytes<TableInfoValue>>,
651 Option<DeserializedValueWithBytes<TableRouteValue>>,
652 )> {
653 let table_info_key = TableInfoKey::new(table_id);
654 let table_route_key = TableRouteKey::new(table_id);
655 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659 let mut res = self.kv_backend.txn(txn).await?;
660 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663 Ok((table_info_value, table_route_value))
664 }
665
666 pub async fn create_view_metadata(
676 &self,
677 view_info: RawTableInfo,
678 raw_logical_plan: Vec<u8>,
679 table_names: HashSet<TableName>,
680 columns: Vec<String>,
681 plan_columns: Vec<String>,
682 definition: String,
683 ) -> Result<()> {
684 let view_id = view_info.ident.table_id;
685
686 let view_name = TableNameKey::new(
688 &view_info.catalog_name,
689 &view_info.schema_name,
690 &view_info.name,
691 );
692 let create_table_name_txn = self
693 .table_name_manager()
694 .build_create_txn(&view_name, view_id)?;
695
696 let table_info_value = TableInfoValue::new(view_info);
698
699 let (create_table_info_txn, on_create_table_info_failure) = self
700 .table_info_manager()
701 .build_create_txn(view_id, &table_info_value)?;
702
703 let view_info_value = ViewInfoValue::new(
705 raw_logical_plan,
706 table_names,
707 columns,
708 plan_columns,
709 definition,
710 );
711 let (create_view_info_txn, on_create_view_info_failure) = self
712 .view_info_manager()
713 .build_create_txn(view_id, &view_info_value)?;
714
715 let txn = Txn::merge_all(vec![
716 create_table_name_txn,
717 create_table_info_txn,
718 create_view_info_txn,
719 ]);
720
721 let mut r = self.kv_backend.txn(txn).await?;
722
723 if !r.succeeded {
725 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
726 let remote_table_info = on_create_table_info_failure(&mut set)?
727 .context(error::UnexpectedSnafu {
728 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
729 })?
730 .into_inner();
731
732 let remote_view_info = on_create_view_info_failure(&mut set)?
733 .context(error::UnexpectedSnafu {
734 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
735 })?
736 .into_inner();
737
738 let op_name = "the creating view metadata";
739 ensure_values!(remote_table_info, table_info_value, op_name);
740 ensure_values!(remote_view_info, view_info_value, op_name);
741 }
742
743 Ok(())
744 }
745
746 pub async fn create_table_metadata(
749 &self,
750 table_info: RawTableInfo,
751 table_route_value: TableRouteValue,
752 region_wal_options: HashMap<RegionNumber, String>,
753 ) -> Result<()> {
754 let table_id = table_info.ident.table_id;
755 let engine = table_info.meta.engine.clone();
756
757 let table_name = TableNameKey::new(
759 &table_info.catalog_name,
760 &table_info.schema_name,
761 &table_info.name,
762 );
763 let create_table_name_txn = self
764 .table_name_manager()
765 .build_create_txn(&table_name, table_id)?;
766
767 let region_options = table_info.to_region_options();
768 let table_info_value = TableInfoValue::new(table_info);
770 let (create_table_info_txn, on_create_table_info_failure) = self
771 .table_info_manager()
772 .build_create_txn(table_id, &table_info_value)?;
773
774 let (create_table_route_txn, on_create_table_route_failure) = self
775 .table_route_manager()
776 .table_route_storage()
777 .build_create_txn(table_id, &table_route_value)?;
778
779 let create_topic_region_txn = self
780 .topic_region_manager
781 .build_create_txn(table_id, ®ion_wal_options)?;
782
783 let mut txn = Txn::merge_all(vec![
784 create_table_name_txn,
785 create_table_info_txn,
786 create_table_route_txn,
787 create_topic_region_txn,
788 ]);
789
790 if let TableRouteValue::Physical(x) = &table_route_value {
791 let region_storage_path = table_info_value.region_storage_path();
792 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
793 table_id,
794 &engine,
795 ®ion_storage_path,
796 region_options,
797 region_wal_options,
798 region_distribution(&x.region_routes),
799 )?;
800 txn = txn.merge(create_datanode_table_txn);
801 }
802
803 let mut r = self.kv_backend.txn(txn).await?;
804
805 if !r.succeeded {
807 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
808 let remote_table_info = on_create_table_info_failure(&mut set)?
809 .context(error::UnexpectedSnafu {
810 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
811 })?
812 .into_inner();
813
814 let remote_table_route = on_create_table_route_failure(&mut set)?
815 .context(error::UnexpectedSnafu {
816 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
817 })?
818 .into_inner();
819
820 let op_name = "the creating table metadata";
821 ensure_values!(remote_table_info, table_info_value, op_name);
822 ensure_values!(remote_table_route, table_route_value, op_name);
823 }
824
825 Ok(())
826 }
827
828 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
829 self.kv_backend.max_txn_ops() / 3
832 }
833
    /// Creates the metadata of several tables in one transaction. For every
    /// `(table_info, table_route_value)` pair a table-name, a table-info and
    /// a table-route transaction are built and merged.
    ///
    /// If the transaction does not succeed, each table's remote table info
    /// and table route are compared with the values this call tried to write;
    /// the call still returns `Ok(())` when all of them are equal.
    ///
    /// # Errors
    /// * any error from building or executing the transaction;
    /// * `Unexpected` when, after an unsuccessful transaction, a remote value
    ///   is missing or differs from the value being written.
    pub async fn create_logical_tables_metadata(
        &self,
        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
    ) -> Result<()> {
        let len = tables_data.len();
        // Three transactions are pushed per table.
        let mut txns = Vec::with_capacity(3 * len);
        // Per-table state needed only if the merged transaction fails: the
        // values we tried to write and the handlers that decode the remote ones.
        struct OnFailure<F1, R1, F2, R2>
        where
            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
        {
            table_info_value: TableInfoValue,
            on_create_table_info_failure: F1,
            table_route_value: TableRouteValue,
            on_create_table_route_failure: F2,
        }
        let mut on_failures = Vec::with_capacity(len);
        for (table_info, table_route_value) in tables_data {
            let table_id = table_info.ident.table_id;

            let table_name = TableNameKey::new(
                &table_info.catalog_name,
                &table_info.schema_name,
                &table_info.name,
            );
            let create_table_name_txn = self
                .table_name_manager()
                .build_create_txn(&table_name, table_id)?;
            txns.push(create_table_name_txn);

            // `table_info` is moved here; the name key above borrowed from it.
            let table_info_value = TableInfoValue::new(table_info);
            let (create_table_info_txn, on_create_table_info_failure) =
                self.table_info_manager()
                    .build_create_txn(table_id, &table_info_value)?;
            txns.push(create_table_info_txn);

            let (create_table_route_txn, on_create_table_route_failure) = self
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &table_route_value)?;
            txns.push(create_table_route_txn);

            on_failures.push(OnFailure {
                table_info_value,
                on_create_table_info_failure,
                table_route_value,
                on_create_table_route_failure,
            });
        }

        let txn = Txn::merge_all(txns);
        let mut r = self.kv_backend.txn(txn).await?;

        if !r.succeeded {
            // Handlers run in the same order the transactions were pushed.
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            for on_failure in on_failures {
                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let op_name = "the creating logical tables metadata";
                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
            }
        }

        Ok(())
    }
914
915 fn table_metadata_keys(
916 &self,
917 table_id: TableId,
918 table_name: &TableName,
919 table_route_value: &TableRouteValue,
920 region_wal_options: &HashMap<RegionNumber, WalOptions>,
921 ) -> Result<Vec<Vec<u8>>> {
922 let datanode_ids = if table_route_value.is_physical() {
924 region_distribution(table_route_value.region_routes()?)
925 .into_keys()
926 .collect()
927 } else {
928 vec![]
929 };
930 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
931 let table_name = TableNameKey::new(
932 &table_name.catalog_name,
933 &table_name.schema_name,
934 &table_name.table_name,
935 );
936 let table_info_key = TableInfoKey::new(table_id);
937 let table_route_key = TableRouteKey::new(table_id);
938 let table_repart_key = TableRepartKey::new(table_id);
939 let datanode_table_keys = datanode_ids
940 .into_iter()
941 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
942 .collect::<HashSet<_>>();
943 let topic_region_map = self
944 .topic_region_manager
945 .get_topic_region_mapping(table_id, region_wal_options);
946 let topic_region_keys = topic_region_map
947 .iter()
948 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
949 .collect::<Vec<_>>();
950 keys.push(table_name.to_bytes());
951 keys.push(table_info_key.to_bytes());
952 keys.push(table_route_key.to_bytes());
953 keys.push(table_repart_key.to_bytes());
954 for key in &datanode_table_keys {
955 keys.push(key.to_bytes());
956 }
957 for key in topic_region_keys {
958 keys.push(key.to_bytes());
959 }
960 Ok(keys)
961 }
962
963 pub async fn delete_table_metadata(
966 &self,
967 table_id: TableId,
968 table_name: &TableName,
969 table_route_value: &TableRouteValue,
970 region_wal_options: &HashMap<RegionNumber, WalOptions>,
971 ) -> Result<()> {
972 let keys =
973 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974 self.tombstone_manager.create(keys).await.map(|_| ())
975 }
976
977 pub async fn delete_table_metadata_tombstone(
980 &self,
981 table_id: TableId,
982 table_name: &TableName,
983 table_route_value: &TableRouteValue,
984 region_wal_options: &HashMap<RegionNumber, WalOptions>,
985 ) -> Result<()> {
986 let table_metadata_keys =
987 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
988 self.tombstone_manager
989 .delete(table_metadata_keys)
990 .await
991 .map(|_| ())
992 }
993
994 pub async fn restore_table_metadata(
997 &self,
998 table_id: TableId,
999 table_name: &TableName,
1000 table_route_value: &TableRouteValue,
1001 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002 ) -> Result<()> {
1003 let keys =
1004 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005 self.tombstone_manager.restore(keys).await.map(|_| ())
1006 }
1007
1008 pub async fn destroy_table_metadata(
1011 &self,
1012 table_id: TableId,
1013 table_name: &TableName,
1014 table_route_value: &TableRouteValue,
1015 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1016 ) -> Result<()> {
1017 let keys =
1018 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1019 let _ = self
1020 .kv_backend
1021 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1022 .await?;
1023 Ok(())
1024 }
1025
1026 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1027 let mut keys = Vec::with_capacity(3);
1028 let view_name = TableNameKey::new(
1029 &view_name.catalog_name,
1030 &view_name.schema_name,
1031 &view_name.table_name,
1032 );
1033 let table_info_key = TableInfoKey::new(view_id);
1034 let view_info_key = ViewInfoKey::new(view_id);
1035 keys.push(view_name.to_bytes());
1036 keys.push(table_info_key.to_bytes());
1037 keys.push(view_info_key.to_bytes());
1038
1039 Ok(keys)
1040 }
1041
1042 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1045 let keys = self.view_info_keys(view_id, view_name)?;
1046 let _ = self
1047 .kv_backend
1048 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1049 .await?;
1050 Ok(())
1051 }
1052
1053 pub async fn rename_table(
1057 &self,
1058 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1059 new_table_name: String,
1060 ) -> Result<()> {
1061 let current_table_info = ¤t_table_info_value.table_info;
1062 let table_id = current_table_info.ident.table_id;
1063
1064 let table_name_key = TableNameKey::new(
1065 ¤t_table_info.catalog_name,
1066 ¤t_table_info.schema_name,
1067 ¤t_table_info.name,
1068 );
1069
1070 let new_table_name_key = TableNameKey::new(
1071 ¤t_table_info.catalog_name,
1072 ¤t_table_info.schema_name,
1073 &new_table_name,
1074 );
1075
1076 let update_table_name_txn = self.table_name_manager().build_update_txn(
1078 &table_name_key,
1079 &new_table_name_key,
1080 table_id,
1081 )?;
1082
1083 let new_table_info_value = current_table_info_value
1084 .inner
1085 .with_update(move |table_info| {
1086 table_info.name = new_table_name;
1087 });
1088
1089 let (update_table_info_txn, on_update_table_info_failure) = self
1091 .table_info_manager()
1092 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1093
1094 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1095
1096 let mut r = self.kv_backend.txn(txn).await?;
1097
1098 if !r.succeeded {
1100 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1101 let remote_table_info = on_update_table_info_failure(&mut set)?
1102 .context(error::UnexpectedSnafu {
1103 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1104 })?
1105 .into_inner();
1106
1107 let op_name = "the renaming table metadata";
1108 ensure_values!(remote_table_info, new_table_info_value, op_name);
1109 }
1110
1111 Ok(())
1112 }
1113
1114 pub async fn update_table_info(
1118 &self,
1119 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1120 region_distribution: Option<RegionDistribution>,
1121 new_table_info: RawTableInfo,
1122 ) -> Result<()> {
1123 let table_id = current_table_info_value.table_info.ident.table_id;
1124 let new_table_info_value = current_table_info_value.update(new_table_info);
1125
1126 let (update_table_info_txn, on_update_table_info_failure) = self
1128 .table_info_manager()
1129 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1130
1131 let txn = if let Some(region_distribution) = region_distribution {
1132 let new_region_options = new_table_info_value.table_info.to_region_options();
1134 let update_datanode_table_options_txn = self
1135 .datanode_table_manager
1136 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1137 .await?;
1138 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1139 } else {
1140 update_table_info_txn
1141 };
1142
1143 let mut r = self.kv_backend.txn(txn).await?;
1144 if !r.succeeded {
1146 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1147 let remote_table_info = on_update_table_info_failure(&mut set)?
1148 .context(error::UnexpectedSnafu {
1149 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1150 })?
1151 .into_inner();
1152
1153 let op_name = "the updating table info";
1154 ensure_values!(remote_table_info, new_table_info_value, op_name);
1155 }
1156 Ok(())
1157 }
1158
1159 #[allow(clippy::too_many_arguments)]
1170 pub async fn update_view_info(
1171 &self,
1172 view_id: TableId,
1173 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1174 new_view_info: Vec<u8>,
1175 table_names: HashSet<TableName>,
1176 columns: Vec<String>,
1177 plan_columns: Vec<String>,
1178 definition: String,
1179 ) -> Result<()> {
1180 let new_view_info_value = current_view_info_value.update(
1181 new_view_info,
1182 table_names,
1183 columns,
1184 plan_columns,
1185 definition,
1186 );
1187
1188 let (update_view_info_txn, on_update_view_info_failure) = self
1190 .view_info_manager()
1191 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1192
1193 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1194
1195 if !r.succeeded {
1197 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1198 let remote_view_info = on_update_view_info_failure(&mut set)?
1199 .context(error::UnexpectedSnafu {
1200 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1201 })?
1202 .into_inner();
1203
1204 let op_name = "the updating view info";
1205 ensure_values!(remote_view_info, new_view_info_value, op_name);
1206 }
1207 Ok(())
1208 }
1209
/// Returns the chunk size to use for batched table info updates.
///
/// This is exactly the backend's `max_txn_ops()`. Presumably callers split their input to
/// `batch_update_table_info_values` into chunks of at most this many entries; confirm at
/// the call sites.
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
    self.kv_backend.max_txn_ops()
}
1213
1214 pub async fn batch_update_table_info_values(
1215 &self,
1216 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1217 ) -> Result<()> {
1218 let len = table_info_value_pairs.len();
1219 let mut txns = Vec::with_capacity(len);
1220 struct OnFailure<F, R>
1221 where
1222 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1223 {
1224 table_info_value: TableInfoValue,
1225 on_update_table_info_failure: F,
1226 }
1227 let mut on_failures = Vec::with_capacity(len);
1228
1229 for (table_info_value, new_table_info) in table_info_value_pairs {
1230 let table_id = table_info_value.table_info.ident.table_id;
1231
1232 let new_table_info_value = table_info_value.update(new_table_info);
1233
1234 let (update_table_info_txn, on_update_table_info_failure) =
1235 self.table_info_manager().build_update_txn(
1236 table_id,
1237 &table_info_value,
1238 &new_table_info_value,
1239 )?;
1240
1241 txns.push(update_table_info_txn);
1242
1243 on_failures.push(OnFailure {
1244 table_info_value: new_table_info_value,
1245 on_update_table_info_failure,
1246 });
1247 }
1248
1249 let txn = Txn::merge_all(txns);
1250 let mut r = self.kv_backend.txn(txn).await?;
1251
1252 if !r.succeeded {
1253 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1254 for on_failure in on_failures {
1255 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1256 .context(error::UnexpectedSnafu {
1257 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1258 })?
1259 .into_inner();
1260
1261 let op_name = "the batch updating table info";
1262 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268
1269 pub async fn update_table_route(
1270 &self,
1271 table_id: TableId,
1272 region_info: RegionInfo,
1273 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1274 new_region_routes: Vec<RegionRoute>,
1275 new_region_options: &HashMap<String, String>,
1276 new_region_wal_options: &HashMap<RegionNumber, String>,
1277 ) -> Result<()> {
1278 let current_region_distribution =
1280 region_distribution(current_table_route_value.region_routes()?);
1281 let new_region_distribution = region_distribution(&new_region_routes);
1282
1283 let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1284 table_id,
1285 ®ion_info.region_wal_options,
1286 new_region_wal_options,
1287 )?;
1288 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1289 table_id,
1290 region_info,
1291 current_region_distribution,
1292 new_region_distribution,
1293 new_region_options,
1294 new_region_wal_options,
1295 )?;
1296
1297 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299 let (update_table_route_txn, on_update_table_route_failure) = self
1300 .table_route_manager()
1301 .table_route_storage()
1302 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1303
1304 let txn = Txn::merge_all(vec![
1305 update_datanode_table_txn,
1306 update_table_route_txn,
1307 update_topic_region_txn,
1308 ]);
1309
1310 let mut r = self.kv_backend.txn(txn).await?;
1311
1312 if !r.succeeded {
1314 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1315 let remote_table_route = on_update_table_route_failure(&mut set)?
1316 .context(error::UnexpectedSnafu {
1317 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1318 })?
1319 .into_inner();
1320
1321 let op_name = "the updating table route";
1322 ensure_values!(remote_table_route, new_table_route_value, op_name);
1323 }
1324
1325 Ok(())
1326 }
1327
1328 pub async fn update_leader_region_status<F>(
1330 &self,
1331 table_id: TableId,
1332 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1333 next_region_route_status: F,
1334 ) -> Result<()>
1335 where
1336 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1337 {
1338 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1339
1340 let mut updated = 0;
1341 for route in &mut new_region_routes {
1342 if let Some(state) = next_region_route_status(route)
1343 && route.set_leader_state(state)
1344 {
1345 updated += 1;
1346 }
1347 }
1348
1349 if updated == 0 {
1350 warn!("No leader status updated");
1351 return Ok(());
1352 }
1353
1354 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1356
1357 let (update_table_route_txn, on_update_table_route_failure) = self
1358 .table_route_manager()
1359 .table_route_storage()
1360 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1361
1362 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1363
1364 if !r.succeeded {
1366 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1367 let remote_table_route = on_update_table_route_failure(&mut set)?
1368 .context(error::UnexpectedSnafu {
1369 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1370 })?
1371 .into_inner();
1372
1373 let op_name = "the updating leader region status";
1374 ensure_values!(remote_table_route, new_table_route_value, op_name);
1375 }
1376
1377 Ok(())
1378 }
1379}
1380
/// Implements `$crate::key::MetadataValue` for every listed type using JSON.
///
/// `try_from_raw_value` decodes with `serde_json::from_slice` and `try_as_raw_value`
/// encodes with `serde_json::to_vec`; serde errors are attached to `SerdeJsonSnafu`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the `context`
/// extension method by unqualified name, so they must be in scope where the macro is
/// invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its raw JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1397
/// Implements `$crate::key::MetadataKeyGetTxnOp` for every listed key type.
///
/// The generated `build_get_op` serializes the key with `to_bytes()` and returns a
/// `TxnOp::Get` for those bytes together with the closure produced by
/// `TxnOpGetResponseSet::filter` for the same bytes (presumably the closure picks this
/// key's value out of a transaction response set; confirm against `filter`).
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    (
                        // The raw key is needed twice: once for the op, once for the filter.
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1422
// Key types that get a generated `MetadataKeyGetTxnOp::build_get_op` implementation.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1430
/// Adds inherent JSON (de)serialization helpers to every listed type.
///
/// Unlike `impl_metadata_value!`, this generates inherent methods rather than a trait
/// implementation, and `try_from_raw_value` decodes into `Option<Self>` instead of `Self`.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the `context`
/// extension method by unqualified name, so they must be in scope where the macro is
/// invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its raw JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value into raw JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1447
// Value types stored as JSON through the `MetadataValue` trait.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}

// Value types that additionally get inherent helpers decoding into `Option<Self>`.
// NOTE: `SchemaNameValue` is listed in both invocations, so it has both the trait
// implementation and the inherent methods of the same names.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1468
1469#[cfg(test)]
1470mod tests {
1471 use std::collections::{BTreeMap, HashMap, HashSet};
1472 use std::sync::Arc;
1473
1474 use bytes::Bytes;
1475 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1476 use common_time::util::current_time_millis;
1477 use common_wal::options::{KafkaWalOptions, WalOptions};
1478 use futures::TryStreamExt;
1479 use store_api::storage::{RegionId, RegionNumber};
1480 use table::metadata::{RawTableInfo, TableInfo};
1481 use table::table_name::TableName;
1482
1483 use super::datanode_table::DatanodeTableKey;
1484 use super::test_utils;
1485 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1486 use crate::ddl::test_util::create_table::test_create_table_task;
1487 use crate::ddl::utils::region_storage_path;
1488 use crate::error::Result;
1489 use crate::key::datanode_table::RegionInfo;
1490 use crate::key::table_info::TableInfoValue;
1491 use crate::key::table_name::TableNameKey;
1492 use crate::key::table_route::TableRouteValue;
1493 use crate::key::topic_region::TopicRegionKey;
1494 use crate::key::{
1495 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1496 TableMetadataManager, ViewInfoValue,
1497 };
1498 use crate::kv_backend::KvBackend;
1499 use crate::kv_backend::memory::MemoryKvBackend;
1500 use crate::peer::Peer;
1501 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1502 use crate::rpc::store::RangeRequest;
1503 use crate::wal_provider::WalProvider;
1504
/// Round-trips a `DeserializedValueWithBytes` whose `inner` and `bytes` disagree and
/// checks that the decoded value reflects `bytes` in both fields.
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // The raw bytes describe two routes, while `inner` below holds only one.
    let two_routes = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let two_routes_bytes = serde_json::to_vec(&two_routes).unwrap();

    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(two_routes_bytes.clone()),
    };

    let encoded = serde_json::to_vec(&value).unwrap();
    let decoded: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&encoded).unwrap();

    // Both fields of the decoded value come from `bytes`, not from the original `inner`.
    assert_eq!(decoded.inner, two_routes);
    assert_eq!(decoded.bytes, two_routes_bytes);
}
1532
/// Returns the default region route used by these tests: `new_region_route(1, 2)`,
/// i.e. region id 1 led by datanode 2.
fn new_test_region_route() -> RegionRoute {
    new_region_route(1, 2)
}
1536
1537 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1538 RegionRoute {
1539 region: Region {
1540 id: region_id.into(),
1541 name: "r1".to_string(),
1542 partition: None,
1543 attrs: BTreeMap::new(),
1544 partition_expr: Default::default(),
1545 },
1546 leader_peer: Some(Peer::new(datanode, "a2")),
1547 follower_peers: vec![],
1548 leader_state: None,
1549 leader_down_since: None,
1550 }
1551 }
1552
/// Returns the default table info used by these tests, built by
/// `test_utils::new_test_table_info(10)` (the tests below rely on its table id being 10).
fn new_test_table_info() -> TableInfo {
    test_utils::new_test_table_info(10)
}
1556
1557 fn new_test_table_names() -> HashSet<TableName> {
1558 let mut set = HashSet::new();
1559 set.insert(TableName {
1560 catalog_name: "greptime".to_string(),
1561 schema_name: "public".to_string(),
1562 table_name: "a_table".to_string(),
1563 });
1564 set.insert(TableName {
1565 catalog_name: "greptime".to_string(),
1566 schema_name: "public".to_string(),
1567 table_name: "b_table".to_string(),
1568 });
1569 set
1570 }
1571
1572 async fn create_physical_table_metadata(
1573 table_metadata_manager: &TableMetadataManager,
1574 table_info: RawTableInfo,
1575 region_routes: Vec<RegionRoute>,
1576 region_wal_options: HashMap<RegionNumber, String>,
1577 ) -> Result<()> {
1578 table_metadata_manager
1579 .create_table_metadata(
1580 table_info,
1581 TableRouteValue::physical(region_routes),
1582 region_wal_options,
1583 )
1584 .await
1585 }
1586
1587 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1588 let topics = (0..2)
1589 .map(|i| format!("greptimedb_topic{}", i))
1590 .collect::<Vec<_>>();
1591 let wal_options = topics
1592 .iter()
1593 .map(|topic| {
1594 WalOptions::Kafka(KafkaWalOptions {
1595 topic: topic.clone(),
1596 })
1597 })
1598 .collect::<Vec<_>>();
1599
1600 (0..16)
1601 .enumerate()
1602 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1603 .collect()
1604 }
1605
1606 #[tokio::test]
1607 async fn test_raft_engine_topic_region_map() {
1608 let mem_kv = Arc::new(MemoryKvBackend::default());
1609 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1610 let region_route = new_test_region_route();
1611 let region_routes = &vec![region_route.clone()];
1612 let table_info: RawTableInfo = new_test_table_info().into();
1613 let wal_provider = WalProvider::RaftEngine;
1614 let regions: Vec<_> = (0..16).collect();
1615 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1616 create_physical_table_metadata(
1617 &table_metadata_manager,
1618 table_info.clone(),
1619 region_routes.clone(),
1620 region_wal_options.clone(),
1621 )
1622 .await
1623 .unwrap();
1624
1625 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1626 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1627 let resp = mem_kv.range(range_req).await.unwrap();
1628 assert!(resp.kvs.is_empty());
1630 }
1631
1632 #[tokio::test]
1633 async fn test_create_table_metadata() {
1634 let mem_kv = Arc::new(MemoryKvBackend::default());
1635 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1636 let region_route = new_test_region_route();
1637 let region_routes = &vec![region_route.clone()];
1638 let table_info: RawTableInfo = new_test_table_info().into();
1639 let region_wal_options = create_mock_region_wal_options()
1640 .into_iter()
1641 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1642 .collect::<HashMap<_, _>>();
1643
1644 create_physical_table_metadata(
1646 &table_metadata_manager,
1647 table_info.clone(),
1648 region_routes.clone(),
1649 region_wal_options.clone(),
1650 )
1651 .await
1652 .unwrap();
1653
1654 assert!(
1656 create_physical_table_metadata(
1657 &table_metadata_manager,
1658 table_info.clone(),
1659 region_routes.clone(),
1660 region_wal_options.clone(),
1661 )
1662 .await
1663 .is_ok()
1664 );
1665
1666 let mut modified_region_routes = region_routes.clone();
1667 modified_region_routes.push(region_route.clone());
1668 assert!(
1670 create_physical_table_metadata(
1671 &table_metadata_manager,
1672 table_info.clone(),
1673 modified_region_routes,
1674 region_wal_options.clone(),
1675 )
1676 .await
1677 .is_err()
1678 );
1679
1680 let (remote_table_info, remote_table_route) = table_metadata_manager
1681 .get_full_table_info(10)
1682 .await
1683 .unwrap();
1684
1685 assert_eq!(
1686 remote_table_info.unwrap().into_inner().table_info,
1687 table_info
1688 );
1689 assert_eq!(
1690 remote_table_route
1691 .unwrap()
1692 .into_inner()
1693 .region_routes()
1694 .unwrap(),
1695 region_routes
1696 );
1697
1698 for i in 0..2 {
1699 let region_number = i as u32;
1700 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1701 let topic = format!("greptimedb_topic{}", i);
1702 let regions = table_metadata_manager
1703 .topic_region_manager
1704 .regions(&topic)
1705 .await
1706 .unwrap()
1707 .into_keys()
1708 .collect::<Vec<_>>();
1709 assert_eq!(regions.len(), 8);
1710 assert!(regions.contains(®ion_id));
1711 }
1712 }
1713
1714 #[tokio::test]
1715 async fn test_create_logic_tables_metadata() {
1716 let mem_kv = Arc::new(MemoryKvBackend::default());
1717 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1718 let region_route = new_test_region_route();
1719 let region_routes = vec![region_route.clone()];
1720 let table_info: RawTableInfo = new_test_table_info().into();
1721 let table_id = table_info.ident.table_id;
1722 let table_route_value = TableRouteValue::physical(region_routes.clone());
1723
1724 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1725 table_metadata_manager
1727 .create_logical_tables_metadata(tables_data.clone())
1728 .await
1729 .unwrap();
1730
1731 assert!(
1733 table_metadata_manager
1734 .create_logical_tables_metadata(tables_data)
1735 .await
1736 .is_ok()
1737 );
1738
1739 let mut modified_region_routes = region_routes.clone();
1740 modified_region_routes.push(new_region_route(2, 3));
1741 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1742 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1743 assert!(
1745 table_metadata_manager
1746 .create_logical_tables_metadata(modified_tables_data)
1747 .await
1748 .is_err()
1749 );
1750
1751 let (remote_table_info, remote_table_route) = table_metadata_manager
1752 .get_full_table_info(table_id)
1753 .await
1754 .unwrap();
1755
1756 assert_eq!(
1757 remote_table_info.unwrap().into_inner().table_info,
1758 table_info
1759 );
1760 assert_eq!(
1761 remote_table_route
1762 .unwrap()
1763 .into_inner()
1764 .region_routes()
1765 .unwrap(),
1766 ®ion_routes
1767 );
1768 }
1769
1770 #[tokio::test]
1771 async fn test_create_many_logical_tables_metadata() {
1772 let kv_backend = Arc::new(MemoryKvBackend::default());
1773 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1774
1775 let mut tables_data = vec![];
1776 for i in 0..128 {
1777 let table_id = i + 1;
1778 let regin_number = table_id * 3;
1779 let region_id = RegionId::new(table_id, regin_number);
1780 let region_route = new_region_route(region_id.as_u64(), 2);
1781 let region_routes = vec![region_route.clone()];
1782 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1783 table_id,
1784 &format!("my_table_{}", table_id),
1785 )
1786 .into();
1787 let table_route_value = TableRouteValue::physical(region_routes.clone());
1788
1789 tables_data.push((table_info, table_route_value));
1790 }
1791
1792 table_metadata_manager
1794 .create_logical_tables_metadata(tables_data)
1795 .await
1796 .unwrap();
1797 }
1798
1799 #[tokio::test]
1800 async fn test_delete_table_metadata() {
1801 let mem_kv = Arc::new(MemoryKvBackend::default());
1802 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1803 let region_route = new_test_region_route();
1804 let region_routes = &vec![region_route.clone()];
1805 let table_info: RawTableInfo = new_test_table_info().into();
1806 let table_id = table_info.ident.table_id;
1807 let datanode_id = 2;
1808 let region_wal_options = create_mock_region_wal_options();
1809 let serialized_region_wal_options = region_wal_options
1810 .iter()
1811 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1812 .collect::<HashMap<_, _>>();
1813
1814 create_physical_table_metadata(
1816 &table_metadata_manager,
1817 table_info.clone(),
1818 region_routes.clone(),
1819 serialized_region_wal_options,
1820 )
1821 .await
1822 .unwrap();
1823
1824 let table_name = TableName::new(
1825 table_info.catalog_name,
1826 table_info.schema_name,
1827 table_info.name,
1828 );
1829 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1830 table_metadata_manager
1832 .delete_table_metadata(
1833 table_id,
1834 &table_name,
1835 table_route_value,
1836 ®ion_wal_options,
1837 )
1838 .await
1839 .unwrap();
1840 table_metadata_manager
1842 .delete_table_metadata(
1843 table_id,
1844 &table_name,
1845 table_route_value,
1846 ®ion_wal_options,
1847 )
1848 .await
1849 .unwrap();
1850 assert!(
1851 table_metadata_manager
1852 .table_info_manager()
1853 .get(table_id)
1854 .await
1855 .unwrap()
1856 .is_none()
1857 );
1858 assert!(
1859 table_metadata_manager
1860 .table_route_manager()
1861 .table_route_storage()
1862 .get(table_id)
1863 .await
1864 .unwrap()
1865 .is_none()
1866 );
1867 assert!(
1868 table_metadata_manager
1869 .datanode_table_manager()
1870 .tables(datanode_id)
1871 .try_collect::<Vec<_>>()
1872 .await
1873 .unwrap()
1874 .is_empty()
1875 );
1876 let table_info = table_metadata_manager
1878 .table_info_manager()
1879 .get(table_id)
1880 .await
1881 .unwrap();
1882 assert!(table_info.is_none());
1883 let table_route = table_metadata_manager
1884 .table_route_manager()
1885 .table_route_storage()
1886 .get(table_id)
1887 .await
1888 .unwrap();
1889 assert!(table_route.is_none());
1890 let regions = table_metadata_manager
1892 .topic_region_manager
1893 .regions("greptimedb_topic0")
1894 .await
1895 .unwrap();
1896 assert_eq!(regions.len(), 0);
1897 let regions = table_metadata_manager
1898 .topic_region_manager
1899 .regions("greptimedb_topic1")
1900 .await
1901 .unwrap();
1902 assert_eq!(regions.len(), 0);
1903 }
1904
1905 #[tokio::test]
1906 async fn test_rename_table() {
1907 let mem_kv = Arc::new(MemoryKvBackend::default());
1908 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1909 let region_route = new_test_region_route();
1910 let region_routes = vec![region_route.clone()];
1911 let table_info: RawTableInfo = new_test_table_info().into();
1912 let table_id = table_info.ident.table_id;
1913 create_physical_table_metadata(
1915 &table_metadata_manager,
1916 table_info.clone(),
1917 region_routes.clone(),
1918 HashMap::new(),
1919 )
1920 .await
1921 .unwrap();
1922
1923 let new_table_name = "another_name".to_string();
1924 let table_info_value =
1925 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1926
1927 table_metadata_manager
1928 .rename_table(&table_info_value, new_table_name.clone())
1929 .await
1930 .unwrap();
1931 table_metadata_manager
1933 .rename_table(&table_info_value, new_table_name.clone())
1934 .await
1935 .unwrap();
1936 let mut modified_table_info = table_info.clone();
1937 modified_table_info.name = "hi".to_string();
1938 let modified_table_info_value =
1939 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1940 assert!(
1943 table_metadata_manager
1944 .rename_table(&modified_table_info_value, new_table_name.clone())
1945 .await
1946 .is_err()
1947 );
1948
1949 let old_table_name = TableNameKey::new(
1950 &table_info.catalog_name,
1951 &table_info.schema_name,
1952 &table_info.name,
1953 );
1954 let new_table_name = TableNameKey::new(
1955 &table_info.catalog_name,
1956 &table_info.schema_name,
1957 &new_table_name,
1958 );
1959
1960 assert!(
1961 table_metadata_manager
1962 .table_name_manager()
1963 .get(old_table_name)
1964 .await
1965 .unwrap()
1966 .is_none()
1967 );
1968
1969 assert_eq!(
1970 table_metadata_manager
1971 .table_name_manager()
1972 .get(new_table_name)
1973 .await
1974 .unwrap()
1975 .unwrap()
1976 .table_id(),
1977 table_id
1978 );
1979 }
1980
1981 #[tokio::test]
1982 async fn test_update_table_info() {
1983 let mem_kv = Arc::new(MemoryKvBackend::default());
1984 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1985 let region_route = new_test_region_route();
1986 let region_routes = vec![region_route.clone()];
1987 let table_info: RawTableInfo = new_test_table_info().into();
1988 let table_id = table_info.ident.table_id;
1989 create_physical_table_metadata(
1991 &table_metadata_manager,
1992 table_info.clone(),
1993 region_routes.clone(),
1994 HashMap::new(),
1995 )
1996 .await
1997 .unwrap();
1998
1999 let mut new_table_info = table_info.clone();
2000 new_table_info.name = "hi".to_string();
2001 let current_table_info_value =
2002 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2003 table_metadata_manager
2005 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2006 .await
2007 .unwrap();
2008 table_metadata_manager
2010 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2011 .await
2012 .unwrap();
2013
2014 let updated_table_info = table_metadata_manager
2016 .table_info_manager()
2017 .get(table_id)
2018 .await
2019 .unwrap()
2020 .unwrap()
2021 .into_inner();
2022 assert_eq!(updated_table_info.table_info, new_table_info);
2023
2024 let mut wrong_table_info = table_info.clone();
2025 wrong_table_info.name = "wrong".to_string();
2026 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2027 current_table_info_value.update(wrong_table_info),
2028 );
2029 assert!(
2032 table_metadata_manager
2033 .update_table_info(&wrong_table_info_value, None, new_table_info)
2034 .await
2035 .is_err()
2036 )
2037 }
2038
2039 #[tokio::test]
2040 async fn test_update_table_leader_region_status() {
2041 let mem_kv = Arc::new(MemoryKvBackend::default());
2042 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2043 let datanode = 1;
2044 let region_routes = vec![
2045 RegionRoute {
2046 region: Region {
2047 id: 1.into(),
2048 name: "r1".to_string(),
2049 partition: None,
2050 attrs: BTreeMap::new(),
2051 partition_expr: Default::default(),
2052 },
2053 leader_peer: Some(Peer::new(datanode, "a2")),
2054 leader_state: Some(LeaderState::Downgrading),
2055 follower_peers: vec![],
2056 leader_down_since: Some(current_time_millis()),
2057 },
2058 RegionRoute {
2059 region: Region {
2060 id: 2.into(),
2061 name: "r2".to_string(),
2062 partition: None,
2063 attrs: BTreeMap::new(),
2064 partition_expr: Default::default(),
2065 },
2066 leader_peer: Some(Peer::new(datanode, "a1")),
2067 leader_state: None,
2068 follower_peers: vec![],
2069 leader_down_since: None,
2070 },
2071 ];
2072 let table_info: RawTableInfo = new_test_table_info().into();
2073 let table_id = table_info.ident.table_id;
2074 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2075 TableRouteValue::physical(region_routes.clone()),
2076 );
2077
2078 create_physical_table_metadata(
2080 &table_metadata_manager,
2081 table_info.clone(),
2082 region_routes.clone(),
2083 HashMap::new(),
2084 )
2085 .await
2086 .unwrap();
2087
2088 table_metadata_manager
2089 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2090 if region_route.leader_state.is_some() {
2091 None
2092 } else {
2093 Some(Some(LeaderState::Downgrading))
2094 }
2095 })
2096 .await
2097 .unwrap();
2098
2099 let updated_route_value = table_metadata_manager
2100 .table_route_manager()
2101 .table_route_storage()
2102 .get(table_id)
2103 .await
2104 .unwrap()
2105 .unwrap();
2106
2107 assert_eq!(
2108 updated_route_value.region_routes().unwrap()[0].leader_state,
2109 Some(LeaderState::Downgrading)
2110 );
2111
2112 assert!(
2113 updated_route_value.region_routes().unwrap()[0]
2114 .leader_down_since
2115 .is_some()
2116 );
2117
2118 assert_eq!(
2119 updated_route_value.region_routes().unwrap()[1].leader_state,
2120 Some(LeaderState::Downgrading)
2121 );
2122 assert!(
2123 updated_route_value.region_routes().unwrap()[1]
2124 .leader_down_since
2125 .is_some()
2126 );
2127 }
2128
2129 async fn assert_datanode_table(
2130 table_metadata_manager: &TableMetadataManager,
2131 table_id: u32,
2132 region_routes: &[RegionRoute],
2133 ) {
2134 let region_distribution = region_distribution(region_routes);
2135 for (datanode, regions) in region_distribution {
2136 let got = table_metadata_manager
2137 .datanode_table_manager()
2138 .get(&DatanodeTableKey::new(datanode, table_id))
2139 .await
2140 .unwrap()
2141 .unwrap();
2142
2143 assert_eq!(got.regions, regions.leader_regions);
2144 assert_eq!(got.follower_regions, regions.follower_regions);
2145 }
2146 }
2147
2148 #[tokio::test]
2149 async fn test_update_table_route() {
2150 let mem_kv = Arc::new(MemoryKvBackend::default());
2151 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2152 let region_route = new_test_region_route();
2153 let region_routes = vec![region_route.clone()];
2154 let table_info: RawTableInfo = new_test_table_info().into();
2155 let table_id = table_info.ident.table_id;
2156 let engine = table_info.meta.engine.as_str();
2157 let region_storage_path =
2158 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2159 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2160 TableRouteValue::physical(region_routes.clone()),
2161 );
2162
2163 create_physical_table_metadata(
2165 &table_metadata_manager,
2166 table_info.clone(),
2167 region_routes.clone(),
2168 HashMap::new(),
2169 )
2170 .await
2171 .unwrap();
2172
2173 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2174 let new_region_routes = vec![
2175 new_region_route(1, 1),
2176 new_region_route(2, 2),
2177 new_region_route(3, 3),
2178 ];
2179 table_metadata_manager
2181 .update_table_route(
2182 table_id,
2183 RegionInfo {
2184 engine: engine.to_string(),
2185 region_storage_path: region_storage_path.clone(),
2186 region_options: HashMap::new(),
2187 region_wal_options: HashMap::new(),
2188 },
2189 ¤t_table_route_value,
2190 new_region_routes.clone(),
2191 &HashMap::new(),
2192 &HashMap::new(),
2193 )
2194 .await
2195 .unwrap();
2196 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2197
2198 table_metadata_manager
2200 .update_table_route(
2201 table_id,
2202 RegionInfo {
2203 engine: engine.to_string(),
2204 region_storage_path: region_storage_path.clone(),
2205 region_options: HashMap::new(),
2206 region_wal_options: HashMap::new(),
2207 },
2208 ¤t_table_route_value,
2209 new_region_routes.clone(),
2210 &HashMap::new(),
2211 &HashMap::new(),
2212 )
2213 .await
2214 .unwrap();
2215
2216 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2217 current_table_route_value
2218 .inner
2219 .update(new_region_routes.clone())
2220 .unwrap(),
2221 );
2222 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2223 table_metadata_manager
2225 .update_table_route(
2226 table_id,
2227 RegionInfo {
2228 engine: engine.to_string(),
2229 region_storage_path: region_storage_path.clone(),
2230 region_options: HashMap::new(),
2231 region_wal_options: HashMap::new(),
2232 },
2233 ¤t_table_route_value,
2234 new_region_routes.clone(),
2235 &HashMap::new(),
2236 &HashMap::new(),
2237 )
2238 .await
2239 .unwrap();
2240 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2241
2242 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2245 current_table_route_value
2246 .update(vec![
2247 new_region_route(1, 1),
2248 new_region_route(2, 2),
2249 new_region_route(3, 3),
2250 new_region_route(4, 4),
2251 ])
2252 .unwrap(),
2253 );
2254 assert!(
2255 table_metadata_manager
2256 .update_table_route(
2257 table_id,
2258 RegionInfo {
2259 engine: engine.to_string(),
2260 region_storage_path: region_storage_path.clone(),
2261 region_options: HashMap::new(),
2262 region_wal_options: HashMap::new(),
2263 },
2264 &wrong_table_route_value,
2265 new_region_routes,
2266 &HashMap::new(),
2267 &HashMap::new(),
2268 )
2269 .await
2270 .is_err()
2271 );
2272 }
2273
2274 #[tokio::test]
2275 async fn test_update_table_route_with_topic_region_mapping() {
2276 let mem_kv = Arc::new(MemoryKvBackend::default());
2277 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2278 let region_route = new_test_region_route();
2279 let region_routes = vec![region_route.clone()];
2280 let table_info: RawTableInfo = new_test_table_info().into();
2281 let table_id = table_info.ident.table_id;
2282 let engine = table_info.meta.engine.as_str();
2283 let region_storage_path =
2284 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2285
2286 let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2288 (
2289 1,
2290 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2291 topic: "topic_1".to_string(),
2292 }))
2293 .unwrap(),
2294 ),
2295 (
2296 2,
2297 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2298 topic: "topic_2".to_string(),
2299 }))
2300 .unwrap(),
2301 ),
2302 ]
2303 .into_iter()
2304 .collect();
2305
2306 create_physical_table_metadata(
2307 &table_metadata_manager,
2308 table_info.clone(),
2309 region_routes.clone(),
2310 old_region_wal_options.clone(),
2311 )
2312 .await
2313 .unwrap();
2314
2315 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2316 TableRouteValue::physical(region_routes.clone()),
2317 );
2318
2319 let region_id_1 = RegionId::new(table_id, 1);
2321 let region_id_2 = RegionId::new(table_id, 2);
2322 let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2323 let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2324 assert!(
2325 table_metadata_manager
2326 .topic_region_manager
2327 .get(topic_1_key.clone())
2328 .await
2329 .unwrap()
2330 .is_some()
2331 );
2332 assert!(
2333 table_metadata_manager
2334 .topic_region_manager
2335 .get(topic_2_key.clone())
2336 .await
2337 .unwrap()
2338 .is_some()
2339 );
2340
2341 let new_region_routes = vec![
2343 new_region_route(1, 1),
2344 new_region_route(2, 2),
2345 new_region_route(3, 3), ];
2347 let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2348 (
2349 1,
2350 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2351 topic: "topic_1".to_string(), }))
2353 .unwrap(),
2354 ),
2355 (
2356 2,
2357 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2358 topic: "topic_2".to_string(), }))
2360 .unwrap(),
2361 ),
2362 (
2363 3,
2364 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2365 topic: "topic_3".to_string(), }))
2367 .unwrap(),
2368 ),
2369 ]
2370 .into_iter()
2371 .collect();
2372 let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2373 current_table_route_value
2374 .inner
2375 .update(new_region_routes.clone())
2376 .unwrap(),
2377 );
2378 table_metadata_manager
2379 .update_table_route(
2380 table_id,
2381 RegionInfo {
2382 engine: engine.to_string(),
2383 region_storage_path: region_storage_path.clone(),
2384 region_options: HashMap::new(),
2385 region_wal_options: old_region_wal_options.clone(),
2386 },
2387 ¤t_table_route_value,
2388 new_region_routes.clone(),
2389 &HashMap::new(),
2390 &new_region_wal_options,
2391 )
2392 .await
2393 .unwrap();
2394 let region_id_3 = RegionId::new(table_id, 3);
2396 let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2397 assert!(
2398 table_metadata_manager
2399 .topic_region_manager
2400 .get(topic_3_key)
2401 .await
2402 .unwrap()
2403 .is_some()
2404 );
2405 let newer_region_routes = vec![
2407 new_region_route(1, 1),
2408 ];
2411 let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2412 (
2413 1,
2414 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2415 topic: "topic_1".to_string(), }))
2417 .unwrap(),
2418 ),
2419 (
2420 3,
2421 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2422 topic: "topic_3_new".to_string(), }))
2424 .unwrap(),
2425 ),
2426 ]
2427 .into_iter()
2428 .collect();
2429 table_metadata_manager
2430 .update_table_route(
2431 table_id,
2432 RegionInfo {
2433 engine: engine.to_string(),
2434 region_storage_path: region_storage_path.clone(),
2435 region_options: HashMap::new(),
2436 region_wal_options: new_region_wal_options.clone(),
2437 },
2438 ¤t_table_route_value_updated,
2439 newer_region_routes.clone(),
2440 &HashMap::new(),
2441 &newer_region_wal_options,
2442 )
2443 .await
2444 .unwrap();
2445 let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2447 assert!(
2448 table_metadata_manager
2449 .topic_region_manager
2450 .get(topic_2_key_new)
2451 .await
2452 .unwrap()
2453 .is_none()
2454 );
2455 let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2457 assert!(
2458 table_metadata_manager
2459 .topic_region_manager
2460 .get(topic_3_key_old)
2461 .await
2462 .unwrap()
2463 .is_none()
2464 );
2465 let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2467 assert!(
2468 table_metadata_manager
2469 .topic_region_manager
2470 .get(topic_3_key_new)
2471 .await
2472 .unwrap()
2473 .is_some()
2474 );
2475 assert!(
2477 table_metadata_manager
2478 .topic_region_manager
2479 .get(topic_1_key)
2480 .await
2481 .unwrap()
2482 .is_some()
2483 );
2484 }
2485
2486 #[tokio::test]
2487 async fn test_destroy_table_metadata() {
2488 let mem_kv = Arc::new(MemoryKvBackend::default());
2489 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2490 let table_id = 1025;
2491 let table_name = "foo";
2492 let task = test_create_table_task(table_name, table_id);
2493 let options = create_mock_region_wal_options();
2494 let serialized_options = options
2495 .iter()
2496 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2497 .collect::<HashMap<_, _>>();
2498 table_metadata_manager
2499 .create_table_metadata(
2500 task.table_info,
2501 TableRouteValue::physical(vec![
2502 RegionRoute {
2503 region: Region::new_test(RegionId::new(table_id, 1)),
2504 leader_peer: Some(Peer::empty(1)),
2505 follower_peers: vec![Peer::empty(5)],
2506 leader_state: None,
2507 leader_down_since: None,
2508 },
2509 RegionRoute {
2510 region: Region::new_test(RegionId::new(table_id, 2)),
2511 leader_peer: Some(Peer::empty(2)),
2512 follower_peers: vec![Peer::empty(4)],
2513 leader_state: None,
2514 leader_down_since: None,
2515 },
2516 RegionRoute {
2517 region: Region::new_test(RegionId::new(table_id, 3)),
2518 leader_peer: Some(Peer::empty(3)),
2519 follower_peers: vec![],
2520 leader_state: None,
2521 leader_down_since: None,
2522 },
2523 ]),
2524 serialized_options,
2525 )
2526 .await
2527 .unwrap();
2528 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2529 let table_route_value = table_metadata_manager
2530 .table_route_manager
2531 .table_route_storage()
2532 .get_with_raw_bytes(table_id)
2533 .await
2534 .unwrap()
2535 .unwrap();
2536 table_metadata_manager
2537 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2538 .await
2539 .unwrap();
2540 assert!(mem_kv.is_empty());
2541 }
2542
2543 #[tokio::test]
2544 async fn test_restore_table_metadata() {
2545 let mem_kv = Arc::new(MemoryKvBackend::default());
2546 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2547 let table_id = 1025;
2548 let table_name = "foo";
2549 let task = test_create_table_task(table_name, table_id);
2550 let options = create_mock_region_wal_options();
2551 let serialized_options = options
2552 .iter()
2553 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2554 .collect::<HashMap<_, _>>();
2555 table_metadata_manager
2556 .create_table_metadata(
2557 task.table_info,
2558 TableRouteValue::physical(vec![
2559 RegionRoute {
2560 region: Region::new_test(RegionId::new(table_id, 1)),
2561 leader_peer: Some(Peer::empty(1)),
2562 follower_peers: vec![Peer::empty(5)],
2563 leader_state: None,
2564 leader_down_since: None,
2565 },
2566 RegionRoute {
2567 region: Region::new_test(RegionId::new(table_id, 2)),
2568 leader_peer: Some(Peer::empty(2)),
2569 follower_peers: vec![Peer::empty(4)],
2570 leader_state: None,
2571 leader_down_since: None,
2572 },
2573 RegionRoute {
2574 region: Region::new_test(RegionId::new(table_id, 3)),
2575 leader_peer: Some(Peer::empty(3)),
2576 follower_peers: vec![],
2577 leader_state: None,
2578 leader_down_since: None,
2579 },
2580 ]),
2581 serialized_options,
2582 )
2583 .await
2584 .unwrap();
2585 let expected_result = mem_kv.dump();
2586 let table_route_value = table_metadata_manager
2587 .table_route_manager
2588 .table_route_storage()
2589 .get_with_raw_bytes(table_id)
2590 .await
2591 .unwrap()
2592 .unwrap();
2593 let region_routes = table_route_value.region_routes().unwrap();
2594 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2595 let table_route_value = TableRouteValue::physical(region_routes.clone());
2596 table_metadata_manager
2597 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2598 .await
2599 .unwrap();
2600 table_metadata_manager
2601 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2602 .await
2603 .unwrap();
2604 let kvs = mem_kv.dump();
2605 assert_eq!(kvs, expected_result);
2606 table_metadata_manager
2608 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2609 .await
2610 .unwrap();
2611 let kvs = mem_kv.dump();
2612 assert_eq!(kvs, expected_result);
2613 }
2614
2615 #[tokio::test]
2616 async fn test_create_update_view_info() {
2617 let mem_kv = Arc::new(MemoryKvBackend::default());
2618 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2619
2620 let view_info: RawTableInfo = new_test_table_info().into();
2621
2622 let view_id = view_info.ident.table_id;
2623
2624 let logical_plan: Vec<u8> = vec![1, 2, 3];
2625 let columns = vec!["a".to_string()];
2626 let plan_columns = vec!["number".to_string()];
2627 let table_names = new_test_table_names();
2628 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2629
2630 table_metadata_manager
2632 .create_view_metadata(
2633 view_info.clone(),
2634 logical_plan.clone(),
2635 table_names.clone(),
2636 columns.clone(),
2637 plan_columns.clone(),
2638 definition.to_string(),
2639 )
2640 .await
2641 .unwrap();
2642
2643 {
2644 let current_view_info = table_metadata_manager
2646 .view_info_manager()
2647 .get(view_id)
2648 .await
2649 .unwrap()
2650 .unwrap()
2651 .into_inner();
2652 assert_eq!(current_view_info.view_info, logical_plan);
2653 assert_eq!(current_view_info.table_names, table_names);
2654 assert_eq!(current_view_info.definition, definition);
2655 assert_eq!(current_view_info.columns, columns);
2656 assert_eq!(current_view_info.plan_columns, plan_columns);
2657 let current_table_info = table_metadata_manager
2659 .table_info_manager()
2660 .get(view_id)
2661 .await
2662 .unwrap()
2663 .unwrap()
2664 .into_inner();
2665 assert_eq!(current_table_info.table_info, view_info);
2666 }
2667
2668 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2669 let new_table_names = {
2670 let mut set = HashSet::new();
2671 set.insert(TableName {
2672 catalog_name: "greptime".to_string(),
2673 schema_name: "public".to_string(),
2674 table_name: "b_table".to_string(),
2675 });
2676 set.insert(TableName {
2677 catalog_name: "greptime".to_string(),
2678 schema_name: "public".to_string(),
2679 table_name: "c_table".to_string(),
2680 });
2681 set
2682 };
2683 let new_columns = vec!["b".to_string()];
2684 let new_plan_columns = vec!["number2".to_string()];
2685 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2686
2687 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2688 logical_plan.clone(),
2689 table_names,
2690 columns,
2691 plan_columns,
2692 definition.to_string(),
2693 ));
2694 table_metadata_manager
2696 .update_view_info(
2697 view_id,
2698 ¤t_view_info_value,
2699 new_logical_plan.clone(),
2700 new_table_names.clone(),
2701 new_columns.clone(),
2702 new_plan_columns.clone(),
2703 new_definition.to_string(),
2704 )
2705 .await
2706 .unwrap();
2707 table_metadata_manager
2709 .update_view_info(
2710 view_id,
2711 ¤t_view_info_value,
2712 new_logical_plan.clone(),
2713 new_table_names.clone(),
2714 new_columns.clone(),
2715 new_plan_columns.clone(),
2716 new_definition.to_string(),
2717 )
2718 .await
2719 .unwrap();
2720
2721 let updated_view_info = table_metadata_manager
2723 .view_info_manager()
2724 .get(view_id)
2725 .await
2726 .unwrap()
2727 .unwrap()
2728 .into_inner();
2729 assert_eq!(updated_view_info.view_info, new_logical_plan);
2730 assert_eq!(updated_view_info.table_names, new_table_names);
2731 assert_eq!(updated_view_info.definition, new_definition);
2732 assert_eq!(updated_view_info.columns, new_columns);
2733 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2734
2735 let wrong_view_info = logical_plan.clone();
2736 let wrong_definition = "wrong_definition";
2737 let wrong_view_info_value =
2738 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2739 wrong_view_info,
2740 new_table_names.clone(),
2741 new_columns.clone(),
2742 new_plan_columns.clone(),
2743 wrong_definition.to_string(),
2744 ));
2745 assert!(
2748 table_metadata_manager
2749 .update_view_info(
2750 view_id,
2751 &wrong_view_info_value,
2752 new_logical_plan.clone(),
2753 new_table_names.clone(),
2754 vec!["c".to_string()],
2755 vec!["number3".to_string()],
2756 wrong_definition.to_string(),
2757 )
2758 .await
2759 .is_err()
2760 );
2761
2762 let current_view_info = table_metadata_manager
2764 .view_info_manager()
2765 .get(view_id)
2766 .await
2767 .unwrap()
2768 .unwrap()
2769 .into_inner();
2770 assert_eq!(current_view_info.view_info, new_logical_plan);
2771 assert_eq!(current_view_info.table_names, new_table_names);
2772 assert_eq!(current_view_info.definition, new_definition);
2773 assert_eq!(current_view_info.columns, new_columns);
2774 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2775 }
2776
/// Deserializes a `RegionRoleSet` from the two JSON shapes exercised here: an object
/// with explicit leader/follower lists, and a bare array of region numbers.
#[test]
fn test_region_role_set_deserialize() {
    // Object form: both lists are given explicitly.
    let from_object = serde_json::from_str::<RegionRoleSet>(
        r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#,
    )
    .unwrap();
    assert_eq!(from_object.leader_regions, vec![1, 2, 3]);
    assert_eq!(from_object.follower_regions, vec![4, 5, 6]);

    // Bare-array form: the numbers end up as leader regions, with no followers.
    let from_array = serde_json::from_str::<RegionRoleSet>(r#"[1, 2, 3]"#).unwrap();
    assert_eq!(from_array.leader_regions, vec![1, 2, 3]);
    assert!(from_array.follower_regions.is_empty());
}
2789
/// Deserializes a `RegionDistribution` whose two entries use different JSON shapes:
/// key `1` holds a bare array, key `2` holds an object with explicit role lists.
#[test]
fn test_region_distribution_deserialize() {
    let json = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution = serde_json::from_str::<RegionDistribution>(json).unwrap();
    assert_eq!(distribution.len(), 2);

    // Entry given as a bare array: leaders only.
    let array_entry = &distribution[&1];
    assert_eq!(array_entry.leader_regions, vec![1, 2, 3]);
    assert!(array_entry.follower_regions.is_empty());

    // Entry given as an object: both lists populated.
    let object_entry = &distribution[&2];
    assert_eq!(object_entry.leader_regions, vec![7, 8, 9]);
    assert_eq!(object_entry.follower_regions, vec![10, 11, 12]);
}
2800}