1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use futures_util::TryStreamExt;
135use lazy_static::lazy_static;
136use regex::Regex;
137pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
138use serde::de::DeserializeOwned;
139use serde::{Deserialize, Serialize};
140use snafu::{OptionExt, ResultExt, ensure};
141use store_api::storage::RegionNumber;
142use table::metadata::{TableId, TableInfo};
143use table::table_name::TableName;
144use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
145use table_name::{TableNameKey, TableNameManager, TableNameValue};
146use topic_name::TopicNameManager;
147use topic_region::{TopicRegionKey, TopicRegionManager};
148use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
149
150use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
151use self::datanode_table::RegionInfo;
152use self::flow::flow_info::FlowInfoValue;
153use self::flow::flow_name::FlowNameValue;
154use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
155use self::table_route::{TableRouteManager, TableRouteValue};
156use self::tombstone::TombstoneManager;
157use crate::DatanodeId;
158use crate::error::{self, Result, SerdeJsonSnafu};
159use crate::key::flow::flow_state::FlowStateValue;
160use crate::key::node_address::NodeAddressValue;
161use crate::key::table_repart::{TableRepartKey, TableRepartManager};
162use crate::key::table_route::TableRouteKey;
163use crate::key::topic_region::TopicRegionValue;
164use crate::key::txn_helper::TxnOpGetResponseSet;
165use crate::kv_backend::KvBackendRef;
166use crate::kv_backend::txn::{Txn, TxnOp};
167use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
168use crate::rpc::store::BatchDeleteRequest;
169use crate::state_store::PoisonValue;
170use crate::wal_provider::RegionWalOptions;
171
/// Regular-expression fragment for a topic name. It is compiled on its own as
/// `TOPIC_NAME_PATTERN_REGEX` and embedded in `TOPIC_REGION_PATTERN`.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy maintenance key.
/// NOTE(review): presumably superseded by `MAINTENANCE_KEY` — confirm against its readers.
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch under the `__switches/` namespace.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch under the `__switches/` namespace.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch under the `__switches/` namespace.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode-table keys (`{prefix}/{number}/{number}`, see `DATANODE_TABLE_KEY_PATTERN`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys (`{prefix}/{number}`, see `TABLE_INFO_KEY_PATTERN`).
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys (`{prefix}/{number}`, see `VIEW_INFO_KEY_PATTERN`).
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys (`{prefix}/{name}/{name}/{name}`, see `TABLE_NAME_KEY_PATTERN`).
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys (`{prefix}/{name}`, see `CATALOG_NAME_KEY_PATTERN`).
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys (`{prefix}/{name}/{name}`, see `SCHEMA_NAME_KEY_PATTERN`).
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys (`{prefix}/{number}`, see `TABLE_ROUTE_KEY_PATTERN`).
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table-repart keys (`{prefix}/{number}`, see `TABLE_REPART_KEY_PATTERN`).
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node-address keys (`{prefix}/{number}/{number}`, see `NODE_ADDRESS_PATTERN`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic-name keys (`{prefix}/{anything}`, see `KAFKA_TOPIC_KEY_PATTERN`).
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Legacy prefix of Kafka topic keys.
/// NOTE(review): presumably replaced by `KAFKA_TOPIC_KEY_PREFIX` — confirm.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys (`{prefix}/{topic}/{number}`, see `TOPIC_REGION_PATTERN`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// Key used for the metasrv election.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root (with trailing `/`) under which metasrv election candidates are stored.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The five key prefixes grouped for cache handling.
/// NOTE(review): the consumer of this list is not visible here — confirm how it is used.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
205
/// Region numbers grouped by role (leader / follower).
///
/// Used as the per-datanode value of [`RegionDistribution`]. `Serialize` is
/// derived; `Deserialize` is implemented by hand so that a bare list of
/// region numbers is also accepted.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Region numbers held in the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Region numbers held in the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
214
215impl<'de> Deserialize<'de> for RegionRoleSet {
216 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
217 where
218 D: serde::Deserializer<'de>,
219 {
220 #[derive(Deserialize)]
221 #[serde(untagged)]
222 enum RegionRoleSetOrLeaderOnly {
223 Full {
224 leader_regions: Vec<RegionNumber>,
225 follower_regions: Vec<RegionNumber>,
226 },
227 LeaderOnly(Vec<RegionNumber>),
228 }
229 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
230 RegionRoleSetOrLeaderOnly::Full {
231 leader_regions,
232 follower_regions,
233 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
234 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
235 Ok(RegionRoleSet::new(leader_regions, vec![]))
236 }
237 }
238 }
239}
240
241impl RegionRoleSet {
242 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
244 Self {
245 leader_regions,
246 follower_regions,
247 }
248 }
249
250 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
252 self.leader_regions.push(region_number);
253 }
254
255 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
257 self.follower_regions.push(region_number);
258 }
259
260 pub fn sort(&mut self) {
262 self.follower_regions.sort();
263 self.leader_regions.sort();
264 }
265}
266
/// Maps each datanode id to the regions (by role) associated with it.
/// A `BTreeMap`, so iteration is ordered by datanode id.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
276
277lazy_static! {
278 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
279}
280
281lazy_static! {
282 static ref TABLE_INFO_KEY_PATTERN: Regex =
283 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287 static ref VIEW_INFO_KEY_PATTERN: Regex =
288 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
289}
290
291lazy_static! {
292 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
293 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
294}
295
296lazy_static! {
297 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
298 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
299}
300
301lazy_static! {
302 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
303 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
304}
305
306lazy_static! {
307 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
308 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
309 ))
310 .unwrap();
311}
312
313lazy_static! {
314 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
316 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
317 ))
318 .unwrap();
319}
320
321lazy_static! {
322 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
324 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
325 ))
326 .unwrap();
327}
328
329lazy_static! {
330 static ref NODE_ADDRESS_PATTERN: Regex =
331 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
332}
333
334lazy_static! {
335 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
336 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
337}
338
339lazy_static! {
340 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
341 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
342 ))
343 .unwrap();
344}
345
/// Conversion between a metadata key and its byte encoding.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` lets the
/// decoded key borrow from the input bytes.
pub trait MetadataKey<'a, T> {
    /// Encodes the key as bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from `bytes`, returning an error if decoding fails.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
352
353#[derive(Debug, Clone, PartialEq)]
354pub struct BytesAdapter(Vec<u8>);
355
356impl From<Vec<u8>> for BytesAdapter {
357 fn from(value: Vec<u8>) -> Self {
358 Self(value)
359 }
360}
361
362impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
363 fn to_bytes(&self) -> Vec<u8> {
364 self.0.clone()
365 }
366
367 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
368 Ok(BytesAdapter(bytes.to_vec()))
369 }
370}
371
/// Keys that can be read as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the [`TxnOp`] to put into the transaction, paired with a
    /// closure that takes the raw value (if any) out of the transaction's
    /// [`TxnOpGetResponseSet`].
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
380
/// Conversion between a metadata value and its raw stored bytes.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into its raw bytes.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
388
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Bundles the per-kind metadata managers together with the key-value
/// backend they are all constructed from.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    table_repart_manager: TableRepartManager,
    // Used to tombstone, restore and look up dropped metadata keys.
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Kept for running merged transactions and batch deletes directly.
    kv_backend: KvBackendRef,
}
405
/// Ensures `$got == $expected_value`; otherwise fails, via `ensure!`, with an
/// `error::UnexpectedSnafu` whose message shows both values and `$name`.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu`, so both
/// must be resolvable where the macro is invoked. `$got` and
/// `$expected_value` each appear twice in the expansion (in the comparison and
/// in the message), so pass side-effect-free expressions.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
420
/// A decoded value of type `T` kept together with the raw bytes it was
/// decoded from.
///
/// The `Serialize` impl writes `bytes` (as a string) rather than re-encoding
/// `inner`. `DerefMut` gives mutable access to `inner` without touching
/// `bytes`, so the two can diverge after a mutation.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The raw bytes `inner` was decoded from.
    bytes: Bytes,
    // The decoded value.
    inner: T,
}
436
/// Identity of a dropped table, as returned by
/// `TableMetadataManager::list_dropped_tables`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DroppedTableName {
    /// Id read from the tombstoned table-name value.
    pub table_id: TableId,
    /// Name decoded from the tombstoned table-name key.
    pub table_name: TableName,
}
444
/// Metadata of a dropped table, assembled from its tombstoned entries.
#[derive(Debug, Clone)]
pub struct DroppedTableMetadata {
    /// Id of the dropped table.
    pub table_id: TableId,
    /// Name of the dropped table.
    pub table_name: TableName,
    /// The tombstoned table info.
    pub table_info_value: TableInfoValue,
    /// The tombstoned table route.
    pub table_route_value: TableRouteValue,
    /// WAL options per region number, collected from the tombstoned
    /// datanode-table entries.
    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
}
458
/// Read access to the decoded value.
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
466
/// Mutable access to the decoded value. The stored raw bytes are NOT updated
/// by mutations made through this reference.
impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
472
473impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 write!(
476 f,
477 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
478 self.inner, self.bytes
479 )
480 }
481}
482
483impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
484 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
488 where
489 S: serde::Serializer,
490 {
491 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
494 }
495}
496
497impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
498 for DeserializedValueWithBytes<T>
499{
500 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
504 where
505 D: serde::Deserializer<'de>,
506 {
507 let buf = String::deserialize(deserializer)?;
508 let bytes = Bytes::from(buf);
509
510 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
511 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
512
513 Ok(value)
514 }
515}
516
517impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
518 fn clone(&self) -> Self {
519 Self {
520 bytes: self.bytes.clone(),
521 inner: self.inner.clone(),
522 }
523 }
524}
525
526impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
527 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
530 let inner = T::try_from_raw_value(&bytes)?;
531 Ok(Self { bytes, inner })
532 }
533
534 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
537 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
538 }
539
540 pub fn into_inner(self) -> T {
541 self.inner
542 }
543
544 pub fn get_inner_ref(&self) -> &T {
545 &self.inner
546 }
547
548 pub fn get_raw_bytes(&self) -> Vec<u8> {
550 self.bytes.to_vec()
551 }
552
553 #[cfg(any(test, feature = "testing"))]
554 pub fn from_inner(inner: T) -> Self {
555 let bytes = serde_json::to_vec(&inner).unwrap();
556
557 Self {
558 bytes: Bytes::from(bytes),
559 inner,
560 }
561 }
562}
563
564impl TableMetadataManager {
565 pub fn new(kv_backend: KvBackendRef) -> Self {
566 TableMetadataManager {
567 table_name_manager: TableNameManager::new(kv_backend.clone()),
568 table_info_manager: TableInfoManager::new(kv_backend.clone()),
569 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
570 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
571 catalog_manager: CatalogManager::new(kv_backend.clone()),
572 schema_manager: SchemaManager::new(kv_backend.clone()),
573 table_route_manager: TableRouteManager::new(kv_backend.clone()),
574 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
575 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
576 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
577 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
578 kv_backend,
579 }
580 }
581
582 pub fn new_with_custom_tombstone_prefix(
584 kv_backend: KvBackendRef,
585 tombstone_prefix: &str,
586 ) -> Self {
587 Self {
588 table_name_manager: TableNameManager::new(kv_backend.clone()),
589 table_info_manager: TableInfoManager::new(kv_backend.clone()),
590 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
591 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
592 catalog_manager: CatalogManager::new(kv_backend.clone()),
593 schema_manager: SchemaManager::new(kv_backend.clone()),
594 table_route_manager: TableRouteManager::new(kv_backend.clone()),
595 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
596 tombstone_manager: TombstoneManager::new_with_prefix(
597 kv_backend.clone(),
598 tombstone_prefix,
599 ),
600 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
601 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
602 kv_backend,
603 }
604 }
605
606 pub async fn init(&self) -> Result<()> {
607 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
608
609 self.catalog_manager().create(catalog_name, true).await?;
610
611 let internal_schemas = [
612 DEFAULT_SCHEMA_NAME,
613 INFORMATION_SCHEMA_NAME,
614 DEFAULT_PRIVATE_SCHEMA_NAME,
615 ];
616
617 for schema_name in internal_schemas {
618 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
619
620 self.schema_manager().create(schema_key, None, true).await?;
621 }
622
623 Ok(())
624 }
625
    /// Returns the [`TableNameManager`].
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the [`TableInfoManager`].
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the [`ViewInfoManager`].
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the [`DatanodeTableManager`].
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the [`CatalogManager`].
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the [`SchemaManager`].
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the [`TableRouteManager`].
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the [`TableRepartManager`].
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the [`TopicNameManager`].
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the [`TopicRegionManager`].
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the key-value backend this manager was built with.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
669
670 pub async fn get_full_table_info(
671 &self,
672 table_id: TableId,
673 ) -> Result<(
674 Option<DeserializedValueWithBytes<TableInfoValue>>,
675 Option<DeserializedValueWithBytes<TableRouteValue>>,
676 )> {
677 let table_info_key = TableInfoKey::new(table_id);
678 let table_route_key = TableRouteKey::new(table_id);
679 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
680 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
681
682 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
683 let mut res = self.kv_backend.txn(txn).await?;
684 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
685 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
686 let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
687 if let Some(table_route_value) = &mut table_route_value {
688 self.table_route_manager()
689 .table_route_storage()
690 .remap_table_route(table_route_value)
691 .await?;
692 }
693 Ok((table_info_value, table_route_value))
694 }
695
696 pub async fn create_view_metadata(
706 &self,
707 view_info: TableInfo,
708 raw_logical_plan: Vec<u8>,
709 table_names: HashSet<TableName>,
710 columns: Vec<String>,
711 plan_columns: Vec<String>,
712 definition: String,
713 ) -> Result<()> {
714 let view_id = view_info.ident.table_id;
715
716 let view_name = TableNameKey::new(
718 &view_info.catalog_name,
719 &view_info.schema_name,
720 &view_info.name,
721 );
722 let create_table_name_txn = self
723 .table_name_manager()
724 .build_create_txn(&view_name, view_id)?;
725
726 let table_info_value = TableInfoValue::new(view_info);
728
729 let (create_table_info_txn, on_create_table_info_failure) = self
730 .table_info_manager()
731 .build_create_txn(view_id, &table_info_value)?;
732
733 let view_info_value = ViewInfoValue::new(
735 raw_logical_plan.into(),
736 table_names,
737 columns,
738 plan_columns,
739 definition,
740 );
741 let (create_view_info_txn, on_create_view_info_failure) = self
742 .view_info_manager()
743 .build_create_txn(view_id, &view_info_value)?;
744
745 let txn = Txn::merge_all(vec![
746 create_table_name_txn,
747 create_table_info_txn,
748 create_view_info_txn,
749 ]);
750
751 let mut r = self.kv_backend.txn(txn).await?;
752
753 if !r.succeeded {
755 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
756 let remote_table_info = on_create_table_info_failure(&mut set)?
757 .context(error::UnexpectedSnafu {
758 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
759 })?
760 .into_inner();
761
762 let remote_view_info = on_create_view_info_failure(&mut set)?
763 .context(error::UnexpectedSnafu {
764 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
765 })?
766 .into_inner();
767
768 let op_name = "the creating view metadata";
769 ensure_values!(remote_table_info, table_info_value, op_name);
770 ensure_values!(remote_view_info, view_info_value, op_name);
771 }
772
773 Ok(())
774 }
775
776 pub async fn create_table_metadata(
779 &self,
780 table_info: TableInfo,
781 table_route_value: TableRouteValue,
782 region_wal_options: RegionWalOptions,
783 ) -> Result<()> {
784 let table_id = table_info.ident.table_id;
785 let engine = table_info.meta.engine.clone();
786
787 let table_name = TableNameKey::new(
789 &table_info.catalog_name,
790 &table_info.schema_name,
791 &table_info.name,
792 );
793 let create_table_name_txn = self
794 .table_name_manager()
795 .build_create_txn(&table_name, table_id)?;
796
797 let region_options = table_info.to_region_options();
798 let table_info_value = TableInfoValue::new(table_info);
800 let (create_table_info_txn, on_create_table_info_failure) = self
801 .table_info_manager()
802 .build_create_txn(table_id, &table_info_value)?;
803
804 let (create_table_route_txn, on_create_table_route_failure) = self
805 .table_route_manager()
806 .table_route_storage()
807 .build_create_txn(table_id, &table_route_value)?;
808
809 let create_topic_region_txn = self
810 .topic_region_manager
811 .build_create_txn(table_id, ®ion_wal_options)?;
812
813 let mut txn = Txn::merge_all(vec![
814 create_table_name_txn,
815 create_table_info_txn,
816 create_table_route_txn,
817 create_topic_region_txn,
818 ]);
819
820 if let TableRouteValue::Physical(x) = &table_route_value {
821 let region_storage_path = table_info_value.region_storage_path();
822 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
823 table_id,
824 &engine,
825 ®ion_storage_path,
826 region_options,
827 region_wal_options,
828 region_distribution(&x.region_routes),
829 )?;
830 txn = txn.merge(create_datanode_table_txn);
831 }
832
833 let mut r = self.kv_backend.txn(txn).await?;
834
835 if !r.succeeded {
837 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
838 let remote_table_info = on_create_table_info_failure(&mut set)?
839 .context(error::UnexpectedSnafu {
840 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
841 })?
842 .into_inner();
843
844 let remote_table_route = on_create_table_route_failure(&mut set)?
845 .context(error::UnexpectedSnafu {
846 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
847 })?
848 .into_inner();
849
850 let op_name = "the creating table metadata";
851 ensure_values!(remote_table_info, table_info_value, op_name);
852 ensure_values!(remote_table_route, table_route_value, op_name);
853 }
854
855 Ok(())
856 }
857
858 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
859 self.kv_backend.max_txn_ops() / 3
862 }
863
    /// Creates the metadata of several logical tables in one transaction.
    ///
    /// For every `(table_info, table_route_value)` pair three transactions are
    /// built (table name, table info, table route) and all of them are merged
    /// into a single transaction.
    ///
    /// If the transaction does not succeed, the table info and table route
    /// read back for each table are compared with the values this call tried
    /// to write; the call still returns `Ok(())` when all of them are equal.
    ///
    /// # Errors
    /// Fails if a transaction cannot be built or executed, if a read-back
    /// value is missing, or if a read-back value differs from the intended one.
    pub async fn create_logical_tables_metadata(
        &self,
        tables_data: Vec<(TableInfo, TableRouteValue)>,
    ) -> Result<()> {
        let len = tables_data.len();
        // Three transactions are pushed per table.
        let mut txns = Vec::with_capacity(3 * len);
        // Per-table state needed to verify the stored values if the merged
        // transaction does not succeed.
        struct OnFailure<F1, R1, F2, R2>
        where
            F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
            F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
        {
            table_info_value: TableInfoValue,
            on_create_table_info_failure: F1,
            table_route_value: TableRouteValue,
            on_create_table_route_failure: F2,
        }
        let mut on_failures = Vec::with_capacity(len);
        for (table_info, table_route_value) in tables_data {
            let table_id = table_info.ident.table_id;

            // Transaction creating the table-name entry.
            let table_name = TableNameKey::new(
                &table_info.catalog_name,
                &table_info.schema_name,
                &table_info.name,
            );
            let create_table_name_txn = self
                .table_name_manager()
                .build_create_txn(&table_name, table_id)?;
            txns.push(create_table_name_txn);

            // Transaction creating the table-info entry.
            let table_info_value = TableInfoValue::new(table_info);
            let (create_table_info_txn, on_create_table_info_failure) =
                self.table_info_manager()
                    .build_create_txn(table_id, &table_info_value)?;
            txns.push(create_table_info_txn);

            // Transaction creating the table-route entry.
            let (create_table_route_txn, on_create_table_route_failure) = self
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &table_route_value)?;
            txns.push(create_table_route_txn);

            on_failures.push(OnFailure {
                table_info_value,
                on_create_table_info_failure,
                table_route_value,
                on_create_table_route_failure,
            });
        }

        let txn = Txn::merge_all(txns);
        let mut r = self.kv_backend.txn(txn).await?;

        if !r.succeeded {
            // Compare what is stored remotely with what we intended to write,
            // table by table.
            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
            for on_failure in on_failures {
                let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                    .context(error::UnexpectedSnafu {
                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                    })?
                    .into_inner();

                let op_name = "the creating logical tables metadata";
                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
            }
        }

        Ok(())
    }
944
945 fn table_metadata_keys(
946 &self,
947 table_id: TableId,
948 table_name: &TableName,
949 table_route_value: &TableRouteValue,
950 region_wal_options: &HashMap<RegionNumber, WalOptions>,
951 ) -> Result<Vec<Vec<u8>>> {
952 let datanode_ids = if table_route_value.is_physical() {
954 region_distribution(table_route_value.region_routes()?)
955 .into_keys()
956 .collect()
957 } else {
958 vec![]
959 };
960 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
961 let table_name = TableNameKey::new(
962 &table_name.catalog_name,
963 &table_name.schema_name,
964 &table_name.table_name,
965 );
966 let table_info_key = TableInfoKey::new(table_id);
967 let table_route_key = TableRouteKey::new(table_id);
968 let table_repart_key = TableRepartKey::new(table_id);
969 let datanode_table_keys = datanode_ids
970 .into_iter()
971 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
972 .collect::<HashSet<_>>();
973 let topic_region_map = self
974 .topic_region_manager
975 .get_topic_region_mapping(table_id, region_wal_options);
976 let topic_region_keys = topic_region_map
977 .iter()
978 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
979 .collect::<Vec<_>>();
980 keys.push(table_name.to_bytes());
981 keys.push(table_info_key.to_bytes());
982 keys.push(table_route_key.to_bytes());
983 keys.push(table_repart_key.to_bytes());
984 for key in &datanode_table_keys {
985 keys.push(key.to_bytes());
986 }
987 for key in topic_region_keys {
988 keys.push(key.to_bytes());
989 }
990 Ok(keys)
991 }
992
993 pub async fn delete_table_metadata(
996 &self,
997 table_id: TableId,
998 table_name: &TableName,
999 table_route_value: &TableRouteValue,
1000 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1001 ) -> Result<()> {
1002 let keys =
1003 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1004 self.tombstone_manager.create(keys).await.map(|_| ())
1005 }
1006
1007 pub async fn list_dropped_tables(&self) -> Result<Vec<DroppedTableName>> {
1009 let mut stream = self.tombstone_manager.tombstoned_table_names();
1010 let mut dropped_tables = Vec::new();
1011
1012 while let Some(kv) = stream.try_next().await? {
1013 let raw_key = self.tombstone_manager.strip_tombstone_prefix(&kv.key)?;
1014 let table_name = TableNameKey::from_bytes(raw_key)?.into();
1015 let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1016 dropped_tables.push(DroppedTableName {
1017 table_id,
1018 table_name,
1019 });
1020 }
1021
1022 Ok(dropped_tables)
1023 }
1024
1025 pub async fn get_dropped_table(
1027 &self,
1028 table_name: &TableName,
1029 ) -> Result<Option<DroppedTableMetadata>> {
1030 let table_name_key = TableNameKey::from(table_name);
1031 let Some(kv) = self
1032 .tombstone_manager
1033 .get(&table_name_key.to_bytes())
1034 .await?
1035 else {
1036 return Ok(None);
1037 };
1038
1039 let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1040 self.get_dropped_table_metadata(table_id, table_name.clone())
1041 .await
1042 }
1043
1044 pub async fn get_dropped_table_by_id(
1046 &self,
1047 table_id: TableId,
1048 ) -> Result<Option<DroppedTableMetadata>> {
1049 self.get_dropped_table_metadata(table_id, None).await
1050 }
1051
1052 pub async fn delete_table_metadata_tombstone(
1055 &self,
1056 table_id: TableId,
1057 table_name: &TableName,
1058 table_route_value: &TableRouteValue,
1059 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1060 ) -> Result<()> {
1061 let table_metadata_keys =
1062 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1063 self.tombstone_manager
1064 .delete(table_metadata_keys)
1065 .await
1066 .map(|_| ())
1067 }
1068
1069 pub async fn restore_table_metadata(
1072 &self,
1073 table_id: TableId,
1074 table_name: &TableName,
1075 table_route_value: &TableRouteValue,
1076 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1077 ) -> Result<()> {
1078 let keys =
1079 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1080 self.tombstone_manager.restore(keys).await.map(|_| ())
1081 }
1082
1083 pub async fn destroy_table_metadata(
1086 &self,
1087 table_id: TableId,
1088 table_name: &TableName,
1089 table_route_value: &TableRouteValue,
1090 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1091 ) -> Result<()> {
1092 let keys =
1093 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1094 let _ = self
1095 .kv_backend
1096 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1097 .await?;
1098 Ok(())
1099 }
1100
1101 async fn get_dropped_table_metadata<T>(
1103 &self,
1104 table_id: TableId,
1105 table_name: T,
1106 ) -> Result<Option<DroppedTableMetadata>>
1107 where
1108 T: Into<Option<TableName>>,
1109 {
1110 let table_info_key = TableInfoKey::new(table_id);
1111 let Some(table_info_kv) = self
1112 .tombstone_manager
1113 .get(&table_info_key.to_bytes())
1114 .await?
1115 else {
1116 return Ok(None);
1117 };
1118
1119 let table_info_value = TableInfoValue::try_from_raw_value(&table_info_kv.value)?;
1120 let table_name = table_name
1121 .into()
1122 .unwrap_or_else(|| table_info_value.table_name());
1123
1124 let table_route_key = TableRouteKey::new(table_id);
1125 let table_route_kv = self
1126 .tombstone_manager
1127 .get(&table_route_key.to_bytes())
1128 .await?
1129 .with_context(|| error::UnexpectedSnafu {
1130 err_msg: format!("Missing tombstoned table route metadata for table id {table_id}"),
1131 })?;
1132 let mut table_route_value = TableRouteValue::try_from_raw_value(&table_route_kv.value)?;
1133 self.table_route_manager
1134 .table_route_storage()
1135 .remap_table_route(&mut table_route_value)
1136 .await?;
1137
1138 let region_wal_options = self
1139 .dropped_region_wal_options(table_id, &table_route_value)
1140 .await?;
1141
1142 Ok(Some(DroppedTableMetadata {
1143 table_id,
1144 table_name,
1145 table_info_value,
1146 table_route_value,
1147 region_wal_options,
1148 }))
1149 }
1150
1151 async fn dropped_region_wal_options(
1153 &self,
1154 table_id: TableId,
1155 table_route_value: &TableRouteValue,
1156 ) -> Result<HashMap<RegionNumber, WalOptions>> {
1157 let mut region_wal_options = HashMap::new();
1158 let datanode_table_keys = region_distribution(table_route_value.region_routes()?)
1159 .into_keys()
1160 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
1161 .collect::<Vec<_>>();
1162 let datanode_table_key_bytes = datanode_table_keys
1163 .iter()
1164 .map(|key| key.to_bytes())
1165 .collect::<Vec<_>>();
1166 let datanode_table_values = self
1167 .tombstone_manager
1168 .batch_get(&datanode_table_key_bytes)
1169 .await?;
1170
1171 for datanode_table_key in datanode_table_keys {
1172 let Some(kv) = datanode_table_values.get(&datanode_table_key.to_bytes()) else {
1173 continue;
1174 };
1175
1176 let datanode_table_value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
1177 for (region_number, wal_options) in &datanode_table_value.region_info.region_wal_options
1178 {
1179 region_wal_options.insert(*region_number, wal_options.clone());
1180 }
1181 }
1182
1183 Ok(region_wal_options)
1184 }
1185
1186 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1187 let mut keys = Vec::with_capacity(3);
1188 let view_name = TableNameKey::new(
1189 &view_name.catalog_name,
1190 &view_name.schema_name,
1191 &view_name.table_name,
1192 );
1193 let table_info_key = TableInfoKey::new(view_id);
1194 let view_info_key = ViewInfoKey::new(view_id);
1195 keys.push(view_name.to_bytes());
1196 keys.push(table_info_key.to_bytes());
1197 keys.push(view_info_key.to_bytes());
1198
1199 Ok(keys)
1200 }
1201
1202 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1205 let keys = self.view_info_keys(view_id, view_name)?;
1206 let _ = self
1207 .kv_backend
1208 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1209 .await?;
1210 Ok(())
1211 }
1212
1213 pub async fn rename_table(
1217 &self,
1218 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1219 new_table_name: String,
1220 ) -> Result<()> {
1221 let current_table_info = ¤t_table_info_value.table_info;
1222 let table_id = current_table_info.ident.table_id;
1223
1224 let table_name_key = TableNameKey::new(
1225 ¤t_table_info.catalog_name,
1226 ¤t_table_info.schema_name,
1227 ¤t_table_info.name,
1228 );
1229
1230 let new_table_name_key = TableNameKey::new(
1231 ¤t_table_info.catalog_name,
1232 ¤t_table_info.schema_name,
1233 &new_table_name,
1234 );
1235
1236 let update_table_name_txn = self.table_name_manager().build_update_txn(
1238 &table_name_key,
1239 &new_table_name_key,
1240 table_id,
1241 )?;
1242
1243 let new_table_info_value = current_table_info_value
1244 .inner
1245 .with_update(move |table_info| {
1246 table_info.name = new_table_name;
1247 });
1248
1249 let (update_table_info_txn, on_update_table_info_failure) = self
1251 .table_info_manager()
1252 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1253
1254 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1255
1256 let mut r = self.kv_backend.txn(txn).await?;
1257
1258 if !r.succeeded {
1260 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1261 let remote_table_info = on_update_table_info_failure(&mut set)?
1262 .context(error::UnexpectedSnafu {
1263 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1264 })?
1265 .into_inner();
1266
1267 let op_name = "the renaming table metadata";
1268 ensure_values!(remote_table_info, new_table_info_value, op_name);
1269 }
1270
1271 Ok(())
1272 }
1273
1274 pub async fn update_table_info(
1278 &self,
1279 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1280 region_distribution: Option<RegionDistribution>,
1281 new_table_info: TableInfo,
1282 ) -> Result<()> {
1283 let table_id = current_table_info_value.table_info.ident.table_id;
1284 let new_table_info_value = current_table_info_value.update(new_table_info);
1285
1286 let (update_table_info_txn, on_update_table_info_failure) = self
1288 .table_info_manager()
1289 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1290
1291 let txn = if let Some(region_distribution) = region_distribution {
1292 let new_region_options = new_table_info_value.table_info.to_region_options();
1294 let update_datanode_table_options_txn = self
1295 .datanode_table_manager
1296 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1297 .await?;
1298 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1299 } else {
1300 update_table_info_txn
1301 };
1302
1303 let mut r = self.kv_backend.txn(txn).await?;
1304 if !r.succeeded {
1306 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1307 let remote_table_info = on_update_table_info_failure(&mut set)?
1308 .context(error::UnexpectedSnafu {
1309 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1310 })?
1311 .into_inner();
1312
1313 let op_name = "the updating table info";
1314 ensure_values!(remote_table_info, new_table_info_value, op_name);
1315 }
1316 Ok(())
1317 }
1318
1319 #[allow(clippy::too_many_arguments)]
1330 pub async fn update_view_info(
1331 &self,
1332 view_id: TableId,
1333 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1334 new_view_info: Vec<u8>,
1335 table_names: HashSet<TableName>,
1336 columns: Vec<String>,
1337 plan_columns: Vec<String>,
1338 definition: String,
1339 ) -> Result<()> {
1340 let new_view_info_value = current_view_info_value.update(
1341 new_view_info.into(),
1342 table_names,
1343 columns,
1344 plan_columns,
1345 definition,
1346 );
1347
1348 let (update_view_info_txn, on_update_view_info_failure) = self
1350 .view_info_manager()
1351 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1352
1353 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1354
1355 if !r.succeeded {
1357 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1358 let remote_view_info = on_update_view_info_failure(&mut set)?
1359 .context(error::UnexpectedSnafu {
1360 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1361 })?
1362 .into_inner();
1363
1364 let op_name = "the updating view info";
1365 ensure_values!(remote_view_info, new_view_info_value, op_name);
1366 }
1367 Ok(())
1368 }
1369
/// Returns `max_txn_ops()` of the underlying kv backend.
///
/// NOTE(review): judging by the name, callers are expected to use this as the chunk size
/// when splitting input for `batch_update_table_info_values` — confirm against callers.
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
    self.kv_backend.max_txn_ops()
}
1373
1374 pub async fn batch_update_table_info_values(
1375 &self,
1376 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1377 ) -> Result<()> {
1378 let len = table_info_value_pairs.len();
1379 let mut txns = Vec::with_capacity(len);
1380 struct OnFailure<F, R>
1381 where
1382 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1383 {
1384 table_info_value: TableInfoValue,
1385 on_update_table_info_failure: F,
1386 }
1387 let mut on_failures = Vec::with_capacity(len);
1388
1389 for (table_info_value, new_table_info) in table_info_value_pairs {
1390 let table_id = table_info_value.table_info.ident.table_id;
1391
1392 let new_table_info_value = table_info_value.update(new_table_info);
1393
1394 let (update_table_info_txn, on_update_table_info_failure) =
1395 self.table_info_manager().build_update_txn(
1396 table_id,
1397 &table_info_value,
1398 &new_table_info_value,
1399 )?;
1400
1401 txns.push(update_table_info_txn);
1402
1403 on_failures.push(OnFailure {
1404 table_info_value: new_table_info_value,
1405 on_update_table_info_failure,
1406 });
1407 }
1408
1409 let txn = Txn::merge_all(txns);
1410 let mut r = self.kv_backend.txn(txn).await?;
1411
1412 if !r.succeeded {
1413 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1414 for on_failure in on_failures {
1415 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1416 .context(error::UnexpectedSnafu {
1417 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1418 })?
1419 .into_inner();
1420
1421 let op_name = "the batch updating table info";
1422 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1423 }
1424 }
1425
1426 Ok(())
1427 }
1428
1429 pub async fn update_table_route(
1430 &self,
1431 table_id: TableId,
1432 region_info: RegionInfo,
1433 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1434 new_region_routes: Vec<RegionRoute>,
1435 new_region_options: &HashMap<String, String>,
1436 new_region_wal_options: &RegionWalOptions,
1437 ) -> Result<()> {
1438 let current_region_distribution =
1440 region_distribution(current_table_route_value.region_routes()?);
1441 let new_region_distribution = region_distribution(&new_region_routes);
1442
1443 let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1444 table_id,
1445 ®ion_info.region_wal_options,
1446 new_region_wal_options,
1447 )?;
1448 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1449 table_id,
1450 region_info,
1451 current_region_distribution,
1452 new_region_distribution,
1453 new_region_options,
1454 new_region_wal_options,
1455 )?;
1456
1457 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1459 let (update_table_route_txn, on_update_table_route_failure) = self
1460 .table_route_manager()
1461 .table_route_storage()
1462 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1463
1464 let txn = Txn::merge_all(vec![
1465 update_datanode_table_txn,
1466 update_table_route_txn,
1467 update_topic_region_txn,
1468 ]);
1469
1470 let mut r = self.kv_backend.txn(txn).await?;
1471
1472 if !r.succeeded {
1474 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1475 let remote_table_route = on_update_table_route_failure(&mut set)?
1476 .context(error::UnexpectedSnafu {
1477 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1478 })?
1479 .into_inner();
1480
1481 let op_name = "the updating table route";
1482 ensure_values!(remote_table_route, new_table_route_value, op_name);
1483 }
1484
1485 Ok(())
1486 }
1487
1488 pub async fn update_leader_region_status<F>(
1490 &self,
1491 table_id: TableId,
1492 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1493 next_region_route_status: F,
1494 ) -> Result<()>
1495 where
1496 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1497 {
1498 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1499
1500 let mut updated = 0;
1501 for route in &mut new_region_routes {
1502 if let Some(state) = next_region_route_status(route)
1503 && route.set_leader_state(state)
1504 {
1505 updated += 1;
1506 }
1507 }
1508
1509 if updated == 0 {
1510 warn!("No leader status updated");
1511 return Ok(());
1512 }
1513
1514 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1516
1517 let (update_table_route_txn, on_update_table_route_failure) = self
1518 .table_route_manager()
1519 .table_route_storage()
1520 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1521
1522 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1523
1524 if !r.succeeded {
1526 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1527 let remote_table_route = on_update_table_route_failure(&mut set)?
1528 .context(error::UnexpectedSnafu {
1529 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1530 })?
1531 .into_inner();
1532
1533 let op_name = "the updating leader region status";
1534 ensure_values!(remote_table_route, new_table_route_value, op_name);
1535 }
1536
1537 Ok(())
1538 }
1539}
1540
/// Implements `$crate::key::MetadataValue` for every listed type, using JSON as the raw
/// representation: `try_from_raw_value` decodes with `serde_json::from_slice` and
/// `try_as_raw_value` encodes with `serde_json::to_vec`.
///
/// Types are given as a comma-separated list without a trailing comma.
///
/// `Result`, `SerdeJsonSnafu` and the `.context(..)` extension method are written
/// unqualified in the expansion, so they have to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1557
// Implements `$crate::key::MetadataKeyGetTxnOp` for every listed key type.
//
// The generated `build_get_op` serializes the key with `to_bytes()` and returns a pair:
// a `TxnOp::Get` for that raw key, and the closure produced by
// `TxnOpGetResponseSet::filter` for the same raw key, which yields `Option<Vec<u8>>`
// when applied to a response set.
//
// Types are given as a comma-separated list without a trailing comma.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    // The raw key is needed twice: once for the get op, once for the filter.
                    let raw_key = self.to_bytes();
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1582
// Key types that can build a transactional get op for themselves.
impl_metadata_key_get_txn_op! {
    TableNameKey<'_>,
    TableInfoKey,
    ViewInfoKey,
    TableRouteKey,
    DatanodeTableKey
}
1590
/// Adds inherent JSON helpers to every listed type.
///
/// Unlike a trait implementation, the generated `try_from_raw_value` is an inherent method
/// returning `Result<Option<Self>>`: the raw bytes are decoded with
/// `serde_json::from_slice` into an `Option<Self>`. `try_as_raw_value` encodes `self` with
/// `serde_json::to_vec`.
///
/// Types are given as a comma-separated list without a trailing comma.
///
/// `Result`, `SerdeJsonSnafu` and the `.context(..)` extension method are written
/// unqualified in the expansion, so they have to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1607
// Value types that get a JSON-backed `MetadataValue` implementation.
impl_metadata_value! {
    TableNameValue,
    TableInfoValue,
    ViewInfoValue,
    DatanodeTableValue,
    FlowInfoValue,
    FlowNameValue,
    FlowRouteValue,
    TableFlowValue,
    NodeAddressValue,
    SchemaNameValue,
    FlowStateValue,
    PoisonValue,
    TopicRegionValue
}
1623
// Value types that get inherent JSON helpers whose decoder returns `Option<Self>`.
// `SchemaNameValue` is also listed in the `impl_metadata_value!` invocation, so it has
// both the trait implementation and these inherent methods.
impl_optional_metadata_value! {
    CatalogNameValue,
    SchemaNameValue
}
1628
1629#[cfg(test)]
1630mod tests {
1631 use std::collections::{BTreeMap, HashMap, HashSet};
1632 use std::sync::Arc;
1633
1634 use bytes::Bytes;
1635 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1636 use common_time::util::current_time_millis;
1637 use common_wal::options::{KafkaWalOptions, WalOptions};
1638 use futures::TryStreamExt;
1639 use store_api::storage::{RegionId, RegionNumber};
1640 use table::metadata::TableInfo;
1641 use table::table_name::TableName;
1642
1643 use super::datanode_table::DatanodeTableKey;
1644 use super::test_utils;
1645 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1646 use crate::ddl::test_util::create_table::test_create_table_task;
1647 use crate::ddl::utils::region_storage_path;
1648 use crate::error::Result;
1649 use crate::key::datanode_table::RegionInfo;
1650 use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1651 use crate::key::table_info::TableInfoValue;
1652 use crate::key::table_name::TableNameKey;
1653 use crate::key::table_route::TableRouteValue;
1654 use crate::key::topic_region::TopicRegionKey;
1655 use crate::key::{
1656 DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1657 TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1658 };
1659 use crate::kv_backend::KvBackend;
1660 use crate::kv_backend::memory::MemoryKvBackend;
1661 use crate::kv_backend::read_only::ReadOnlyKvBackend;
1662 use crate::peer::Peer;
1663 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1664 use crate::rpc::store::{PutRequest, RangeRequest};
1665 use crate::wal_provider::{RegionWalOptions, WalProvider};
1666
/// A `DeserializedValueWithBytes` whose `bytes` and `inner` disagree must round-trip
/// through serde according to `bytes`: after encoding and decoding, both `inner` and
/// `bytes` reflect the value that `bytes` described.
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // `bytes` describes two routes ...
    let two_routes = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let two_routes_bytes = serde_json::to_vec(&two_routes).unwrap();

    // ... while `inner` deliberately holds only one.
    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(two_routes_bytes.clone()),
    };

    let encoded = serde_json::to_vec(&value).unwrap();
    let round_tripped: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&encoded).unwrap();

    assert_eq!(round_tripped.inner, two_routes);
    assert_eq!(round_tripped.bytes, two_routes_bytes);
}
1694
/// Returns the default route used by the tests: region id 1 led by datanode 2
/// (see `new_region_route`).
fn new_test_region_route() -> RegionRoute {
    new_region_route(1, 2)
}
1698
1699 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1700 RegionRoute {
1701 region: Region {
1702 id: region_id.into(),
1703 name: "r1".to_string(),
1704 attrs: BTreeMap::new(),
1705 partition_expr: Default::default(),
1706 },
1707 leader_peer: Some(Peer::new(datanode, "a2")),
1708 follower_peers: vec![],
1709 leader_state: None,
1710 leader_down_since: None,
1711 write_route_policy: None,
1712 }
1713 }
1714
/// Returns the table info shared by the tests, built by `test_utils::new_test_table_info(10)`.
///
/// NOTE(review): the argument is presumably the table id — `test_create_table_metadata`
/// looks the table up with id 10.
fn new_test_table_info() -> TableInfo {
    test_utils::new_test_table_info(10)
}
1718
1719 fn new_test_table_names() -> HashSet<TableName> {
1720 let mut set = HashSet::new();
1721 set.insert(TableName {
1722 catalog_name: "greptime".to_string(),
1723 schema_name: "public".to_string(),
1724 table_name: "a_table".to_string(),
1725 });
1726 set.insert(TableName {
1727 catalog_name: "greptime".to_string(),
1728 schema_name: "public".to_string(),
1729 table_name: "b_table".to_string(),
1730 });
1731 set
1732 }
1733
1734 async fn create_physical_table_metadata(
1735 table_metadata_manager: &TableMetadataManager,
1736 table_info: TableInfo,
1737 region_routes: Vec<RegionRoute>,
1738 region_wal_options: RegionWalOptions,
1739 ) -> Result<()> {
1740 table_metadata_manager
1741 .create_table_metadata(
1742 table_info,
1743 TableRouteValue::physical(region_routes),
1744 region_wal_options,
1745 )
1746 .await
1747 }
1748
1749 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1750 let topics = (0..2)
1751 .map(|i| format!("greptimedb_topic{}", i))
1752 .collect::<Vec<_>>();
1753 let wal_options = topics
1754 .iter()
1755 .map(|topic| WalOptions::Kafka(KafkaWalOptions::new(topic.clone())))
1756 .collect::<Vec<_>>();
1757
1758 (0..16)
1759 .enumerate()
1760 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1761 .collect()
1762 }
1763
1764 fn create_mixed_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1765 HashMap::from([
1766 (
1767 0,
1768 WalOptions::Kafka(KafkaWalOptions::new("greptimedb_topic0".to_string())),
1769 ),
1770 (1, WalOptions::RaftEngine),
1771 (2, WalOptions::Noop),
1772 (
1773 3,
1774 WalOptions::Kafka(KafkaWalOptions::new("greptimedb_topic1".to_string())),
1775 ),
1776 ])
1777 }
1778
1779 #[tokio::test]
1780 async fn test_raft_engine_topic_region_map() {
1781 let mem_kv = Arc::new(MemoryKvBackend::default());
1782 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1783 let region_route = new_test_region_route();
1784 let region_routes = &vec![region_route.clone()];
1785 let table_info = new_test_table_info();
1786 let wal_provider = WalProvider::RaftEngine;
1787 let regions: Vec<_> = (0..16).collect();
1788 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1789 create_physical_table_metadata(
1790 &table_metadata_manager,
1791 table_info.clone(),
1792 region_routes.clone(),
1793 region_wal_options.clone(),
1794 )
1795 .await
1796 .unwrap();
1797
1798 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1799 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1800 let resp = mem_kv.range(range_req).await.unwrap();
1801 assert!(resp.kvs.is_empty());
1803 }
1804
1805 #[tokio::test]
1806 async fn test_create_table_metadata() {
1807 let mem_kv = Arc::new(MemoryKvBackend::default());
1808 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1809 let region_route = new_test_region_route();
1810 let region_routes = &vec![region_route.clone()];
1811 let table_info = new_test_table_info();
1812 let region_wal_options = create_mock_region_wal_options();
1813
1814 create_physical_table_metadata(
1816 &table_metadata_manager,
1817 table_info.clone(),
1818 region_routes.clone(),
1819 region_wal_options.clone(),
1820 )
1821 .await
1822 .unwrap();
1823
1824 assert!(
1826 create_physical_table_metadata(
1827 &table_metadata_manager,
1828 table_info.clone(),
1829 region_routes.clone(),
1830 region_wal_options.clone(),
1831 )
1832 .await
1833 .is_ok()
1834 );
1835
1836 let mut modified_region_routes = region_routes.clone();
1837 modified_region_routes.push(region_route.clone());
1838 assert!(
1840 create_physical_table_metadata(
1841 &table_metadata_manager,
1842 table_info.clone(),
1843 modified_region_routes,
1844 region_wal_options.clone(),
1845 )
1846 .await
1847 .is_err()
1848 );
1849
1850 let (remote_table_info, remote_table_route) = table_metadata_manager
1851 .get_full_table_info(10)
1852 .await
1853 .unwrap();
1854
1855 assert_eq!(
1856 remote_table_info.unwrap().into_inner().table_info,
1857 table_info
1858 );
1859 assert_eq!(
1860 remote_table_route
1861 .unwrap()
1862 .into_inner()
1863 .region_routes()
1864 .unwrap(),
1865 region_routes
1866 );
1867
1868 for i in 0..2 {
1869 let region_number = i as u32;
1870 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1871 let topic = format!("greptimedb_topic{}", i);
1872 let regions = table_metadata_manager
1873 .topic_region_manager
1874 .regions(&topic)
1875 .await
1876 .unwrap()
1877 .into_keys()
1878 .collect::<Vec<_>>();
1879 assert_eq!(regions.len(), 8);
1880 assert!(regions.contains(®ion_id));
1881 }
1882 }
1883
1884 #[tokio::test]
1885 async fn test_get_full_table_info_remaps_route_address() {
1886 let mem_kv = Arc::new(MemoryKvBackend::default());
1887 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1888
1889 let mut region_route = new_test_region_route();
1890 region_route.follower_peers = vec![Peer::empty(3)];
1891 let region_routes = vec![region_route];
1892 let table_info = new_test_table_info();
1893 let table_id = table_info.ident.table_id;
1894
1895 create_physical_table_metadata(
1896 &table_metadata_manager,
1897 table_info,
1898 region_routes,
1899 HashMap::new(),
1900 )
1901 .await
1902 .unwrap();
1903
1904 mem_kv
1905 .put(PutRequest {
1906 key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1907 value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1908 .try_as_raw_value()
1909 .unwrap(),
1910 ..Default::default()
1911 })
1912 .await
1913 .unwrap();
1914 mem_kv
1915 .put(PutRequest {
1916 key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1917 value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1918 .try_as_raw_value()
1919 .unwrap(),
1920 ..Default::default()
1921 })
1922 .await
1923 .unwrap();
1924
1925 let (_, table_route) = table_metadata_manager
1926 .get_full_table_info(table_id)
1927 .await
1928 .unwrap();
1929 let table_route = table_route.unwrap().into_inner();
1930 let region_routes = table_route.region_routes().unwrap();
1931
1932 assert_eq!(
1933 region_routes[0].leader_peer.as_ref().unwrap().addr,
1934 "new-a2"
1935 );
1936 assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1937 }
1938
1939 #[tokio::test]
1940 async fn test_get_full_table_info_with_read_only_kv_backend() {
1941 let mem_kv = Arc::new(MemoryKvBackend::default());
1942 let writable_manager = TableMetadataManager::new(mem_kv.clone());
1943
1944 let region_routes = vec![new_test_region_route()];
1945 let table_info = new_test_table_info();
1946 let table_id = table_info.ident.table_id;
1947
1948 create_physical_table_metadata(
1949 &writable_manager,
1950 table_info.clone(),
1951 region_routes.clone(),
1952 HashMap::new(),
1953 )
1954 .await
1955 .unwrap();
1956
1957 let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
1958 let read_only_manager = TableMetadataManager::new(read_only_kv);
1959
1960 let (remote_table_info, remote_table_route) = read_only_manager
1961 .get_full_table_info(table_id)
1962 .await
1963 .unwrap();
1964
1965 assert_eq!(
1966 remote_table_info.unwrap().into_inner().table_info,
1967 table_info
1968 );
1969 assert_eq!(
1970 remote_table_route
1971 .unwrap()
1972 .into_inner()
1973 .region_routes()
1974 .unwrap(),
1975 ®ion_routes
1976 );
1977 }
1978
1979 #[tokio::test]
1980 async fn test_create_logic_tables_metadata() {
1981 let mem_kv = Arc::new(MemoryKvBackend::default());
1982 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1983 let region_route = new_test_region_route();
1984 let region_routes = vec![region_route.clone()];
1985 let table_info = new_test_table_info();
1986 let table_id = table_info.ident.table_id;
1987 let table_route_value = TableRouteValue::physical(region_routes.clone());
1988
1989 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1990 table_metadata_manager
1992 .create_logical_tables_metadata(tables_data.clone())
1993 .await
1994 .unwrap();
1995
1996 assert!(
1998 table_metadata_manager
1999 .create_logical_tables_metadata(tables_data)
2000 .await
2001 .is_ok()
2002 );
2003
2004 let mut modified_region_routes = region_routes.clone();
2005 modified_region_routes.push(new_region_route(2, 3));
2006 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
2007 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
2008 assert!(
2010 table_metadata_manager
2011 .create_logical_tables_metadata(modified_tables_data)
2012 .await
2013 .is_err()
2014 );
2015
2016 let (remote_table_info, remote_table_route) = table_metadata_manager
2017 .get_full_table_info(table_id)
2018 .await
2019 .unwrap();
2020
2021 assert_eq!(
2022 remote_table_info.unwrap().into_inner().table_info,
2023 table_info
2024 );
2025 assert_eq!(
2026 remote_table_route
2027 .unwrap()
2028 .into_inner()
2029 .region_routes()
2030 .unwrap(),
2031 ®ion_routes
2032 );
2033 }
2034
2035 #[tokio::test]
2036 async fn test_create_many_logical_tables_metadata() {
2037 let kv_backend = Arc::new(MemoryKvBackend::default());
2038 let table_metadata_manager = TableMetadataManager::new(kv_backend);
2039
2040 let mut tables_data = vec![];
2041 for i in 0..128 {
2042 let table_id = i + 1;
2043 let regin_number = table_id * 3;
2044 let region_id = RegionId::new(table_id, regin_number);
2045 let region_route = new_region_route(region_id.as_u64(), 2);
2046 let region_routes = vec![region_route.clone()];
2047 let table_info = test_utils::new_test_table_info_with_name(
2048 table_id,
2049 &format!("my_table_{}", table_id),
2050 );
2051 let table_route_value = TableRouteValue::physical(region_routes.clone());
2052
2053 tables_data.push((table_info, table_route_value));
2054 }
2055
2056 table_metadata_manager
2058 .create_logical_tables_metadata(tables_data)
2059 .await
2060 .unwrap();
2061 }
2062
2063 #[tokio::test]
2064 async fn test_delete_table_metadata() {
2065 let mem_kv = Arc::new(MemoryKvBackend::default());
2066 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2067 let region_route = new_test_region_route();
2068 let region_routes = &vec![region_route.clone()];
2069 let table_info = new_test_table_info();
2070 let table_id = table_info.ident.table_id;
2071 let datanode_id = 2;
2072 let region_wal_options = create_mock_region_wal_options();
2073 let serialized_region_wal_options = region_wal_options.clone();
2074
2075 create_physical_table_metadata(
2077 &table_metadata_manager,
2078 table_info.clone(),
2079 region_routes.clone(),
2080 serialized_region_wal_options,
2081 )
2082 .await
2083 .unwrap();
2084
2085 let table_name = TableName::new(
2086 table_info.catalog_name,
2087 table_info.schema_name,
2088 table_info.name,
2089 );
2090 let table_route_value = &TableRouteValue::physical(region_routes.clone());
2091 table_metadata_manager
2093 .delete_table_metadata(
2094 table_id,
2095 &table_name,
2096 table_route_value,
2097 ®ion_wal_options,
2098 )
2099 .await
2100 .unwrap();
2101 table_metadata_manager
2103 .delete_table_metadata(
2104 table_id,
2105 &table_name,
2106 table_route_value,
2107 ®ion_wal_options,
2108 )
2109 .await
2110 .unwrap();
2111 assert!(
2112 table_metadata_manager
2113 .table_info_manager()
2114 .get(table_id)
2115 .await
2116 .unwrap()
2117 .is_none()
2118 );
2119 assert!(
2120 table_metadata_manager
2121 .table_route_manager()
2122 .table_route_storage()
2123 .get(table_id)
2124 .await
2125 .unwrap()
2126 .is_none()
2127 );
2128 assert!(
2129 table_metadata_manager
2130 .datanode_table_manager()
2131 .tables(datanode_id)
2132 .try_collect::<Vec<_>>()
2133 .await
2134 .unwrap()
2135 .is_empty()
2136 );
2137 let table_info = table_metadata_manager
2139 .table_info_manager()
2140 .get(table_id)
2141 .await
2142 .unwrap();
2143 assert!(table_info.is_none());
2144 let table_route = table_metadata_manager
2145 .table_route_manager()
2146 .table_route_storage()
2147 .get(table_id)
2148 .await
2149 .unwrap();
2150 assert!(table_route.is_none());
2151 let regions = table_metadata_manager
2153 .topic_region_manager
2154 .regions("greptimedb_topic0")
2155 .await
2156 .unwrap();
2157 assert_eq!(regions.len(), 0);
2158 let regions = table_metadata_manager
2159 .topic_region_manager
2160 .regions("greptimedb_topic1")
2161 .await
2162 .unwrap();
2163 assert_eq!(regions.len(), 0);
2164 }
2165
2166 #[tokio::test]
2167 async fn test_rename_table() {
2168 let mem_kv = Arc::new(MemoryKvBackend::default());
2169 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2170 let region_route = new_test_region_route();
2171 let region_routes = vec![region_route.clone()];
2172 let table_info = new_test_table_info();
2173 let table_id = table_info.ident.table_id;
2174 create_physical_table_metadata(
2176 &table_metadata_manager,
2177 table_info.clone(),
2178 region_routes.clone(),
2179 HashMap::new(),
2180 )
2181 .await
2182 .unwrap();
2183
2184 let new_table_name = "another_name".to_string();
2185 let table_info_value =
2186 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2187
2188 table_metadata_manager
2189 .rename_table(&table_info_value, new_table_name.clone())
2190 .await
2191 .unwrap();
2192 table_metadata_manager
2194 .rename_table(&table_info_value, new_table_name.clone())
2195 .await
2196 .unwrap();
2197 let mut modified_table_info = table_info.clone();
2198 modified_table_info.name = "hi".to_string();
2199 let modified_table_info_value =
2200 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
2201 assert!(
2204 table_metadata_manager
2205 .rename_table(&modified_table_info_value, new_table_name.clone())
2206 .await
2207 .is_err()
2208 );
2209
2210 let old_table_name = TableNameKey::new(
2211 &table_info.catalog_name,
2212 &table_info.schema_name,
2213 &table_info.name,
2214 );
2215 let new_table_name = TableNameKey::new(
2216 &table_info.catalog_name,
2217 &table_info.schema_name,
2218 &new_table_name,
2219 );
2220
2221 assert!(
2222 table_metadata_manager
2223 .table_name_manager()
2224 .get(old_table_name)
2225 .await
2226 .unwrap()
2227 .is_none()
2228 );
2229
2230 assert_eq!(
2231 table_metadata_manager
2232 .table_name_manager()
2233 .get(new_table_name)
2234 .await
2235 .unwrap()
2236 .unwrap()
2237 .table_id(),
2238 table_id
2239 );
2240 }
2241
2242 #[tokio::test]
2243 async fn test_update_table_info() {
2244 let mem_kv = Arc::new(MemoryKvBackend::default());
2245 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2246 let region_route = new_test_region_route();
2247 let region_routes = vec![region_route.clone()];
2248 let table_info = new_test_table_info();
2249 let table_id = table_info.ident.table_id;
2250 create_physical_table_metadata(
2252 &table_metadata_manager,
2253 table_info.clone(),
2254 region_routes.clone(),
2255 HashMap::new(),
2256 )
2257 .await
2258 .unwrap();
2259
2260 let mut new_table_info = table_info.clone();
2261 new_table_info.name = "hi".to_string();
2262 let current_table_info_value =
2263 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2264 table_metadata_manager
2266 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2267 .await
2268 .unwrap();
2269 table_metadata_manager
2271 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2272 .await
2273 .unwrap();
2274
2275 let updated_table_info = table_metadata_manager
2277 .table_info_manager()
2278 .get(table_id)
2279 .await
2280 .unwrap()
2281 .unwrap()
2282 .into_inner();
2283 assert_eq!(updated_table_info.table_info, new_table_info);
2284
2285 let mut wrong_table_info = table_info.clone();
2286 wrong_table_info.name = "wrong".to_string();
2287 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2288 current_table_info_value.update(wrong_table_info),
2289 );
2290 assert!(
2293 table_metadata_manager
2294 .update_table_info(&wrong_table_info_value, None, new_table_info)
2295 .await
2296 .is_err()
2297 )
2298 }
2299
2300 #[tokio::test]
2301 async fn test_update_table_leader_region_status() {
2302 let mem_kv = Arc::new(MemoryKvBackend::default());
2303 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2304 let datanode = 1;
2305 let region_routes = vec![
2306 RegionRoute {
2307 region: Region {
2308 id: 1.into(),
2309 name: "r1".to_string(),
2310 attrs: BTreeMap::new(),
2311 partition_expr: Default::default(),
2312 },
2313 leader_peer: Some(Peer::new(datanode, "a2")),
2314 leader_state: Some(LeaderState::Downgrading),
2315 follower_peers: vec![],
2316 leader_down_since: Some(current_time_millis()),
2317 write_route_policy: None,
2318 },
2319 RegionRoute {
2320 region: Region {
2321 id: 2.into(),
2322 name: "r2".to_string(),
2323 attrs: BTreeMap::new(),
2324 partition_expr: Default::default(),
2325 },
2326 leader_peer: Some(Peer::new(datanode, "a1")),
2327 leader_state: None,
2328 follower_peers: vec![],
2329 leader_down_since: None,
2330 write_route_policy: None,
2331 },
2332 ];
2333 let table_info = new_test_table_info();
2334 let table_id = table_info.ident.table_id;
2335 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2336 TableRouteValue::physical(region_routes.clone()),
2337 );
2338
2339 create_physical_table_metadata(
2341 &table_metadata_manager,
2342 table_info.clone(),
2343 region_routes.clone(),
2344 HashMap::new(),
2345 )
2346 .await
2347 .unwrap();
2348
2349 table_metadata_manager
2350 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2351 if region_route.leader_state.is_some() {
2352 None
2353 } else {
2354 Some(Some(LeaderState::Downgrading))
2355 }
2356 })
2357 .await
2358 .unwrap();
2359
2360 let updated_route_value = table_metadata_manager
2361 .table_route_manager()
2362 .table_route_storage()
2363 .get(table_id)
2364 .await
2365 .unwrap()
2366 .unwrap();
2367
2368 assert_eq!(
2369 updated_route_value.region_routes().unwrap()[0].leader_state,
2370 Some(LeaderState::Downgrading)
2371 );
2372
2373 assert!(
2374 updated_route_value.region_routes().unwrap()[0]
2375 .leader_down_since
2376 .is_some()
2377 );
2378
2379 assert_eq!(
2380 updated_route_value.region_routes().unwrap()[1].leader_state,
2381 Some(LeaderState::Downgrading)
2382 );
2383 assert!(
2384 updated_route_value.region_routes().unwrap()[1]
2385 .leader_down_since
2386 .is_some()
2387 );
2388 }
2389
2390 async fn assert_datanode_table(
2391 table_metadata_manager: &TableMetadataManager,
2392 table_id: u32,
2393 region_routes: &[RegionRoute],
2394 ) {
2395 let region_distribution = region_distribution(region_routes);
2396 for (datanode, regions) in region_distribution {
2397 let got = table_metadata_manager
2398 .datanode_table_manager()
2399 .get(&DatanodeTableKey::new(datanode, table_id))
2400 .await
2401 .unwrap()
2402 .unwrap();
2403
2404 assert_eq!(got.regions, regions.leader_regions);
2405 assert_eq!(got.follower_regions, regions.follower_regions);
2406 }
2407 }
2408
2409 #[tokio::test]
2410 async fn test_update_table_route() {
2411 let mem_kv = Arc::new(MemoryKvBackend::default());
2412 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2413 let region_route = new_test_region_route();
2414 let region_routes = vec![region_route.clone()];
2415 let table_info = new_test_table_info();
2416 let table_id = table_info.ident.table_id;
2417 let engine = table_info.meta.engine.as_str();
2418 let region_storage_path =
2419 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2420 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2421 TableRouteValue::physical(region_routes.clone()),
2422 );
2423
2424 create_physical_table_metadata(
2426 &table_metadata_manager,
2427 table_info.clone(),
2428 region_routes.clone(),
2429 HashMap::new(),
2430 )
2431 .await
2432 .unwrap();
2433
2434 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2435 let new_region_routes = vec![
2436 new_region_route(1, 1),
2437 new_region_route(2, 2),
2438 new_region_route(3, 3),
2439 ];
2440 table_metadata_manager
2442 .update_table_route(
2443 table_id,
2444 RegionInfo {
2445 engine: engine.to_string(),
2446 region_storage_path: region_storage_path.clone(),
2447 region_options: HashMap::new(),
2448 region_wal_options: HashMap::new(),
2449 },
2450 ¤t_table_route_value,
2451 new_region_routes.clone(),
2452 &HashMap::new(),
2453 &HashMap::new(),
2454 )
2455 .await
2456 .unwrap();
2457 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2458
2459 table_metadata_manager
2461 .update_table_route(
2462 table_id,
2463 RegionInfo {
2464 engine: engine.to_string(),
2465 region_storage_path: region_storage_path.clone(),
2466 region_options: HashMap::new(),
2467 region_wal_options: HashMap::new(),
2468 },
2469 ¤t_table_route_value,
2470 new_region_routes.clone(),
2471 &HashMap::new(),
2472 &HashMap::new(),
2473 )
2474 .await
2475 .unwrap();
2476
2477 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2478 current_table_route_value
2479 .inner
2480 .update(new_region_routes.clone())
2481 .unwrap(),
2482 );
2483 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2484 table_metadata_manager
2486 .update_table_route(
2487 table_id,
2488 RegionInfo {
2489 engine: engine.to_string(),
2490 region_storage_path: region_storage_path.clone(),
2491 region_options: HashMap::new(),
2492 region_wal_options: HashMap::new(),
2493 },
2494 ¤t_table_route_value,
2495 new_region_routes.clone(),
2496 &HashMap::new(),
2497 &HashMap::new(),
2498 )
2499 .await
2500 .unwrap();
2501 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2502
2503 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2506 current_table_route_value
2507 .update(vec![
2508 new_region_route(1, 1),
2509 new_region_route(2, 2),
2510 new_region_route(3, 3),
2511 new_region_route(4, 4),
2512 ])
2513 .unwrap(),
2514 );
2515 assert!(
2516 table_metadata_manager
2517 .update_table_route(
2518 table_id,
2519 RegionInfo {
2520 engine: engine.to_string(),
2521 region_storage_path: region_storage_path.clone(),
2522 region_options: HashMap::new(),
2523 region_wal_options: HashMap::new(),
2524 },
2525 &wrong_table_route_value,
2526 new_region_routes,
2527 &HashMap::new(),
2528 &HashMap::new(),
2529 )
2530 .await
2531 .is_err()
2532 );
2533 }
2534
2535 #[tokio::test]
2536 async fn test_update_table_route_with_topic_region_mapping() {
2537 let mem_kv = Arc::new(MemoryKvBackend::default());
2538 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2539 let region_route = new_test_region_route();
2540 let region_routes = vec![region_route.clone()];
2541 let table_info = new_test_table_info();
2542 let table_id = table_info.ident.table_id;
2543 let engine = table_info.meta.engine.as_str();
2544 let region_storage_path =
2545 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2546
2547 let old_region_wal_options: RegionWalOptions = vec![
2549 (
2550 1,
2551 WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())),
2552 ),
2553 (
2554 2,
2555 WalOptions::Kafka(KafkaWalOptions::new("topic_2".to_string())),
2556 ),
2557 ]
2558 .into_iter()
2559 .collect();
2560
2561 create_physical_table_metadata(
2562 &table_metadata_manager,
2563 table_info.clone(),
2564 region_routes.clone(),
2565 old_region_wal_options.clone(),
2566 )
2567 .await
2568 .unwrap();
2569
2570 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2571 TableRouteValue::physical(region_routes.clone()),
2572 );
2573
2574 let region_id_1 = RegionId::new(table_id, 1);
2576 let region_id_2 = RegionId::new(table_id, 2);
2577 let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2578 let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2579 assert!(
2580 table_metadata_manager
2581 .topic_region_manager
2582 .get(topic_1_key.clone())
2583 .await
2584 .unwrap()
2585 .is_some()
2586 );
2587 assert!(
2588 table_metadata_manager
2589 .topic_region_manager
2590 .get(topic_2_key.clone())
2591 .await
2592 .unwrap()
2593 .is_some()
2594 );
2595
2596 let new_region_routes = vec![
2598 new_region_route(1, 1),
2599 new_region_route(2, 2),
2600 new_region_route(3, 3), ];
2602 let new_region_wal_options: RegionWalOptions = vec![
2603 (
2604 1,
2605 WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())), ),
2607 (
2608 2,
2609 WalOptions::Kafka(KafkaWalOptions::new("topic_2".to_string())), ),
2611 (
2612 3,
2613 WalOptions::Kafka(KafkaWalOptions::new("topic_3".to_string())), ),
2615 ]
2616 .into_iter()
2617 .collect();
2618 let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2619 current_table_route_value
2620 .inner
2621 .update(new_region_routes.clone())
2622 .unwrap(),
2623 );
2624 table_metadata_manager
2625 .update_table_route(
2626 table_id,
2627 RegionInfo {
2628 engine: engine.to_string(),
2629 region_storage_path: region_storage_path.clone(),
2630 region_options: HashMap::new(),
2631 region_wal_options: old_region_wal_options.clone(),
2632 },
2633 ¤t_table_route_value,
2634 new_region_routes.clone(),
2635 &HashMap::new(),
2636 &new_region_wal_options,
2637 )
2638 .await
2639 .unwrap();
2640 let region_id_3 = RegionId::new(table_id, 3);
2642 let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2643 assert!(
2644 table_metadata_manager
2645 .topic_region_manager
2646 .get(topic_3_key)
2647 .await
2648 .unwrap()
2649 .is_some()
2650 );
2651 let newer_region_routes = vec![
2653 new_region_route(1, 1),
2654 ];
2657 let newer_region_wal_options: RegionWalOptions = vec![
2658 (
2659 1,
2660 WalOptions::Kafka(KafkaWalOptions::new("topic_1".to_string())), ),
2662 (
2663 3,
2664 WalOptions::Kafka(KafkaWalOptions::new("topic_3_new".to_string())), ),
2666 ]
2667 .into_iter()
2668 .collect();
2669 table_metadata_manager
2670 .update_table_route(
2671 table_id,
2672 RegionInfo {
2673 engine: engine.to_string(),
2674 region_storage_path: region_storage_path.clone(),
2675 region_options: HashMap::new(),
2676 region_wal_options: new_region_wal_options.clone(),
2677 },
2678 ¤t_table_route_value_updated,
2679 newer_region_routes.clone(),
2680 &HashMap::new(),
2681 &newer_region_wal_options,
2682 )
2683 .await
2684 .unwrap();
2685 let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2687 assert!(
2688 table_metadata_manager
2689 .topic_region_manager
2690 .get(topic_2_key_new)
2691 .await
2692 .unwrap()
2693 .is_none()
2694 );
2695 let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2697 assert!(
2698 table_metadata_manager
2699 .topic_region_manager
2700 .get(topic_3_key_old)
2701 .await
2702 .unwrap()
2703 .is_none()
2704 );
2705 let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2707 assert!(
2708 table_metadata_manager
2709 .topic_region_manager
2710 .get(topic_3_key_new)
2711 .await
2712 .unwrap()
2713 .is_some()
2714 );
2715 assert!(
2717 table_metadata_manager
2718 .topic_region_manager
2719 .get(topic_1_key)
2720 .await
2721 .unwrap()
2722 .is_some()
2723 );
2724 }
2725
2726 #[tokio::test]
2727 async fn test_destroy_table_metadata() {
2728 let mem_kv = Arc::new(MemoryKvBackend::default());
2729 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2730 let table_id = 1025;
2731 let table_name = "foo";
2732 let task = test_create_table_task(table_name, table_id);
2733 let options = create_mixed_region_wal_options();
2734 let serialized_options = options.clone();
2735 table_metadata_manager
2736 .create_table_metadata(
2737 task.table_info,
2738 TableRouteValue::physical(vec![
2739 RegionRoute {
2740 region: Region::new_test(RegionId::new(table_id, 1)),
2741 leader_peer: Some(Peer::empty(1)),
2742 follower_peers: vec![Peer::empty(5)],
2743 leader_state: None,
2744 leader_down_since: None,
2745 write_route_policy: None,
2746 },
2747 RegionRoute {
2748 region: Region::new_test(RegionId::new(table_id, 2)),
2749 leader_peer: Some(Peer::empty(2)),
2750 follower_peers: vec![Peer::empty(4)],
2751 leader_state: None,
2752 leader_down_since: None,
2753 write_route_policy: None,
2754 },
2755 RegionRoute {
2756 region: Region::new_test(RegionId::new(table_id, 3)),
2757 leader_peer: Some(Peer::empty(3)),
2758 follower_peers: vec![],
2759 leader_state: None,
2760 leader_down_since: None,
2761 write_route_policy: None,
2762 },
2763 ]),
2764 serialized_options,
2765 )
2766 .await
2767 .unwrap();
2768 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2769 let table_route_value = table_metadata_manager
2770 .table_route_manager
2771 .table_route_storage()
2772 .get_with_raw_bytes(table_id)
2773 .await
2774 .unwrap()
2775 .unwrap();
2776 table_metadata_manager
2777 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2778 .await
2779 .unwrap();
2780 assert!(mem_kv.is_empty());
2781 }
2782
2783 #[tokio::test]
2784 async fn test_restore_table_metadata() {
2785 let mem_kv = Arc::new(MemoryKvBackend::default());
2786 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2787 let table_id = 1025;
2788 let table_name = "foo";
2789 let task = test_create_table_task(table_name, table_id);
2790 let options = create_mixed_region_wal_options();
2791 let serialized_options = options.clone();
2792 table_metadata_manager
2793 .create_table_metadata(
2794 task.table_info,
2795 TableRouteValue::physical(vec![
2796 RegionRoute {
2797 region: Region::new_test(RegionId::new(table_id, 1)),
2798 leader_peer: Some(Peer::empty(1)),
2799 follower_peers: vec![Peer::empty(5)],
2800 leader_state: None,
2801 leader_down_since: None,
2802 write_route_policy: None,
2803 },
2804 RegionRoute {
2805 region: Region::new_test(RegionId::new(table_id, 2)),
2806 leader_peer: Some(Peer::empty(2)),
2807 follower_peers: vec![Peer::empty(4)],
2808 leader_state: None,
2809 leader_down_since: None,
2810 write_route_policy: None,
2811 },
2812 RegionRoute {
2813 region: Region::new_test(RegionId::new(table_id, 3)),
2814 leader_peer: Some(Peer::empty(3)),
2815 follower_peers: vec![],
2816 leader_state: None,
2817 leader_down_since: None,
2818 write_route_policy: None,
2819 },
2820 ]),
2821 serialized_options,
2822 )
2823 .await
2824 .unwrap();
2825 let expected_result = mem_kv.dump();
2826 let table_route_value = table_metadata_manager
2827 .table_route_manager
2828 .table_route_storage()
2829 .get_with_raw_bytes(table_id)
2830 .await
2831 .unwrap()
2832 .unwrap();
2833 let region_routes = table_route_value.region_routes().unwrap();
2834 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2835 let table_route_value = TableRouteValue::physical(region_routes.clone());
2836 table_metadata_manager
2837 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2838 .await
2839 .unwrap();
2840 table_metadata_manager
2841 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2842 .await
2843 .unwrap();
2844 let kvs = mem_kv.dump();
2845 assert_eq!(kvs, expected_result);
2846 table_metadata_manager
2848 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2849 .await
2850 .unwrap();
2851 let kvs = mem_kv.dump();
2852 assert_eq!(kvs, expected_result);
2853 }
2854
2855 #[tokio::test]
2856 async fn test_dropped_table_metadata_enumeration_and_lookup() {
2857 let mem_kv = Arc::new(MemoryKvBackend::default());
2858 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2859 let table_id = 1025;
2860 let table_name = "foo";
2861 let task = test_create_table_task(table_name, table_id);
2862 let table_info = task.table_info.clone();
2863 let options = create_mixed_region_wal_options();
2864 let serialized_options = options.clone();
2865 table_metadata_manager
2866 .create_table_metadata(
2867 table_info.clone(),
2868 TableRouteValue::physical(vec![
2869 RegionRoute {
2870 region: Region::new_test(RegionId::new(table_id, 1)),
2871 leader_peer: Some(Peer::empty(1)),
2872 follower_peers: vec![Peer::empty(5)],
2873 leader_state: None,
2874 leader_down_since: None,
2875 write_route_policy: None,
2876 },
2877 RegionRoute {
2878 region: Region::new_test(RegionId::new(table_id, 2)),
2879 leader_peer: Some(Peer::empty(2)),
2880 follower_peers: vec![Peer::empty(4)],
2881 leader_state: None,
2882 leader_down_since: None,
2883 write_route_policy: None,
2884 },
2885 RegionRoute {
2886 region: Region::new_test(RegionId::new(table_id, 3)),
2887 leader_peer: Some(Peer::empty(3)),
2888 follower_peers: vec![],
2889 leader_state: None,
2890 leader_down_since: None,
2891 write_route_policy: None,
2892 },
2893 ]),
2894 serialized_options,
2895 )
2896 .await
2897 .unwrap();
2898 let table_route_value = table_metadata_manager
2899 .table_route_manager
2900 .table_route_storage()
2901 .get_with_raw_bytes(table_id)
2902 .await
2903 .unwrap()
2904 .unwrap();
2905 let region_routes = table_route_value.region_routes().unwrap();
2906 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2907 let table_route_value = TableRouteValue::physical(region_routes.clone());
2908
2909 table_metadata_manager
2910 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2911 .await
2912 .unwrap();
2913
2914 let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
2915 assert_eq!(dropped_tables.len(), 1);
2916 assert_eq!(dropped_tables[0].table_id, table_id);
2917 assert_eq!(dropped_tables[0].table_name, table_name);
2918
2919 let dropped_table = table_metadata_manager
2920 .get_dropped_table(&table_name)
2921 .await
2922 .unwrap()
2923 .unwrap();
2924 assert_eq!(dropped_table.table_id, table_id);
2925 assert_eq!(dropped_table.table_name, table_name);
2926 assert_eq!(dropped_table.table_info_value.table_info, table_info);
2927 assert_eq!(
2928 dropped_table.table_route_value.region_routes().unwrap(),
2929 region_routes
2930 );
2931 assert_eq!(dropped_table.region_wal_options, options);
2932
2933 let dropped_table_by_id = table_metadata_manager
2934 .get_dropped_table_by_id(table_id)
2935 .await
2936 .unwrap()
2937 .unwrap();
2938 assert_eq!(dropped_table_by_id.table_id, table_id);
2939 assert_eq!(dropped_table_by_id.table_name, table_name);
2940 assert_eq!(dropped_table_by_id.table_info_value.table_info, table_info);
2941 assert_eq!(
2942 dropped_table_by_id
2943 .table_route_value
2944 .region_routes()
2945 .unwrap(),
2946 region_routes
2947 );
2948 assert_eq!(dropped_table_by_id.region_wal_options, options);
2949 }
2950
2951 #[tokio::test]
2952 async fn test_dropped_table_lookup_survives_live_name_recreation() {
2953 let mem_kv = Arc::new(MemoryKvBackend::default());
2954 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2955 let dropped_table_id = 1025;
2956 let recreated_table_id = 1026;
2957 let table_name = "foo";
2958 let dropped_task = test_create_table_task(table_name, dropped_table_id);
2959 let dropped_table_info = dropped_task.table_info.clone();
2960 let options = create_mock_region_wal_options();
2961 let serialized_options = options.clone();
2962 table_metadata_manager
2963 .create_table_metadata(
2964 dropped_table_info.clone(),
2965 TableRouteValue::physical(vec![
2966 RegionRoute {
2967 region: Region::new_test(RegionId::new(dropped_table_id, 1)),
2968 leader_peer: Some(Peer::empty(1)),
2969 follower_peers: vec![Peer::empty(5)],
2970 leader_state: None,
2971 leader_down_since: None,
2972 write_route_policy: None,
2973 },
2974 RegionRoute {
2975 region: Region::new_test(RegionId::new(dropped_table_id, 2)),
2976 leader_peer: Some(Peer::empty(2)),
2977 follower_peers: vec![Peer::empty(4)],
2978 leader_state: None,
2979 leader_down_since: None,
2980 write_route_policy: None,
2981 },
2982 ]),
2983 serialized_options.clone(),
2984 )
2985 .await
2986 .unwrap();
2987
2988 let dropped_table_name =
2989 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2990 let dropped_table_route = table_metadata_manager
2991 .table_route_manager
2992 .table_route_storage()
2993 .get_with_raw_bytes(dropped_table_id)
2994 .await
2995 .unwrap()
2996 .unwrap();
2997 let dropped_table_route =
2998 TableRouteValue::physical(dropped_table_route.region_routes().unwrap().clone());
2999 table_metadata_manager
3000 .delete_table_metadata(
3001 dropped_table_id,
3002 &dropped_table_name,
3003 &dropped_table_route,
3004 &options,
3005 )
3006 .await
3007 .unwrap();
3008
3009 let recreated_task = test_create_table_task(table_name, recreated_table_id);
3010 table_metadata_manager
3011 .create_table_metadata(
3012 recreated_task.table_info,
3013 TableRouteValue::physical(vec![RegionRoute {
3014 region: Region::new_test(RegionId::new(recreated_table_id, 1)),
3015 leader_peer: Some(Peer::empty(4)),
3016 follower_peers: vec![],
3017 leader_state: None,
3018 leader_down_since: None,
3019 write_route_policy: None,
3020 }]),
3021 serialized_options,
3022 )
3023 .await
3024 .unwrap();
3025
3026 assert_eq!(
3027 table_metadata_manager
3028 .table_name_manager()
3029 .get(TableNameKey::from(&dropped_table_name))
3030 .await
3031 .unwrap()
3032 .unwrap()
3033 .table_id(),
3034 recreated_table_id
3035 );
3036
3037 let dropped_table = table_metadata_manager
3038 .get_dropped_table(&dropped_table_name)
3039 .await
3040 .unwrap()
3041 .unwrap();
3042 assert_eq!(dropped_table.table_id, dropped_table_id);
3043 assert_eq!(dropped_table.table_name, dropped_table_name);
3044 assert_eq!(
3045 dropped_table.table_info_value.table_info,
3046 dropped_table_info
3047 );
3048
3049 let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
3050 assert_eq!(dropped_tables.len(), 1);
3051 assert_eq!(dropped_tables[0].table_id, dropped_table_id);
3052 assert_eq!(dropped_tables[0].table_name, dropped_table_name);
3053 }
3054
3055 #[tokio::test]
3056 async fn test_dropped_table_lookup_ignores_unrelated_malformed_datanode_tombstones() {
3057 let mem_kv = Arc::new(MemoryKvBackend::default());
3058 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3059 let table_id = 1025;
3060 let table_name = "foo";
3061 let task = test_create_table_task(table_name, table_id);
3062 let table_info = task.table_info.clone();
3063 let options = create_mixed_region_wal_options();
3064 let serialized_options = options.clone();
3065 table_metadata_manager
3066 .create_table_metadata(
3067 table_info.clone(),
3068 TableRouteValue::physical(vec![
3069 RegionRoute {
3070 region: Region::new_test(RegionId::new(table_id, 1)),
3071 leader_peer: Some(Peer::empty(1)),
3072 follower_peers: vec![Peer::empty(5)],
3073 leader_state: None,
3074 leader_down_since: None,
3075 write_route_policy: None,
3076 },
3077 RegionRoute {
3078 region: Region::new_test(RegionId::new(table_id, 2)),
3079 leader_peer: Some(Peer::empty(2)),
3080 follower_peers: vec![Peer::empty(4)],
3081 leader_state: None,
3082 leader_down_since: None,
3083 write_route_policy: None,
3084 },
3085 ]),
3086 serialized_options,
3087 )
3088 .await
3089 .unwrap();
3090
3091 let table_route_value = table_metadata_manager
3092 .table_route_manager
3093 .table_route_storage()
3094 .get_with_raw_bytes(table_id)
3095 .await
3096 .unwrap()
3097 .unwrap();
3098 let region_routes = table_route_value.region_routes().unwrap();
3099 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3100 let table_route_value = TableRouteValue::physical(region_routes.clone());
3101
3102 table_metadata_manager
3103 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
3104 .await
3105 .unwrap();
3106
3107 mem_kv
3108 .put(
3109 PutRequest::new()
3110 .with_key("__tombstone/__dn_table/not-a-datanode-table-key")
3111 .with_value("malformed"),
3112 )
3113 .await
3114 .unwrap();
3115
3116 let dropped_table = table_metadata_manager
3117 .get_dropped_table(&table_name)
3118 .await
3119 .unwrap()
3120 .unwrap();
3121 assert_eq!(dropped_table.table_id, table_id);
3122 assert_eq!(dropped_table.table_name, table_name);
3123 assert_eq!(dropped_table.table_info_value.table_info, table_info);
3124 assert_eq!(dropped_table.region_wal_options, options);
3125 }
3126
3127 #[tokio::test]
3128 async fn test_create_update_view_info() {
3129 let mem_kv = Arc::new(MemoryKvBackend::default());
3130 let table_metadata_manager = TableMetadataManager::new(mem_kv);
3131
3132 let view_info = new_test_table_info();
3133
3134 let view_id = view_info.ident.table_id;
3135
3136 let logical_plan: Vec<u8> = vec![1, 2, 3];
3137 let columns = vec!["a".to_string()];
3138 let plan_columns = vec!["number".to_string()];
3139 let table_names = new_test_table_names();
3140 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
3141
3142 table_metadata_manager
3144 .create_view_metadata(
3145 view_info.clone(),
3146 logical_plan.clone(),
3147 table_names.clone(),
3148 columns.clone(),
3149 plan_columns.clone(),
3150 definition.to_string(),
3151 )
3152 .await
3153 .unwrap();
3154
3155 {
3156 let current_view_info = table_metadata_manager
3158 .view_info_manager()
3159 .get(view_id)
3160 .await
3161 .unwrap()
3162 .unwrap()
3163 .into_inner();
3164 assert_eq!(current_view_info.view_info, logical_plan);
3165 assert_eq!(current_view_info.table_names, table_names);
3166 assert_eq!(current_view_info.definition, definition);
3167 assert_eq!(current_view_info.columns, columns);
3168 assert_eq!(current_view_info.plan_columns, plan_columns);
3169 let current_table_info = table_metadata_manager
3171 .table_info_manager()
3172 .get(view_id)
3173 .await
3174 .unwrap()
3175 .unwrap()
3176 .into_inner();
3177 assert_eq!(current_table_info.table_info, view_info);
3178 }
3179
3180 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
3181 let new_table_names = {
3182 let mut set = HashSet::new();
3183 set.insert(TableName {
3184 catalog_name: "greptime".to_string(),
3185 schema_name: "public".to_string(),
3186 table_name: "b_table".to_string(),
3187 });
3188 set.insert(TableName {
3189 catalog_name: "greptime".to_string(),
3190 schema_name: "public".to_string(),
3191 table_name: "c_table".to_string(),
3192 });
3193 set
3194 };
3195 let new_columns = vec!["b".to_string()];
3196 let new_plan_columns = vec!["number2".to_string()];
3197 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
3198
3199 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
3200 logical_plan.clone().into(),
3201 table_names,
3202 columns,
3203 plan_columns,
3204 definition.to_string(),
3205 ));
3206 table_metadata_manager
3208 .update_view_info(
3209 view_id,
3210 ¤t_view_info_value,
3211 new_logical_plan.clone(),
3212 new_table_names.clone(),
3213 new_columns.clone(),
3214 new_plan_columns.clone(),
3215 new_definition.to_string(),
3216 )
3217 .await
3218 .unwrap();
3219 table_metadata_manager
3221 .update_view_info(
3222 view_id,
3223 ¤t_view_info_value,
3224 new_logical_plan.clone(),
3225 new_table_names.clone(),
3226 new_columns.clone(),
3227 new_plan_columns.clone(),
3228 new_definition.to_string(),
3229 )
3230 .await
3231 .unwrap();
3232
3233 let updated_view_info = table_metadata_manager
3235 .view_info_manager()
3236 .get(view_id)
3237 .await
3238 .unwrap()
3239 .unwrap()
3240 .into_inner();
3241 assert_eq!(updated_view_info.view_info, new_logical_plan);
3242 assert_eq!(updated_view_info.table_names, new_table_names);
3243 assert_eq!(updated_view_info.definition, new_definition);
3244 assert_eq!(updated_view_info.columns, new_columns);
3245 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
3246
3247 let wrong_view_info = logical_plan.clone();
3248 let wrong_definition = "wrong_definition";
3249 let wrong_view_info_value =
3250 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
3251 wrong_view_info.into(),
3252 new_table_names.clone(),
3253 new_columns.clone(),
3254 new_plan_columns.clone(),
3255 wrong_definition.to_string(),
3256 ));
3257 assert!(
3260 table_metadata_manager
3261 .update_view_info(
3262 view_id,
3263 &wrong_view_info_value,
3264 new_logical_plan.clone(),
3265 new_table_names.clone(),
3266 vec!["c".to_string()],
3267 vec!["number3".to_string()],
3268 wrong_definition.to_string(),
3269 )
3270 .await
3271 .is_err()
3272 );
3273
3274 let current_view_info = table_metadata_manager
3276 .view_info_manager()
3277 .get(view_id)
3278 .await
3279 .unwrap()
3280 .unwrap()
3281 .into_inner();
3282 assert_eq!(current_view_info.view_info, new_logical_plan);
3283 assert_eq!(current_view_info.table_names, new_table_names);
3284 assert_eq!(current_view_info.definition, new_definition);
3285 assert_eq!(current_view_info.columns, new_columns);
3286 assert_eq!(current_view_info.plan_columns, new_plan_columns);
3287 }
3288
/// `RegionRoleSet` must deserialize from both the object form (explicit leader
/// and follower lists) and a bare array, which yields leaders only.
#[test]
fn test_region_role_set_deserialize() {
    let parse = |json: &str| -> RegionRoleSet { serde_json::from_str(json).unwrap() };

    // Object form: both lists are given explicitly.
    let from_object = parse(r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#);
    assert_eq!(from_object.leader_regions, vec![1, 2, 3]);
    assert_eq!(from_object.follower_regions, vec![4, 5, 6]);

    // Bare array form: entries become leader regions, followers stay empty.
    let from_array = parse(r#"[1, 2, 3]"#);
    assert_eq!(from_array.leader_regions, vec![1, 2, 3]);
    assert!(from_array.follower_regions.is_empty());
}
3301
/// A `RegionDistribution` may mix both `RegionRoleSet` encodings: key `1` uses
/// the bare array form and key `2` the object form.
#[test]
fn test_region_distribution_deserialize() {
    let json = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution: RegionDistribution = serde_json::from_str(json).unwrap();
    assert_eq!(distribution.len(), 2);

    // Entry encoded as a bare array: leaders only.
    let array_encoded = &distribution[&1];
    assert_eq!(array_encoded.leader_regions, vec![1, 2, 3]);
    assert!(array_encoded.follower_regions.is_empty());

    // Entry encoded as an object: leaders and followers.
    let object_encoded = &distribution[&2];
    assert_eq!(object_encoded.leader_regions, vec![7, 8, 9]);
    assert_eq!(object_encoded.follower_regions, vec![10, 11, 12]);
}
3312}