1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use lazy_static::lazy_static;
135use regex::Regex;
136pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
137use serde::de::DeserializeOwned;
138use serde::{Deserialize, Serialize};
139use snafu::{OptionExt, ResultExt, ensure};
140use store_api::storage::RegionNumber;
141use table::metadata::{TableId, TableInfo};
142use table::table_name::TableName;
143use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
144use table_name::{TableNameKey, TableNameManager, TableNameValue};
145use topic_name::TopicNameManager;
146use topic_region::{TopicRegionKey, TopicRegionManager};
147use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
148
149use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
150use self::datanode_table::RegionInfo;
151use self::flow::flow_info::FlowInfoValue;
152use self::flow::flow_name::FlowNameValue;
153use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
154use self::table_route::{TableRouteManager, TableRouteValue};
155use self::tombstone::TombstoneManager;
156use crate::DatanodeId;
157use crate::error::{self, Result, SerdeJsonSnafu};
158use crate::key::flow::flow_state::FlowStateValue;
159use crate::key::node_address::NodeAddressValue;
160use crate::key::table_repart::{TableRepartKey, TableRepartManager};
161use crate::key::table_route::TableRouteKey;
162use crate::key::topic_region::TopicRegionValue;
163use crate::key::txn_helper::TxnOpGetResponseSet;
164use crate::kv_backend::KvBackendRef;
165use crate::kv_backend::txn::{Txn, TxnOp};
166use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
167use crate::rpc::store::BatchDeleteRequest;
168use crate::state_store::PoisonValue;
169
/// Regex fragment for a topic name. It is compiled on its own into
/// `TOPIC_NAME_PATTERN_REGEX` and embedded in `TOPIC_REGION_PATTERN`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
// NOTE(review): presumably the pre-`__switches/` location of the maintenance flag — confirm.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
// The three keys below share the `__switches/` namespace.
// NOTE(review): presumably runtime switches managed by `runtime_switch` — confirm.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode-table keys; full keys match `{prefix}/<digits>/<digits>`
/// (see `DATANODE_TABLE_KEY_PATTERN`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys; full keys match `{prefix}/<digits>`.
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys; full keys match `{prefix}/<digits>`.
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys; full keys carry three name segments after the prefix.
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys; full keys carry one name segment after the prefix.
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys; full keys carry two name segments after the prefix.
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys; full keys match `{prefix}/<digits>`.
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table-repartition keys; full keys match `{prefix}/<digits>`.
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node-address keys; full keys match `{prefix}/<digits>/<digits>`.
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic-name keys; everything after `{prefix}/` is captured
/// by `KAFKA_TOPIC_KEY_PATTERN`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// NOTE(review): presumably the older prefix superseded by `KAFKA_TOPIC_KEY_PREFIX` — confirm.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys; full keys match
/// `{prefix}/<topic name>/<digits>` (see `TOPIC_REGION_PATTERN`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

// NOTE(review): presumably the key used for metasrv leader election — confirm.
pub const ELECTION_KEY: &str = "__metasrv_election";
// NOTE(review): presumably the root under which election candidates register — confirm.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
194
/// The table-name, catalog-name, schema-name, table-route and node-address
/// key prefixes, gathered into one array.
// NOTE(review): the name suggests these are the key families that are cached;
// confirm against the users of this constant.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
203
/// Region numbers split into two lists: leaders and followers.
///
/// `Serialize` is derived; `Deserialize` is implemented by hand in this file
/// so that a bare list of region numbers is also accepted.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Region numbers held as leader.
    pub leader_regions: Vec<RegionNumber>,
    /// Region numbers held as follower.
    pub follower_regions: Vec<RegionNumber>,
}
212
213impl<'de> Deserialize<'de> for RegionRoleSet {
214 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
215 where
216 D: serde::Deserializer<'de>,
217 {
218 #[derive(Deserialize)]
219 #[serde(untagged)]
220 enum RegionRoleSetOrLeaderOnly {
221 Full {
222 leader_regions: Vec<RegionNumber>,
223 follower_regions: Vec<RegionNumber>,
224 },
225 LeaderOnly(Vec<RegionNumber>),
226 }
227 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
228 RegionRoleSetOrLeaderOnly::Full {
229 leader_regions,
230 follower_regions,
231 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
232 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
233 Ok(RegionRoleSet::new(leader_regions, vec![]))
234 }
235 }
236 }
237}
238
239impl RegionRoleSet {
240 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
242 Self {
243 leader_regions,
244 follower_regions,
245 }
246 }
247
248 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
250 self.leader_regions.push(region_number);
251 }
252
253 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
255 self.follower_regions.push(region_number);
256 }
257
258 pub fn sort(&mut self) {
260 self.follower_regions.sort();
261 self.leader_regions.sort();
262 }
263}
264
/// Ordered map from a datanode id to the leader/follower regions recorded for it.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Numeric flow identifier.
pub type FlowId = u32;
/// Numeric flow-partition identifier.
pub type FlowPartitionId = u32;
274
275lazy_static! {
276 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
277}
278
279lazy_static! {
280 static ref TABLE_INFO_KEY_PATTERN: Regex =
281 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285 static ref VIEW_INFO_KEY_PATTERN: Regex =
286 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
291 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
296 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
297}
298
299lazy_static! {
300 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
301 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
302}
303
304lazy_static! {
305 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
314 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
315 ))
316 .unwrap();
317}
318
319lazy_static! {
320 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
322 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
323 ))
324 .unwrap();
325}
326
327lazy_static! {
328 static ref NODE_ADDRESS_PATTERN: Regex =
329 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
330}
331
332lazy_static! {
333 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
334 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
335}
336
337lazy_static! {
338 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
339 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
340 ))
341 .unwrap();
342}
343
/// Conversion between a metadata key and its byte representation.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` is the
/// lifetime of the input slice, which the decoded key may borrow from.
pub trait MetadataKey<'a, T> {
    /// Encodes the key as an owned byte vector.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from `bytes`, returning an error if decoding fails.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
350
/// Newtype over an owned byte vector; it implements [`MetadataKey`] by
/// passing the bytes through unchanged.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);

impl From<Vec<u8>> for BytesAdapter {
    /// Wraps `value` as-is, taking ownership of the vector.
    fn from(value: Vec<u8>) -> Self {
        BytesAdapter(value)
    }
}
359
360impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
361 fn to_bytes(&self) -> Vec<u8> {
362 self.0.clone()
363 }
364
365 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
366 Ok(BytesAdapter(bytes.to_vec()))
367 }
368}
369
/// Keys that can be read as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a transaction operation together with a closure that, given
    /// the transaction's [`TxnOpGetResponseSet`], yields raw bytes or `None`.
    // NOTE(review): presumably the op is a `get` for this key and the closure
    // extracts this key's value from the responses — confirm in the implementors.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
378
/// Conversion between a metadata value and its raw byte representation.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, returning an error if decoding fails.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value as raw bytes, returning an error if encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
386
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
388
/// Bundles one manager per metadata key family together with the key-value
/// backend they are all built on (see [`TableMetadataManager::new`]).
///
/// Its methods combine the per-family transactions into single multi-key
/// transactions that are submitted through `kv_backend`.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Backend handle used directly for transactions and batch deletes.
    kv_backend: KvBackendRef,
}
403
/// Fails with `error::UnexpectedSnafu` unless `$got == $expected_value`.
///
/// The error message includes `$got`, `$name` (a description of the
/// operation) and `$expected_value`; the two values are formatted with `{:?}`.
///
/// The expansion refers to `ensure!` and `error` by unqualified path, so both
/// must be in scope at the call site, and the enclosing function must return a
/// `Result` that `ensure!` can return early from.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
418
/// A decoded value of type `T` kept alongside the raw bytes it was decoded
/// from.
///
/// Dereferences to `T`. Serialization writes out the stored bytes rather than
/// re-encoding `inner`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    /// The raw bytes `inner` was decoded from.
    bytes: Bytes,
    /// The decoded value.
    inner: T,
}
434
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Gives read access to the decoded value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
442
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Gives mutable access to the decoded value. The stored raw bytes are not
    /// updated, so after a mutation they may no longer match `inner`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
448
449impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(
452 f,
453 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
454 self.inner, self.bytes
455 )
456 }
457}
458
459impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
460 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
464 where
465 S: serde::Serializer,
466 {
467 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
470 }
471}
472
473impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
474 for DeserializedValueWithBytes<T>
475{
476 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
480 where
481 D: serde::Deserializer<'de>,
482 {
483 let buf = String::deserialize(deserializer)?;
484 let bytes = Bytes::from(buf);
485
486 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
487 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
488
489 Ok(value)
490 }
491}
492
493impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
494 fn clone(&self) -> Self {
495 Self {
496 bytes: self.bytes.clone(),
497 inner: self.inner.clone(),
498 }
499 }
500}
501
502impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
503 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
506 let inner = T::try_from_raw_value(&bytes)?;
507 Ok(Self { bytes, inner })
508 }
509
510 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
513 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
514 }
515
516 pub fn into_inner(self) -> T {
517 self.inner
518 }
519
520 pub fn get_inner_ref(&self) -> &T {
521 &self.inner
522 }
523
524 pub fn get_raw_bytes(&self) -> Vec<u8> {
526 self.bytes.to_vec()
527 }
528
529 #[cfg(any(test, feature = "testing"))]
530 pub fn from_inner(inner: T) -> Self {
531 let bytes = serde_json::to_vec(&inner).unwrap();
532
533 Self {
534 bytes: Bytes::from(bytes),
535 inner,
536 }
537 }
538}
539
540impl TableMetadataManager {
541 pub fn new(kv_backend: KvBackendRef) -> Self {
542 TableMetadataManager {
543 table_name_manager: TableNameManager::new(kv_backend.clone()),
544 table_info_manager: TableInfoManager::new(kv_backend.clone()),
545 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
546 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
547 catalog_manager: CatalogManager::new(kv_backend.clone()),
548 schema_manager: SchemaManager::new(kv_backend.clone()),
549 table_route_manager: TableRouteManager::new(kv_backend.clone()),
550 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
551 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
552 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
553 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
554 kv_backend,
555 }
556 }
557
558 pub fn new_with_custom_tombstone_prefix(
560 kv_backend: KvBackendRef,
561 tombstone_prefix: &str,
562 ) -> Self {
563 Self {
564 table_name_manager: TableNameManager::new(kv_backend.clone()),
565 table_info_manager: TableInfoManager::new(kv_backend.clone()),
566 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
567 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
568 catalog_manager: CatalogManager::new(kv_backend.clone()),
569 schema_manager: SchemaManager::new(kv_backend.clone()),
570 table_route_manager: TableRouteManager::new(kv_backend.clone()),
571 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
572 tombstone_manager: TombstoneManager::new_with_prefix(
573 kv_backend.clone(),
574 tombstone_prefix,
575 ),
576 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578 kv_backend,
579 }
580 }
581
582 pub async fn init(&self) -> Result<()> {
583 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
584
585 self.catalog_manager().create(catalog_name, true).await?;
586
587 let internal_schemas = [
588 DEFAULT_SCHEMA_NAME,
589 INFORMATION_SCHEMA_NAME,
590 DEFAULT_PRIVATE_SCHEMA_NAME,
591 ];
592
593 for schema_name in internal_schemas {
594 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
595
596 self.schema_manager().create(schema_key, None, true).await?;
597 }
598
599 Ok(())
600 }
601
    /// Borrows the table-name manager.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Borrows the table-info manager.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Borrows the view-info manager.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Borrows the datanode-table manager.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Borrows the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Borrows the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Borrows the table-route manager.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Borrows the table-repartition manager.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Borrows the topic-name manager.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Borrows the topic-region manager.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Borrows the key-value backend handle shared by all sub-managers.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
645
646 pub async fn get_full_table_info(
647 &self,
648 table_id: TableId,
649 ) -> Result<(
650 Option<DeserializedValueWithBytes<TableInfoValue>>,
651 Option<DeserializedValueWithBytes<TableRouteValue>>,
652 )> {
653 let table_info_key = TableInfoKey::new(table_id);
654 let table_route_key = TableRouteKey::new(table_id);
655 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
656 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
657
658 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
659 let mut res = self.kv_backend.txn(txn).await?;
660 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
661 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
662 let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
663 if let Some(table_route_value) = &mut table_route_value {
664 self.table_route_manager()
665 .table_route_storage()
666 .remap_route_address(table_route_value)
667 .await?;
668 }
669 Ok((table_info_value, table_route_value))
670 }
671
672 pub async fn create_view_metadata(
682 &self,
683 view_info: TableInfo,
684 raw_logical_plan: Vec<u8>,
685 table_names: HashSet<TableName>,
686 columns: Vec<String>,
687 plan_columns: Vec<String>,
688 definition: String,
689 ) -> Result<()> {
690 let view_id = view_info.ident.table_id;
691
692 let view_name = TableNameKey::new(
694 &view_info.catalog_name,
695 &view_info.schema_name,
696 &view_info.name,
697 );
698 let create_table_name_txn = self
699 .table_name_manager()
700 .build_create_txn(&view_name, view_id)?;
701
702 let table_info_value = TableInfoValue::new(view_info);
704
705 let (create_table_info_txn, on_create_table_info_failure) = self
706 .table_info_manager()
707 .build_create_txn(view_id, &table_info_value)?;
708
709 let view_info_value = ViewInfoValue::new(
711 raw_logical_plan,
712 table_names,
713 columns,
714 plan_columns,
715 definition,
716 );
717 let (create_view_info_txn, on_create_view_info_failure) = self
718 .view_info_manager()
719 .build_create_txn(view_id, &view_info_value)?;
720
721 let txn = Txn::merge_all(vec![
722 create_table_name_txn,
723 create_table_info_txn,
724 create_view_info_txn,
725 ]);
726
727 let mut r = self.kv_backend.txn(txn).await?;
728
729 if !r.succeeded {
731 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
732 let remote_table_info = on_create_table_info_failure(&mut set)?
733 .context(error::UnexpectedSnafu {
734 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
735 })?
736 .into_inner();
737
738 let remote_view_info = on_create_view_info_failure(&mut set)?
739 .context(error::UnexpectedSnafu {
740 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
741 })?
742 .into_inner();
743
744 let op_name = "the creating view metadata";
745 ensure_values!(remote_table_info, table_info_value, op_name);
746 ensure_values!(remote_view_info, view_info_value, op_name);
747 }
748
749 Ok(())
750 }
751
752 pub async fn create_table_metadata(
755 &self,
756 table_info: TableInfo,
757 table_route_value: TableRouteValue,
758 region_wal_options: HashMap<RegionNumber, String>,
759 ) -> Result<()> {
760 let table_id = table_info.ident.table_id;
761 let engine = table_info.meta.engine.clone();
762
763 let table_name = TableNameKey::new(
765 &table_info.catalog_name,
766 &table_info.schema_name,
767 &table_info.name,
768 );
769 let create_table_name_txn = self
770 .table_name_manager()
771 .build_create_txn(&table_name, table_id)?;
772
773 let region_options = table_info.to_region_options();
774 let table_info_value = TableInfoValue::new(table_info);
776 let (create_table_info_txn, on_create_table_info_failure) = self
777 .table_info_manager()
778 .build_create_txn(table_id, &table_info_value)?;
779
780 let (create_table_route_txn, on_create_table_route_failure) = self
781 .table_route_manager()
782 .table_route_storage()
783 .build_create_txn(table_id, &table_route_value)?;
784
785 let create_topic_region_txn = self
786 .topic_region_manager
787 .build_create_txn(table_id, ®ion_wal_options)?;
788
789 let mut txn = Txn::merge_all(vec![
790 create_table_name_txn,
791 create_table_info_txn,
792 create_table_route_txn,
793 create_topic_region_txn,
794 ]);
795
796 if let TableRouteValue::Physical(x) = &table_route_value {
797 let region_storage_path = table_info_value.region_storage_path();
798 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
799 table_id,
800 &engine,
801 ®ion_storage_path,
802 region_options,
803 region_wal_options,
804 region_distribution(&x.region_routes),
805 )?;
806 txn = txn.merge(create_datanode_table_txn);
807 }
808
809 let mut r = self.kv_backend.txn(txn).await?;
810
811 if !r.succeeded {
813 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
814 let remote_table_info = on_create_table_info_failure(&mut set)?
815 .context(error::UnexpectedSnafu {
816 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
817 })?
818 .into_inner();
819
820 let remote_table_route = on_create_table_route_failure(&mut set)?
821 .context(error::UnexpectedSnafu {
822 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
823 })?
824 .into_inner();
825
826 let op_name = "the creating table metadata";
827 ensure_values!(remote_table_info, table_info_value, op_name);
828 ensure_values!(remote_table_route, table_route_value, op_name);
829 }
830
831 Ok(())
832 }
833
834 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
835 self.kv_backend.max_txn_ops() / 3
838 }
839
840 pub async fn create_logical_tables_metadata(
842 &self,
843 tables_data: Vec<(TableInfo, TableRouteValue)>,
844 ) -> Result<()> {
845 let len = tables_data.len();
846 let mut txns = Vec::with_capacity(3 * len);
847 struct OnFailure<F1, R1, F2, R2>
848 where
849 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
850 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
851 {
852 table_info_value: TableInfoValue,
853 on_create_table_info_failure: F1,
854 table_route_value: TableRouteValue,
855 on_create_table_route_failure: F2,
856 }
857 let mut on_failures = Vec::with_capacity(len);
858 for (table_info, table_route_value) in tables_data {
859 let table_id = table_info.ident.table_id;
860
861 let table_name = TableNameKey::new(
863 &table_info.catalog_name,
864 &table_info.schema_name,
865 &table_info.name,
866 );
867 let create_table_name_txn = self
868 .table_name_manager()
869 .build_create_txn(&table_name, table_id)?;
870 txns.push(create_table_name_txn);
871
872 let table_info_value = TableInfoValue::new(table_info);
874 let (create_table_info_txn, on_create_table_info_failure) =
875 self.table_info_manager()
876 .build_create_txn(table_id, &table_info_value)?;
877 txns.push(create_table_info_txn);
878
879 let (create_table_route_txn, on_create_table_route_failure) = self
880 .table_route_manager()
881 .table_route_storage()
882 .build_create_txn(table_id, &table_route_value)?;
883 txns.push(create_table_route_txn);
884
885 on_failures.push(OnFailure {
886 table_info_value,
887 on_create_table_info_failure,
888 table_route_value,
889 on_create_table_route_failure,
890 });
891 }
892
893 let txn = Txn::merge_all(txns);
894 let mut r = self.kv_backend.txn(txn).await?;
895
896 if !r.succeeded {
898 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
899 for on_failure in on_failures {
900 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
901 .context(error::UnexpectedSnafu {
902 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
903 })?
904 .into_inner();
905
906 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
907 .context(error::UnexpectedSnafu {
908 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
909 })?
910 .into_inner();
911
912 let op_name = "the creating logical tables metadata";
913 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
914 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
915 }
916 }
917
918 Ok(())
919 }
920
921 fn table_metadata_keys(
922 &self,
923 table_id: TableId,
924 table_name: &TableName,
925 table_route_value: &TableRouteValue,
926 region_wal_options: &HashMap<RegionNumber, WalOptions>,
927 ) -> Result<Vec<Vec<u8>>> {
928 let datanode_ids = if table_route_value.is_physical() {
930 region_distribution(table_route_value.region_routes()?)
931 .into_keys()
932 .collect()
933 } else {
934 vec![]
935 };
936 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
937 let table_name = TableNameKey::new(
938 &table_name.catalog_name,
939 &table_name.schema_name,
940 &table_name.table_name,
941 );
942 let table_info_key = TableInfoKey::new(table_id);
943 let table_route_key = TableRouteKey::new(table_id);
944 let table_repart_key = TableRepartKey::new(table_id);
945 let datanode_table_keys = datanode_ids
946 .into_iter()
947 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
948 .collect::<HashSet<_>>();
949 let topic_region_map = self
950 .topic_region_manager
951 .get_topic_region_mapping(table_id, region_wal_options);
952 let topic_region_keys = topic_region_map
953 .iter()
954 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
955 .collect::<Vec<_>>();
956 keys.push(table_name.to_bytes());
957 keys.push(table_info_key.to_bytes());
958 keys.push(table_route_key.to_bytes());
959 keys.push(table_repart_key.to_bytes());
960 for key in &datanode_table_keys {
961 keys.push(key.to_bytes());
962 }
963 for key in topic_region_keys {
964 keys.push(key.to_bytes());
965 }
966 Ok(keys)
967 }
968
969 pub async fn delete_table_metadata(
972 &self,
973 table_id: TableId,
974 table_name: &TableName,
975 table_route_value: &TableRouteValue,
976 region_wal_options: &HashMap<RegionNumber, WalOptions>,
977 ) -> Result<()> {
978 let keys =
979 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
980 self.tombstone_manager.create(keys).await.map(|_| ())
981 }
982
983 pub async fn delete_table_metadata_tombstone(
986 &self,
987 table_id: TableId,
988 table_name: &TableName,
989 table_route_value: &TableRouteValue,
990 region_wal_options: &HashMap<RegionNumber, WalOptions>,
991 ) -> Result<()> {
992 let table_metadata_keys =
993 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
994 self.tombstone_manager
995 .delete(table_metadata_keys)
996 .await
997 .map(|_| ())
998 }
999
1000 pub async fn restore_table_metadata(
1003 &self,
1004 table_id: TableId,
1005 table_name: &TableName,
1006 table_route_value: &TableRouteValue,
1007 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1008 ) -> Result<()> {
1009 let keys =
1010 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1011 self.tombstone_manager.restore(keys).await.map(|_| ())
1012 }
1013
1014 pub async fn destroy_table_metadata(
1017 &self,
1018 table_id: TableId,
1019 table_name: &TableName,
1020 table_route_value: &TableRouteValue,
1021 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1022 ) -> Result<()> {
1023 let keys =
1024 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1025 let _ = self
1026 .kv_backend
1027 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1028 .await?;
1029 Ok(())
1030 }
1031
1032 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1033 let mut keys = Vec::with_capacity(3);
1034 let view_name = TableNameKey::new(
1035 &view_name.catalog_name,
1036 &view_name.schema_name,
1037 &view_name.table_name,
1038 );
1039 let table_info_key = TableInfoKey::new(view_id);
1040 let view_info_key = ViewInfoKey::new(view_id);
1041 keys.push(view_name.to_bytes());
1042 keys.push(table_info_key.to_bytes());
1043 keys.push(view_info_key.to_bytes());
1044
1045 Ok(keys)
1046 }
1047
1048 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1051 let keys = self.view_info_keys(view_id, view_name)?;
1052 let _ = self
1053 .kv_backend
1054 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1055 .await?;
1056 Ok(())
1057 }
1058
1059 pub async fn rename_table(
1063 &self,
1064 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1065 new_table_name: String,
1066 ) -> Result<()> {
1067 let current_table_info = ¤t_table_info_value.table_info;
1068 let table_id = current_table_info.ident.table_id;
1069
1070 let table_name_key = TableNameKey::new(
1071 ¤t_table_info.catalog_name,
1072 ¤t_table_info.schema_name,
1073 ¤t_table_info.name,
1074 );
1075
1076 let new_table_name_key = TableNameKey::new(
1077 ¤t_table_info.catalog_name,
1078 ¤t_table_info.schema_name,
1079 &new_table_name,
1080 );
1081
1082 let update_table_name_txn = self.table_name_manager().build_update_txn(
1084 &table_name_key,
1085 &new_table_name_key,
1086 table_id,
1087 )?;
1088
1089 let new_table_info_value = current_table_info_value
1090 .inner
1091 .with_update(move |table_info| {
1092 table_info.name = new_table_name;
1093 });
1094
1095 let (update_table_info_txn, on_update_table_info_failure) = self
1097 .table_info_manager()
1098 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1099
1100 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1101
1102 let mut r = self.kv_backend.txn(txn).await?;
1103
1104 if !r.succeeded {
1106 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1107 let remote_table_info = on_update_table_info_failure(&mut set)?
1108 .context(error::UnexpectedSnafu {
1109 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1110 })?
1111 .into_inner();
1112
1113 let op_name = "the renaming table metadata";
1114 ensure_values!(remote_table_info, new_table_info_value, op_name);
1115 }
1116
1117 Ok(())
1118 }
1119
1120 pub async fn update_table_info(
1124 &self,
1125 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1126 region_distribution: Option<RegionDistribution>,
1127 new_table_info: TableInfo,
1128 ) -> Result<()> {
1129 let table_id = current_table_info_value.table_info.ident.table_id;
1130 let new_table_info_value = current_table_info_value.update(new_table_info);
1131
1132 let (update_table_info_txn, on_update_table_info_failure) = self
1134 .table_info_manager()
1135 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1136
1137 let txn = if let Some(region_distribution) = region_distribution {
1138 let new_region_options = new_table_info_value.table_info.to_region_options();
1140 let update_datanode_table_options_txn = self
1141 .datanode_table_manager
1142 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1143 .await?;
1144 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1145 } else {
1146 update_table_info_txn
1147 };
1148
1149 let mut r = self.kv_backend.txn(txn).await?;
1150 if !r.succeeded {
1152 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1153 let remote_table_info = on_update_table_info_failure(&mut set)?
1154 .context(error::UnexpectedSnafu {
1155 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1156 })?
1157 .into_inner();
1158
1159 let op_name = "the updating table info";
1160 ensure_values!(remote_table_info, new_table_info_value, op_name);
1161 }
1162 Ok(())
1163 }
1164
1165 #[allow(clippy::too_many_arguments)]
1176 pub async fn update_view_info(
1177 &self,
1178 view_id: TableId,
1179 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1180 new_view_info: Vec<u8>,
1181 table_names: HashSet<TableName>,
1182 columns: Vec<String>,
1183 plan_columns: Vec<String>,
1184 definition: String,
1185 ) -> Result<()> {
1186 let new_view_info_value = current_view_info_value.update(
1187 new_view_info,
1188 table_names,
1189 columns,
1190 plan_columns,
1191 definition,
1192 );
1193
1194 let (update_view_info_txn, on_update_view_info_failure) = self
1196 .view_info_manager()
1197 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1198
1199 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1200
1201 if !r.succeeded {
1203 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1204 let remote_view_info = on_update_view_info_failure(&mut set)?
1205 .context(error::UnexpectedSnafu {
1206 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1207 })?
1208 .into_inner();
1209
1210 let op_name = "the updating view info";
1211 ensure_values!(remote_view_info, new_view_info_value, op_name);
1212 }
1213 Ok(())
1214 }
1215
    /// Returns the kv backend's `max_txn_ops()` value.
    ///
    /// NOTE(review): judging by the name, callers presumably use this to size the
    /// chunks they pass to `batch_update_table_info_values` — confirm at the call
    /// sites.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1219
1220 pub async fn batch_update_table_info_values(
1221 &self,
1222 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1223 ) -> Result<()> {
1224 let len = table_info_value_pairs.len();
1225 let mut txns = Vec::with_capacity(len);
1226 struct OnFailure<F, R>
1227 where
1228 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1229 {
1230 table_info_value: TableInfoValue,
1231 on_update_table_info_failure: F,
1232 }
1233 let mut on_failures = Vec::with_capacity(len);
1234
1235 for (table_info_value, new_table_info) in table_info_value_pairs {
1236 let table_id = table_info_value.table_info.ident.table_id;
1237
1238 let new_table_info_value = table_info_value.update(new_table_info);
1239
1240 let (update_table_info_txn, on_update_table_info_failure) =
1241 self.table_info_manager().build_update_txn(
1242 table_id,
1243 &table_info_value,
1244 &new_table_info_value,
1245 )?;
1246
1247 txns.push(update_table_info_txn);
1248
1249 on_failures.push(OnFailure {
1250 table_info_value: new_table_info_value,
1251 on_update_table_info_failure,
1252 });
1253 }
1254
1255 let txn = Txn::merge_all(txns);
1256 let mut r = self.kv_backend.txn(txn).await?;
1257
1258 if !r.succeeded {
1259 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1260 for on_failure in on_failures {
1261 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1262 .context(error::UnexpectedSnafu {
1263 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1264 })?
1265 .into_inner();
1266
1267 let op_name = "the batch updating table info";
1268 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1269 }
1270 }
1271
1272 Ok(())
1273 }
1274
1275 pub async fn update_table_route(
1276 &self,
1277 table_id: TableId,
1278 region_info: RegionInfo,
1279 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1280 new_region_routes: Vec<RegionRoute>,
1281 new_region_options: &HashMap<String, String>,
1282 new_region_wal_options: &HashMap<RegionNumber, String>,
1283 ) -> Result<()> {
1284 let current_region_distribution =
1286 region_distribution(current_table_route_value.region_routes()?);
1287 let new_region_distribution = region_distribution(&new_region_routes);
1288
1289 let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1290 table_id,
1291 ®ion_info.region_wal_options,
1292 new_region_wal_options,
1293 )?;
1294 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1295 table_id,
1296 region_info,
1297 current_region_distribution,
1298 new_region_distribution,
1299 new_region_options,
1300 new_region_wal_options,
1301 )?;
1302
1303 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1305 let (update_table_route_txn, on_update_table_route_failure) = self
1306 .table_route_manager()
1307 .table_route_storage()
1308 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1309
1310 let txn = Txn::merge_all(vec![
1311 update_datanode_table_txn,
1312 update_table_route_txn,
1313 update_topic_region_txn,
1314 ]);
1315
1316 let mut r = self.kv_backend.txn(txn).await?;
1317
1318 if !r.succeeded {
1320 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1321 let remote_table_route = on_update_table_route_failure(&mut set)?
1322 .context(error::UnexpectedSnafu {
1323 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1324 })?
1325 .into_inner();
1326
1327 let op_name = "the updating table route";
1328 ensure_values!(remote_table_route, new_table_route_value, op_name);
1329 }
1330
1331 Ok(())
1332 }
1333
1334 pub async fn update_leader_region_status<F>(
1336 &self,
1337 table_id: TableId,
1338 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1339 next_region_route_status: F,
1340 ) -> Result<()>
1341 where
1342 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1343 {
1344 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1345
1346 let mut updated = 0;
1347 for route in &mut new_region_routes {
1348 if let Some(state) = next_region_route_status(route)
1349 && route.set_leader_state(state)
1350 {
1351 updated += 1;
1352 }
1353 }
1354
1355 if updated == 0 {
1356 warn!("No leader status updated");
1357 return Ok(());
1358 }
1359
1360 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1362
1363 let (update_table_route_txn, on_update_table_route_failure) = self
1364 .table_route_manager()
1365 .table_route_storage()
1366 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1367
1368 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1369
1370 if !r.succeeded {
1372 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1373 let remote_table_route = on_update_table_route_failure(&mut set)?
1374 .context(error::UnexpectedSnafu {
1375 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1376 })?
1377 .into_inner();
1378
1379 let op_name = "the updating leader region status";
1380 ensure_values!(remote_table_route, new_table_route_value, op_name);
1381 }
1382
1383 Ok(())
1384 }
1385}
1386
/// Implements `$crate::key::MetadataValue` for each listed type by going
/// through `serde_json`.
///
/// * `try_from_raw_value` decodes the bytes with `serde_json::from_slice`.
/// * `try_as_raw_value` encodes the value with `serde_json::to_vec`.
///
/// Both attach `SerdeJsonSnafu` as error context.
///
/// The type list is comma separated; a trailing comma is accepted.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the
/// `.context(..)` extension method by their bare names, so they have to be
/// resolvable at the invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1403
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// `build_get_op` serializes the key with `to_bytes()` and returns
/// * a `TxnOp::Get` for those bytes, and
/// * the closure produced by `TxnOpGetResponseSet::filter` for the same bytes,
///   which yields `Option<Vec<u8>>` when applied to a response set.
///
/// The type list is comma separated; a trailing comma is accepted.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key:ty),* $(,)?) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    // The filter needs its own copy of the key; the get op takes
                    // ownership of the original.
                    let filter = TxnOpGetResponseSet::filter(raw_key.clone());
                    (TxnOp::Get(raw_key), filter)
                }
            }
        )*
    };
}
1428
// Key types that can build a `TxnOp::Get` plus a matching response filter.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1436
/// Adds inherent `serde_json`-based (de)serialization helpers to each listed
/// type.
///
/// Unlike `impl_metadata_value!`, `try_from_raw_value` asks
/// `serde_json::from_slice` for an `Option<Self>`, so the decoded result may be
/// `None` (presumably for a JSON `null` payload — confirm against the stored
/// format). `try_as_raw_value` encodes `self` with `serde_json::to_vec`.
///
/// Both attach `SerdeJsonSnafu` as error context.
///
/// The type list is comma separated; a trailing comma is accepted.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the
/// `.context(..)` extension method by their bare names, so they have to be
/// resolvable at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1453
// Value types stored as plain JSON via the `MetadataValue` trait.
// `SchemaNameValue` is additionally listed in the `impl_optional_metadata_value!`
// invocation in this file, which gives it inherent methods of the same names.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}
1469
// Value types whose raw bytes decode to `Option<Self>` (inherent methods, not
// the `MetadataValue` trait).
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1474
1475#[cfg(test)]
1476mod tests {
1477 use std::collections::{BTreeMap, HashMap, HashSet};
1478 use std::sync::Arc;
1479
1480 use bytes::Bytes;
1481 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1482 use common_time::util::current_time_millis;
1483 use common_wal::options::{KafkaWalOptions, WalOptions};
1484 use futures::TryStreamExt;
1485 use store_api::storage::{RegionId, RegionNumber};
1486 use table::metadata::TableInfo;
1487 use table::table_name::TableName;
1488
1489 use super::datanode_table::DatanodeTableKey;
1490 use super::test_utils;
1491 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1492 use crate::ddl::test_util::create_table::test_create_table_task;
1493 use crate::ddl::utils::region_storage_path;
1494 use crate::error::Result;
1495 use crate::key::datanode_table::RegionInfo;
1496 use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1497 use crate::key::table_info::TableInfoValue;
1498 use crate::key::table_name::TableNameKey;
1499 use crate::key::table_route::TableRouteValue;
1500 use crate::key::topic_region::TopicRegionKey;
1501 use crate::key::{
1502 DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1503 TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1504 };
1505 use crate::kv_backend::KvBackend;
1506 use crate::kv_backend::memory::MemoryKvBackend;
1507 use crate::peer::Peer;
1508 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1509 use crate::rpc::store::{PutRequest, RangeRequest};
1510 use crate::wal_provider::WalProvider;
1511
1512 #[test]
1513 fn test_deserialized_value_with_bytes() {
1514 let region_route = new_test_region_route();
1515 let region_routes = vec![region_route.clone()];
1516
1517 let expected_region_routes =
1518 TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1519 let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1520
1521 let value = DeserializedValueWithBytes {
1524 inner: TableRouteValue::physical(region_routes.clone()),
1526 bytes: Bytes::from(expected.clone()),
1527 };
1528
1529 let encoded = serde_json::to_vec(&value).unwrap();
1530
1531 let decoded: DeserializedValueWithBytes<TableRouteValue> =
1534 serde_json::from_slice(&encoded).unwrap();
1535
1536 assert_eq!(decoded.inner, expected_region_routes);
1537 assert_eq!(decoded.bytes, expected);
1538 }
1539
    /// Returns the default route used by these tests: region id 1 led by
    /// datanode 2.
    fn new_test_region_route() -> RegionRoute {
        new_region_route(1, 2)
    }
1543
1544 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1545 RegionRoute {
1546 region: Region {
1547 id: region_id.into(),
1548 name: "r1".to_string(),
1549 attrs: BTreeMap::new(),
1550 partition_expr: Default::default(),
1551 },
1552 leader_peer: Some(Peer::new(datanode, "a2")),
1553 follower_peers: vec![],
1554 leader_state: None,
1555 leader_down_since: None,
1556 write_route_policy: None,
1557 }
1558 }
1559
    /// Returns `test_utils::new_test_table_info(10)`; the tests in this module
    /// look the table up again with id `10`.
    fn new_test_table_info() -> TableInfo {
        test_utils::new_test_table_info(10)
    }
1563
1564 fn new_test_table_names() -> HashSet<TableName> {
1565 let mut set = HashSet::new();
1566 set.insert(TableName {
1567 catalog_name: "greptime".to_string(),
1568 schema_name: "public".to_string(),
1569 table_name: "a_table".to_string(),
1570 });
1571 set.insert(TableName {
1572 catalog_name: "greptime".to_string(),
1573 schema_name: "public".to_string(),
1574 table_name: "b_table".to_string(),
1575 });
1576 set
1577 }
1578
1579 async fn create_physical_table_metadata(
1580 table_metadata_manager: &TableMetadataManager,
1581 table_info: TableInfo,
1582 region_routes: Vec<RegionRoute>,
1583 region_wal_options: HashMap<RegionNumber, String>,
1584 ) -> Result<()> {
1585 table_metadata_manager
1586 .create_table_metadata(
1587 table_info,
1588 TableRouteValue::physical(region_routes),
1589 region_wal_options,
1590 )
1591 .await
1592 }
1593
1594 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1595 let topics = (0..2)
1596 .map(|i| format!("greptimedb_topic{}", i))
1597 .collect::<Vec<_>>();
1598 let wal_options = topics
1599 .iter()
1600 .map(|topic| {
1601 WalOptions::Kafka(KafkaWalOptions {
1602 topic: topic.clone(),
1603 })
1604 })
1605 .collect::<Vec<_>>();
1606
1607 (0..16)
1608 .enumerate()
1609 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1610 .collect()
1611 }
1612
1613 #[tokio::test]
1614 async fn test_raft_engine_topic_region_map() {
1615 let mem_kv = Arc::new(MemoryKvBackend::default());
1616 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1617 let region_route = new_test_region_route();
1618 let region_routes = &vec![region_route.clone()];
1619 let table_info = new_test_table_info();
1620 let wal_provider = WalProvider::RaftEngine;
1621 let regions: Vec<_> = (0..16).collect();
1622 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1623 create_physical_table_metadata(
1624 &table_metadata_manager,
1625 table_info.clone(),
1626 region_routes.clone(),
1627 region_wal_options.clone(),
1628 )
1629 .await
1630 .unwrap();
1631
1632 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1633 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1634 let resp = mem_kv.range(range_req).await.unwrap();
1635 assert!(resp.kvs.is_empty());
1637 }
1638
1639 #[tokio::test]
1640 async fn test_create_table_metadata() {
1641 let mem_kv = Arc::new(MemoryKvBackend::default());
1642 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1643 let region_route = new_test_region_route();
1644 let region_routes = &vec![region_route.clone()];
1645 let table_info = new_test_table_info();
1646 let region_wal_options = create_mock_region_wal_options()
1647 .into_iter()
1648 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1649 .collect::<HashMap<_, _>>();
1650
1651 create_physical_table_metadata(
1653 &table_metadata_manager,
1654 table_info.clone(),
1655 region_routes.clone(),
1656 region_wal_options.clone(),
1657 )
1658 .await
1659 .unwrap();
1660
1661 assert!(
1663 create_physical_table_metadata(
1664 &table_metadata_manager,
1665 table_info.clone(),
1666 region_routes.clone(),
1667 region_wal_options.clone(),
1668 )
1669 .await
1670 .is_ok()
1671 );
1672
1673 let mut modified_region_routes = region_routes.clone();
1674 modified_region_routes.push(region_route.clone());
1675 assert!(
1677 create_physical_table_metadata(
1678 &table_metadata_manager,
1679 table_info.clone(),
1680 modified_region_routes,
1681 region_wal_options.clone(),
1682 )
1683 .await
1684 .is_err()
1685 );
1686
1687 let (remote_table_info, remote_table_route) = table_metadata_manager
1688 .get_full_table_info(10)
1689 .await
1690 .unwrap();
1691
1692 assert_eq!(
1693 remote_table_info.unwrap().into_inner().table_info,
1694 table_info
1695 );
1696 assert_eq!(
1697 remote_table_route
1698 .unwrap()
1699 .into_inner()
1700 .region_routes()
1701 .unwrap(),
1702 region_routes
1703 );
1704
1705 for i in 0..2 {
1706 let region_number = i as u32;
1707 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1708 let topic = format!("greptimedb_topic{}", i);
1709 let regions = table_metadata_manager
1710 .topic_region_manager
1711 .regions(&topic)
1712 .await
1713 .unwrap()
1714 .into_keys()
1715 .collect::<Vec<_>>();
1716 assert_eq!(regions.len(), 8);
1717 assert!(regions.contains(®ion_id));
1718 }
1719 }
1720
1721 #[tokio::test]
1722 async fn test_get_full_table_info_remaps_route_address() {
1723 let mem_kv = Arc::new(MemoryKvBackend::default());
1724 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1725
1726 let mut region_route = new_test_region_route();
1727 region_route.follower_peers = vec![Peer::empty(3)];
1728 let region_routes = vec![region_route];
1729 let table_info = new_test_table_info();
1730 let table_id = table_info.ident.table_id;
1731
1732 create_physical_table_metadata(
1733 &table_metadata_manager,
1734 table_info,
1735 region_routes,
1736 HashMap::new(),
1737 )
1738 .await
1739 .unwrap();
1740
1741 mem_kv
1742 .put(PutRequest {
1743 key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1744 value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1745 .try_as_raw_value()
1746 .unwrap(),
1747 ..Default::default()
1748 })
1749 .await
1750 .unwrap();
1751 mem_kv
1752 .put(PutRequest {
1753 key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1754 value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1755 .try_as_raw_value()
1756 .unwrap(),
1757 ..Default::default()
1758 })
1759 .await
1760 .unwrap();
1761
1762 let (_, table_route) = table_metadata_manager
1763 .get_full_table_info(table_id)
1764 .await
1765 .unwrap();
1766 let table_route = table_route.unwrap().into_inner();
1767 let region_routes = table_route.region_routes().unwrap();
1768
1769 assert_eq!(
1770 region_routes[0].leader_peer.as_ref().unwrap().addr,
1771 "new-a2"
1772 );
1773 assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1774 }
1775
1776 #[tokio::test]
1777 async fn test_create_logic_tables_metadata() {
1778 let mem_kv = Arc::new(MemoryKvBackend::default());
1779 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1780 let region_route = new_test_region_route();
1781 let region_routes = vec![region_route.clone()];
1782 let table_info = new_test_table_info();
1783 let table_id = table_info.ident.table_id;
1784 let table_route_value = TableRouteValue::physical(region_routes.clone());
1785
1786 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1787 table_metadata_manager
1789 .create_logical_tables_metadata(tables_data.clone())
1790 .await
1791 .unwrap();
1792
1793 assert!(
1795 table_metadata_manager
1796 .create_logical_tables_metadata(tables_data)
1797 .await
1798 .is_ok()
1799 );
1800
1801 let mut modified_region_routes = region_routes.clone();
1802 modified_region_routes.push(new_region_route(2, 3));
1803 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1804 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1805 assert!(
1807 table_metadata_manager
1808 .create_logical_tables_metadata(modified_tables_data)
1809 .await
1810 .is_err()
1811 );
1812
1813 let (remote_table_info, remote_table_route) = table_metadata_manager
1814 .get_full_table_info(table_id)
1815 .await
1816 .unwrap();
1817
1818 assert_eq!(
1819 remote_table_info.unwrap().into_inner().table_info,
1820 table_info
1821 );
1822 assert_eq!(
1823 remote_table_route
1824 .unwrap()
1825 .into_inner()
1826 .region_routes()
1827 .unwrap(),
1828 ®ion_routes
1829 );
1830 }
1831
1832 #[tokio::test]
1833 async fn test_create_many_logical_tables_metadata() {
1834 let kv_backend = Arc::new(MemoryKvBackend::default());
1835 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1836
1837 let mut tables_data = vec![];
1838 for i in 0..128 {
1839 let table_id = i + 1;
1840 let regin_number = table_id * 3;
1841 let region_id = RegionId::new(table_id, regin_number);
1842 let region_route = new_region_route(region_id.as_u64(), 2);
1843 let region_routes = vec![region_route.clone()];
1844 let table_info = test_utils::new_test_table_info_with_name(
1845 table_id,
1846 &format!("my_table_{}", table_id),
1847 );
1848 let table_route_value = TableRouteValue::physical(region_routes.clone());
1849
1850 tables_data.push((table_info, table_route_value));
1851 }
1852
1853 table_metadata_manager
1855 .create_logical_tables_metadata(tables_data)
1856 .await
1857 .unwrap();
1858 }
1859
1860 #[tokio::test]
1861 async fn test_delete_table_metadata() {
1862 let mem_kv = Arc::new(MemoryKvBackend::default());
1863 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1864 let region_route = new_test_region_route();
1865 let region_routes = &vec![region_route.clone()];
1866 let table_info = new_test_table_info();
1867 let table_id = table_info.ident.table_id;
1868 let datanode_id = 2;
1869 let region_wal_options = create_mock_region_wal_options();
1870 let serialized_region_wal_options = region_wal_options
1871 .iter()
1872 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1873 .collect::<HashMap<_, _>>();
1874
1875 create_physical_table_metadata(
1877 &table_metadata_manager,
1878 table_info.clone(),
1879 region_routes.clone(),
1880 serialized_region_wal_options,
1881 )
1882 .await
1883 .unwrap();
1884
1885 let table_name = TableName::new(
1886 table_info.catalog_name,
1887 table_info.schema_name,
1888 table_info.name,
1889 );
1890 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1891 table_metadata_manager
1893 .delete_table_metadata(
1894 table_id,
1895 &table_name,
1896 table_route_value,
1897 ®ion_wal_options,
1898 )
1899 .await
1900 .unwrap();
1901 table_metadata_manager
1903 .delete_table_metadata(
1904 table_id,
1905 &table_name,
1906 table_route_value,
1907 ®ion_wal_options,
1908 )
1909 .await
1910 .unwrap();
1911 assert!(
1912 table_metadata_manager
1913 .table_info_manager()
1914 .get(table_id)
1915 .await
1916 .unwrap()
1917 .is_none()
1918 );
1919 assert!(
1920 table_metadata_manager
1921 .table_route_manager()
1922 .table_route_storage()
1923 .get(table_id)
1924 .await
1925 .unwrap()
1926 .is_none()
1927 );
1928 assert!(
1929 table_metadata_manager
1930 .datanode_table_manager()
1931 .tables(datanode_id)
1932 .try_collect::<Vec<_>>()
1933 .await
1934 .unwrap()
1935 .is_empty()
1936 );
1937 let table_info = table_metadata_manager
1939 .table_info_manager()
1940 .get(table_id)
1941 .await
1942 .unwrap();
1943 assert!(table_info.is_none());
1944 let table_route = table_metadata_manager
1945 .table_route_manager()
1946 .table_route_storage()
1947 .get(table_id)
1948 .await
1949 .unwrap();
1950 assert!(table_route.is_none());
1951 let regions = table_metadata_manager
1953 .topic_region_manager
1954 .regions("greptimedb_topic0")
1955 .await
1956 .unwrap();
1957 assert_eq!(regions.len(), 0);
1958 let regions = table_metadata_manager
1959 .topic_region_manager
1960 .regions("greptimedb_topic1")
1961 .await
1962 .unwrap();
1963 assert_eq!(regions.len(), 0);
1964 }
1965
1966 #[tokio::test]
1967 async fn test_rename_table() {
1968 let mem_kv = Arc::new(MemoryKvBackend::default());
1969 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1970 let region_route = new_test_region_route();
1971 let region_routes = vec![region_route.clone()];
1972 let table_info = new_test_table_info();
1973 let table_id = table_info.ident.table_id;
1974 create_physical_table_metadata(
1976 &table_metadata_manager,
1977 table_info.clone(),
1978 region_routes.clone(),
1979 HashMap::new(),
1980 )
1981 .await
1982 .unwrap();
1983
1984 let new_table_name = "another_name".to_string();
1985 let table_info_value =
1986 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1987
1988 table_metadata_manager
1989 .rename_table(&table_info_value, new_table_name.clone())
1990 .await
1991 .unwrap();
1992 table_metadata_manager
1994 .rename_table(&table_info_value, new_table_name.clone())
1995 .await
1996 .unwrap();
1997 let mut modified_table_info = table_info.clone();
1998 modified_table_info.name = "hi".to_string();
1999 let modified_table_info_value =
2000 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
2001 assert!(
2004 table_metadata_manager
2005 .rename_table(&modified_table_info_value, new_table_name.clone())
2006 .await
2007 .is_err()
2008 );
2009
2010 let old_table_name = TableNameKey::new(
2011 &table_info.catalog_name,
2012 &table_info.schema_name,
2013 &table_info.name,
2014 );
2015 let new_table_name = TableNameKey::new(
2016 &table_info.catalog_name,
2017 &table_info.schema_name,
2018 &new_table_name,
2019 );
2020
2021 assert!(
2022 table_metadata_manager
2023 .table_name_manager()
2024 .get(old_table_name)
2025 .await
2026 .unwrap()
2027 .is_none()
2028 );
2029
2030 assert_eq!(
2031 table_metadata_manager
2032 .table_name_manager()
2033 .get(new_table_name)
2034 .await
2035 .unwrap()
2036 .unwrap()
2037 .table_id(),
2038 table_id
2039 );
2040 }
2041
2042 #[tokio::test]
2043 async fn test_update_table_info() {
2044 let mem_kv = Arc::new(MemoryKvBackend::default());
2045 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2046 let region_route = new_test_region_route();
2047 let region_routes = vec![region_route.clone()];
2048 let table_info = new_test_table_info();
2049 let table_id = table_info.ident.table_id;
2050 create_physical_table_metadata(
2052 &table_metadata_manager,
2053 table_info.clone(),
2054 region_routes.clone(),
2055 HashMap::new(),
2056 )
2057 .await
2058 .unwrap();
2059
2060 let mut new_table_info = table_info.clone();
2061 new_table_info.name = "hi".to_string();
2062 let current_table_info_value =
2063 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2064 table_metadata_manager
2066 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2067 .await
2068 .unwrap();
2069 table_metadata_manager
2071 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2072 .await
2073 .unwrap();
2074
2075 let updated_table_info = table_metadata_manager
2077 .table_info_manager()
2078 .get(table_id)
2079 .await
2080 .unwrap()
2081 .unwrap()
2082 .into_inner();
2083 assert_eq!(updated_table_info.table_info, new_table_info);
2084
2085 let mut wrong_table_info = table_info.clone();
2086 wrong_table_info.name = "wrong".to_string();
2087 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2088 current_table_info_value.update(wrong_table_info),
2089 );
2090 assert!(
2093 table_metadata_manager
2094 .update_table_info(&wrong_table_info_value, None, new_table_info)
2095 .await
2096 .is_err()
2097 )
2098 }
2099
2100 #[tokio::test]
2101 async fn test_update_table_leader_region_status() {
2102 let mem_kv = Arc::new(MemoryKvBackend::default());
2103 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2104 let datanode = 1;
2105 let region_routes = vec![
2106 RegionRoute {
2107 region: Region {
2108 id: 1.into(),
2109 name: "r1".to_string(),
2110 attrs: BTreeMap::new(),
2111 partition_expr: Default::default(),
2112 },
2113 leader_peer: Some(Peer::new(datanode, "a2")),
2114 leader_state: Some(LeaderState::Downgrading),
2115 follower_peers: vec![],
2116 leader_down_since: Some(current_time_millis()),
2117 write_route_policy: None,
2118 },
2119 RegionRoute {
2120 region: Region {
2121 id: 2.into(),
2122 name: "r2".to_string(),
2123 attrs: BTreeMap::new(),
2124 partition_expr: Default::default(),
2125 },
2126 leader_peer: Some(Peer::new(datanode, "a1")),
2127 leader_state: None,
2128 follower_peers: vec![],
2129 leader_down_since: None,
2130 write_route_policy: None,
2131 },
2132 ];
2133 let table_info = new_test_table_info();
2134 let table_id = table_info.ident.table_id;
2135 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2136 TableRouteValue::physical(region_routes.clone()),
2137 );
2138
2139 create_physical_table_metadata(
2141 &table_metadata_manager,
2142 table_info.clone(),
2143 region_routes.clone(),
2144 HashMap::new(),
2145 )
2146 .await
2147 .unwrap();
2148
2149 table_metadata_manager
2150 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2151 if region_route.leader_state.is_some() {
2152 None
2153 } else {
2154 Some(Some(LeaderState::Downgrading))
2155 }
2156 })
2157 .await
2158 .unwrap();
2159
2160 let updated_route_value = table_metadata_manager
2161 .table_route_manager()
2162 .table_route_storage()
2163 .get(table_id)
2164 .await
2165 .unwrap()
2166 .unwrap();
2167
2168 assert_eq!(
2169 updated_route_value.region_routes().unwrap()[0].leader_state,
2170 Some(LeaderState::Downgrading)
2171 );
2172
2173 assert!(
2174 updated_route_value.region_routes().unwrap()[0]
2175 .leader_down_since
2176 .is_some()
2177 );
2178
2179 assert_eq!(
2180 updated_route_value.region_routes().unwrap()[1].leader_state,
2181 Some(LeaderState::Downgrading)
2182 );
2183 assert!(
2184 updated_route_value.region_routes().unwrap()[1]
2185 .leader_down_since
2186 .is_some()
2187 );
2188 }
2189
2190 async fn assert_datanode_table(
2191 table_metadata_manager: &TableMetadataManager,
2192 table_id: u32,
2193 region_routes: &[RegionRoute],
2194 ) {
2195 let region_distribution = region_distribution(region_routes);
2196 for (datanode, regions) in region_distribution {
2197 let got = table_metadata_manager
2198 .datanode_table_manager()
2199 .get(&DatanodeTableKey::new(datanode, table_id))
2200 .await
2201 .unwrap()
2202 .unwrap();
2203
2204 assert_eq!(got.regions, regions.leader_regions);
2205 assert_eq!(got.follower_regions, regions.follower_regions);
2206 }
2207 }
2208
2209 #[tokio::test]
2210 async fn test_update_table_route() {
2211 let mem_kv = Arc::new(MemoryKvBackend::default());
2212 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2213 let region_route = new_test_region_route();
2214 let region_routes = vec![region_route.clone()];
2215 let table_info = new_test_table_info();
2216 let table_id = table_info.ident.table_id;
2217 let engine = table_info.meta.engine.as_str();
2218 let region_storage_path =
2219 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2220 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2221 TableRouteValue::physical(region_routes.clone()),
2222 );
2223
2224 create_physical_table_metadata(
2226 &table_metadata_manager,
2227 table_info.clone(),
2228 region_routes.clone(),
2229 HashMap::new(),
2230 )
2231 .await
2232 .unwrap();
2233
2234 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2235 let new_region_routes = vec![
2236 new_region_route(1, 1),
2237 new_region_route(2, 2),
2238 new_region_route(3, 3),
2239 ];
2240 table_metadata_manager
2242 .update_table_route(
2243 table_id,
2244 RegionInfo {
2245 engine: engine.to_string(),
2246 region_storage_path: region_storage_path.clone(),
2247 region_options: HashMap::new(),
2248 region_wal_options: HashMap::new(),
2249 },
2250 ¤t_table_route_value,
2251 new_region_routes.clone(),
2252 &HashMap::new(),
2253 &HashMap::new(),
2254 )
2255 .await
2256 .unwrap();
2257 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2258
2259 table_metadata_manager
2261 .update_table_route(
2262 table_id,
2263 RegionInfo {
2264 engine: engine.to_string(),
2265 region_storage_path: region_storage_path.clone(),
2266 region_options: HashMap::new(),
2267 region_wal_options: HashMap::new(),
2268 },
2269 ¤t_table_route_value,
2270 new_region_routes.clone(),
2271 &HashMap::new(),
2272 &HashMap::new(),
2273 )
2274 .await
2275 .unwrap();
2276
2277 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2278 current_table_route_value
2279 .inner
2280 .update(new_region_routes.clone())
2281 .unwrap(),
2282 );
2283 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2284 table_metadata_manager
2286 .update_table_route(
2287 table_id,
2288 RegionInfo {
2289 engine: engine.to_string(),
2290 region_storage_path: region_storage_path.clone(),
2291 region_options: HashMap::new(),
2292 region_wal_options: HashMap::new(),
2293 },
2294 ¤t_table_route_value,
2295 new_region_routes.clone(),
2296 &HashMap::new(),
2297 &HashMap::new(),
2298 )
2299 .await
2300 .unwrap();
2301 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2302
2303 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2306 current_table_route_value
2307 .update(vec![
2308 new_region_route(1, 1),
2309 new_region_route(2, 2),
2310 new_region_route(3, 3),
2311 new_region_route(4, 4),
2312 ])
2313 .unwrap(),
2314 );
2315 assert!(
2316 table_metadata_manager
2317 .update_table_route(
2318 table_id,
2319 RegionInfo {
2320 engine: engine.to_string(),
2321 region_storage_path: region_storage_path.clone(),
2322 region_options: HashMap::new(),
2323 region_wal_options: HashMap::new(),
2324 },
2325 &wrong_table_route_value,
2326 new_region_routes,
2327 &HashMap::new(),
2328 &HashMap::new(),
2329 )
2330 .await
2331 .is_err()
2332 );
2333 }
2334
2335 #[tokio::test]
2336 async fn test_update_table_route_with_topic_region_mapping() {
2337 let mem_kv = Arc::new(MemoryKvBackend::default());
2338 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2339 let region_route = new_test_region_route();
2340 let region_routes = vec![region_route.clone()];
2341 let table_info = new_test_table_info();
2342 let table_id = table_info.ident.table_id;
2343 let engine = table_info.meta.engine.as_str();
2344 let region_storage_path =
2345 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2346
2347 let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2349 (
2350 1,
2351 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2352 topic: "topic_1".to_string(),
2353 }))
2354 .unwrap(),
2355 ),
2356 (
2357 2,
2358 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2359 topic: "topic_2".to_string(),
2360 }))
2361 .unwrap(),
2362 ),
2363 ]
2364 .into_iter()
2365 .collect();
2366
2367 create_physical_table_metadata(
2368 &table_metadata_manager,
2369 table_info.clone(),
2370 region_routes.clone(),
2371 old_region_wal_options.clone(),
2372 )
2373 .await
2374 .unwrap();
2375
2376 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2377 TableRouteValue::physical(region_routes.clone()),
2378 );
2379
2380 let region_id_1 = RegionId::new(table_id, 1);
2382 let region_id_2 = RegionId::new(table_id, 2);
2383 let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2384 let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2385 assert!(
2386 table_metadata_manager
2387 .topic_region_manager
2388 .get(topic_1_key.clone())
2389 .await
2390 .unwrap()
2391 .is_some()
2392 );
2393 assert!(
2394 table_metadata_manager
2395 .topic_region_manager
2396 .get(topic_2_key.clone())
2397 .await
2398 .unwrap()
2399 .is_some()
2400 );
2401
2402 let new_region_routes = vec![
2404 new_region_route(1, 1),
2405 new_region_route(2, 2),
2406 new_region_route(3, 3), ];
2408 let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2409 (
2410 1,
2411 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2412 topic: "topic_1".to_string(), }))
2414 .unwrap(),
2415 ),
2416 (
2417 2,
2418 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2419 topic: "topic_2".to_string(), }))
2421 .unwrap(),
2422 ),
2423 (
2424 3,
2425 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2426 topic: "topic_3".to_string(), }))
2428 .unwrap(),
2429 ),
2430 ]
2431 .into_iter()
2432 .collect();
2433 let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2434 current_table_route_value
2435 .inner
2436 .update(new_region_routes.clone())
2437 .unwrap(),
2438 );
2439 table_metadata_manager
2440 .update_table_route(
2441 table_id,
2442 RegionInfo {
2443 engine: engine.to_string(),
2444 region_storage_path: region_storage_path.clone(),
2445 region_options: HashMap::new(),
2446 region_wal_options: old_region_wal_options.clone(),
2447 },
2448 ¤t_table_route_value,
2449 new_region_routes.clone(),
2450 &HashMap::new(),
2451 &new_region_wal_options,
2452 )
2453 .await
2454 .unwrap();
2455 let region_id_3 = RegionId::new(table_id, 3);
2457 let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2458 assert!(
2459 table_metadata_manager
2460 .topic_region_manager
2461 .get(topic_3_key)
2462 .await
2463 .unwrap()
2464 .is_some()
2465 );
2466 let newer_region_routes = vec![
2468 new_region_route(1, 1),
2469 ];
2472 let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2473 (
2474 1,
2475 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2476 topic: "topic_1".to_string(), }))
2478 .unwrap(),
2479 ),
2480 (
2481 3,
2482 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2483 topic: "topic_3_new".to_string(), }))
2485 .unwrap(),
2486 ),
2487 ]
2488 .into_iter()
2489 .collect();
2490 table_metadata_manager
2491 .update_table_route(
2492 table_id,
2493 RegionInfo {
2494 engine: engine.to_string(),
2495 region_storage_path: region_storage_path.clone(),
2496 region_options: HashMap::new(),
2497 region_wal_options: new_region_wal_options.clone(),
2498 },
2499 ¤t_table_route_value_updated,
2500 newer_region_routes.clone(),
2501 &HashMap::new(),
2502 &newer_region_wal_options,
2503 )
2504 .await
2505 .unwrap();
2506 let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2508 assert!(
2509 table_metadata_manager
2510 .topic_region_manager
2511 .get(topic_2_key_new)
2512 .await
2513 .unwrap()
2514 .is_none()
2515 );
2516 let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2518 assert!(
2519 table_metadata_manager
2520 .topic_region_manager
2521 .get(topic_3_key_old)
2522 .await
2523 .unwrap()
2524 .is_none()
2525 );
2526 let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2528 assert!(
2529 table_metadata_manager
2530 .topic_region_manager
2531 .get(topic_3_key_new)
2532 .await
2533 .unwrap()
2534 .is_some()
2535 );
2536 assert!(
2538 table_metadata_manager
2539 .topic_region_manager
2540 .get(topic_1_key)
2541 .await
2542 .unwrap()
2543 .is_some()
2544 );
2545 }
2546
2547 #[tokio::test]
2548 async fn test_destroy_table_metadata() {
2549 let mem_kv = Arc::new(MemoryKvBackend::default());
2550 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2551 let table_id = 1025;
2552 let table_name = "foo";
2553 let task = test_create_table_task(table_name, table_id);
2554 let options = create_mock_region_wal_options();
2555 let serialized_options = options
2556 .iter()
2557 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2558 .collect::<HashMap<_, _>>();
2559 table_metadata_manager
2560 .create_table_metadata(
2561 task.table_info,
2562 TableRouteValue::physical(vec![
2563 RegionRoute {
2564 region: Region::new_test(RegionId::new(table_id, 1)),
2565 leader_peer: Some(Peer::empty(1)),
2566 follower_peers: vec![Peer::empty(5)],
2567 leader_state: None,
2568 leader_down_since: None,
2569 write_route_policy: None,
2570 },
2571 RegionRoute {
2572 region: Region::new_test(RegionId::new(table_id, 2)),
2573 leader_peer: Some(Peer::empty(2)),
2574 follower_peers: vec![Peer::empty(4)],
2575 leader_state: None,
2576 leader_down_since: None,
2577 write_route_policy: None,
2578 },
2579 RegionRoute {
2580 region: Region::new_test(RegionId::new(table_id, 3)),
2581 leader_peer: Some(Peer::empty(3)),
2582 follower_peers: vec![],
2583 leader_state: None,
2584 leader_down_since: None,
2585 write_route_policy: None,
2586 },
2587 ]),
2588 serialized_options,
2589 )
2590 .await
2591 .unwrap();
2592 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2593 let table_route_value = table_metadata_manager
2594 .table_route_manager
2595 .table_route_storage()
2596 .get_with_raw_bytes(table_id)
2597 .await
2598 .unwrap()
2599 .unwrap();
2600 table_metadata_manager
2601 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2602 .await
2603 .unwrap();
2604 assert!(mem_kv.is_empty());
2605 }
2606
2607 #[tokio::test]
2608 async fn test_restore_table_metadata() {
2609 let mem_kv = Arc::new(MemoryKvBackend::default());
2610 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2611 let table_id = 1025;
2612 let table_name = "foo";
2613 let task = test_create_table_task(table_name, table_id);
2614 let options = create_mock_region_wal_options();
2615 let serialized_options = options
2616 .iter()
2617 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2618 .collect::<HashMap<_, _>>();
2619 table_metadata_manager
2620 .create_table_metadata(
2621 task.table_info,
2622 TableRouteValue::physical(vec![
2623 RegionRoute {
2624 region: Region::new_test(RegionId::new(table_id, 1)),
2625 leader_peer: Some(Peer::empty(1)),
2626 follower_peers: vec![Peer::empty(5)],
2627 leader_state: None,
2628 leader_down_since: None,
2629 write_route_policy: None,
2630 },
2631 RegionRoute {
2632 region: Region::new_test(RegionId::new(table_id, 2)),
2633 leader_peer: Some(Peer::empty(2)),
2634 follower_peers: vec![Peer::empty(4)],
2635 leader_state: None,
2636 leader_down_since: None,
2637 write_route_policy: None,
2638 },
2639 RegionRoute {
2640 region: Region::new_test(RegionId::new(table_id, 3)),
2641 leader_peer: Some(Peer::empty(3)),
2642 follower_peers: vec![],
2643 leader_state: None,
2644 leader_down_since: None,
2645 write_route_policy: None,
2646 },
2647 ]),
2648 serialized_options,
2649 )
2650 .await
2651 .unwrap();
2652 let expected_result = mem_kv.dump();
2653 let table_route_value = table_metadata_manager
2654 .table_route_manager
2655 .table_route_storage()
2656 .get_with_raw_bytes(table_id)
2657 .await
2658 .unwrap()
2659 .unwrap();
2660 let region_routes = table_route_value.region_routes().unwrap();
2661 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2662 let table_route_value = TableRouteValue::physical(region_routes.clone());
2663 table_metadata_manager
2664 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2665 .await
2666 .unwrap();
2667 table_metadata_manager
2668 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2669 .await
2670 .unwrap();
2671 let kvs = mem_kv.dump();
2672 assert_eq!(kvs, expected_result);
2673 table_metadata_manager
2675 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2676 .await
2677 .unwrap();
2678 let kvs = mem_kv.dump();
2679 assert_eq!(kvs, expected_result);
2680 }
2681
2682 #[tokio::test]
2683 async fn test_create_update_view_info() {
2684 let mem_kv = Arc::new(MemoryKvBackend::default());
2685 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2686
2687 let view_info = new_test_table_info();
2688
2689 let view_id = view_info.ident.table_id;
2690
2691 let logical_plan: Vec<u8> = vec![1, 2, 3];
2692 let columns = vec!["a".to_string()];
2693 let plan_columns = vec!["number".to_string()];
2694 let table_names = new_test_table_names();
2695 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2696
2697 table_metadata_manager
2699 .create_view_metadata(
2700 view_info.clone(),
2701 logical_plan.clone(),
2702 table_names.clone(),
2703 columns.clone(),
2704 plan_columns.clone(),
2705 definition.to_string(),
2706 )
2707 .await
2708 .unwrap();
2709
2710 {
2711 let current_view_info = table_metadata_manager
2713 .view_info_manager()
2714 .get(view_id)
2715 .await
2716 .unwrap()
2717 .unwrap()
2718 .into_inner();
2719 assert_eq!(current_view_info.view_info, logical_plan);
2720 assert_eq!(current_view_info.table_names, table_names);
2721 assert_eq!(current_view_info.definition, definition);
2722 assert_eq!(current_view_info.columns, columns);
2723 assert_eq!(current_view_info.plan_columns, plan_columns);
2724 let current_table_info = table_metadata_manager
2726 .table_info_manager()
2727 .get(view_id)
2728 .await
2729 .unwrap()
2730 .unwrap()
2731 .into_inner();
2732 assert_eq!(current_table_info.table_info, view_info);
2733 }
2734
2735 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2736 let new_table_names = {
2737 let mut set = HashSet::new();
2738 set.insert(TableName {
2739 catalog_name: "greptime".to_string(),
2740 schema_name: "public".to_string(),
2741 table_name: "b_table".to_string(),
2742 });
2743 set.insert(TableName {
2744 catalog_name: "greptime".to_string(),
2745 schema_name: "public".to_string(),
2746 table_name: "c_table".to_string(),
2747 });
2748 set
2749 };
2750 let new_columns = vec!["b".to_string()];
2751 let new_plan_columns = vec!["number2".to_string()];
2752 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2753
2754 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2755 logical_plan.clone(),
2756 table_names,
2757 columns,
2758 plan_columns,
2759 definition.to_string(),
2760 ));
2761 table_metadata_manager
2763 .update_view_info(
2764 view_id,
2765 ¤t_view_info_value,
2766 new_logical_plan.clone(),
2767 new_table_names.clone(),
2768 new_columns.clone(),
2769 new_plan_columns.clone(),
2770 new_definition.to_string(),
2771 )
2772 .await
2773 .unwrap();
2774 table_metadata_manager
2776 .update_view_info(
2777 view_id,
2778 ¤t_view_info_value,
2779 new_logical_plan.clone(),
2780 new_table_names.clone(),
2781 new_columns.clone(),
2782 new_plan_columns.clone(),
2783 new_definition.to_string(),
2784 )
2785 .await
2786 .unwrap();
2787
2788 let updated_view_info = table_metadata_manager
2790 .view_info_manager()
2791 .get(view_id)
2792 .await
2793 .unwrap()
2794 .unwrap()
2795 .into_inner();
2796 assert_eq!(updated_view_info.view_info, new_logical_plan);
2797 assert_eq!(updated_view_info.table_names, new_table_names);
2798 assert_eq!(updated_view_info.definition, new_definition);
2799 assert_eq!(updated_view_info.columns, new_columns);
2800 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2801
2802 let wrong_view_info = logical_plan.clone();
2803 let wrong_definition = "wrong_definition";
2804 let wrong_view_info_value =
2805 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2806 wrong_view_info,
2807 new_table_names.clone(),
2808 new_columns.clone(),
2809 new_plan_columns.clone(),
2810 wrong_definition.to_string(),
2811 ));
2812 assert!(
2815 table_metadata_manager
2816 .update_view_info(
2817 view_id,
2818 &wrong_view_info_value,
2819 new_logical_plan.clone(),
2820 new_table_names.clone(),
2821 vec!["c".to_string()],
2822 vec!["number3".to_string()],
2823 wrong_definition.to_string(),
2824 )
2825 .await
2826 .is_err()
2827 );
2828
2829 let current_view_info = table_metadata_manager
2831 .view_info_manager()
2832 .get(view_id)
2833 .await
2834 .unwrap()
2835 .unwrap()
2836 .into_inner();
2837 assert_eq!(current_view_info.view_info, new_logical_plan);
2838 assert_eq!(current_view_info.table_names, new_table_names);
2839 assert_eq!(current_view_info.definition, new_definition);
2840 assert_eq!(current_view_info.columns, new_columns);
2841 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2842 }
2843
2844 #[test]
2845 fn test_region_role_set_deserialize() {
2846 let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2847 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2848 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2849 assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2850
2851 let s = r#"[1, 2, 3]"#;
2852 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2853 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2854 assert!(region_role_set.follower_regions.is_empty());
2855 }
2856
2857 #[test]
2858 fn test_region_distribution_deserialize() {
2859 let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2860 let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2861 assert_eq!(region_distribution.len(), 2);
2862 assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2863 assert!(region_distribution[&1].follower_regions.is_empty());
2864 assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2865 assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2866 }
2867}