1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_repart;
110pub mod table_route;
111#[cfg(any(test, feature = "testing"))]
112pub mod test_utils;
113pub mod tombstone;
114pub mod topic_name;
115pub mod topic_region;
116pub mod txn_helper;
117pub mod view_info;
118
119use std::collections::{BTreeMap, HashMap, HashSet};
120use std::fmt::Debug;
121use std::ops::{Deref, DerefMut};
122use std::sync::Arc;
123
124use bytes::Bytes;
125use common_base::regex_pattern::NAME_PATTERN;
126use common_catalog::consts::{
127 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
128};
129use common_telemetry::warn;
130use common_wal::options::WalOptions;
131use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
132use flow::flow_route::FlowRouteValue;
133use flow::table_flow::TableFlowValue;
134use futures_util::TryStreamExt;
135use lazy_static::lazy_static;
136use regex::Regex;
137pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
138use serde::de::DeserializeOwned;
139use serde::{Deserialize, Serialize};
140use snafu::{OptionExt, ResultExt, ensure};
141use store_api::storage::RegionNumber;
142use table::metadata::{TableId, TableInfo};
143use table::table_name::TableName;
144use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
145use table_name::{TableNameKey, TableNameManager, TableNameValue};
146use topic_name::TopicNameManager;
147use topic_region::{TopicRegionKey, TopicRegionManager};
148use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
149
150use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
151use self::datanode_table::RegionInfo;
152use self::flow::flow_info::FlowInfoValue;
153use self::flow::flow_name::FlowNameValue;
154use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
155use self::table_route::{TableRouteManager, TableRouteValue};
156use self::tombstone::TombstoneManager;
157use crate::DatanodeId;
158use crate::error::{self, Result, SerdeJsonSnafu};
159use crate::key::flow::flow_state::FlowStateValue;
160use crate::key::node_address::NodeAddressValue;
161use crate::key::table_repart::{TableRepartKey, TableRepartManager};
162use crate::key::table_route::TableRouteKey;
163use crate::key::topic_region::TopicRegionValue;
164use crate::key::txn_helper::TxnOpGetResponseSet;
165use crate::kv_backend::KvBackendRef;
166use crate::kv_backend::txn::{Txn, TxnOp};
167use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
168use crate::rpc::store::BatchDeleteRequest;
169use crate::state_store::PoisonValue;
170
/// Regex source for a topic name: the first character is an ASCII letter, digit, `_`, `:` or
/// `-`; every following character may also be `.`, `@` or `#`.
///
/// The pattern itself carries no `^`/`$` anchors; `TOPIC_REGION_PATTERN` embeds it.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Older key of the maintenance switch (presumably superseded by [`MAINTENANCE_KEY`]).
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance-mode switch.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the switch that pauses procedures.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode-table keys (`__dn_table/<number>/<number>`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys (`__table_info/<table id>`).
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys (`__view_info/<view id>`).
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys (`__table_name/<catalog>/<schema>/<table>`).
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys (`__catalog_name/<catalog>`).
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys (`__schema_name/<catalog>/<schema>`).
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys (`__table_route/<table id>`).
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of table-repartition keys (`__table_repart/<table id>`).
pub const TABLE_REPART_PREFIX: &str = "__table_repart";
/// Prefix of node-address keys (`__node_address/<number>/<number>`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic-name keys.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Older prefix under which Kafka WAL topic names were recorded.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys (`__topic_region/<topic>/<number>`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// Key used by the metasrv election.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root prefix under which metasrv election candidates are stored.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// Key prefixes whose entries are cache-related.
///
/// NOTE(review): presumably the metadata kinds preloaded into caches; confirm with the users
/// of this constant.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
204
/// Region numbers placed on a single datanode, split by role.
///
/// This is the value type of [`RegionDistribution`]. Its `Deserialize` impl also accepts a
/// bare array of region numbers, which is read as leaders only.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Regions recorded with the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Regions recorded with the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
213
214impl<'de> Deserialize<'de> for RegionRoleSet {
215 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
216 where
217 D: serde::Deserializer<'de>,
218 {
219 #[derive(Deserialize)]
220 #[serde(untagged)]
221 enum RegionRoleSetOrLeaderOnly {
222 Full {
223 leader_regions: Vec<RegionNumber>,
224 follower_regions: Vec<RegionNumber>,
225 },
226 LeaderOnly(Vec<RegionNumber>),
227 }
228 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
229 RegionRoleSetOrLeaderOnly::Full {
230 leader_regions,
231 follower_regions,
232 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
233 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
234 Ok(RegionRoleSet::new(leader_regions, vec![]))
235 }
236 }
237 }
238}
239
240impl RegionRoleSet {
241 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
243 Self {
244 leader_regions,
245 follower_regions,
246 }
247 }
248
249 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
251 self.leader_regions.push(region_number);
252 }
253
254 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
256 self.follower_regions.push(region_number);
257 }
258
259 pub fn sort(&mut self) {
261 self.follower_regions.sort();
262 self.leader_regions.sort();
263 }
264}
265
/// Region roles per datanode, keyed (and therefore ordered) by datanode id.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
275
276lazy_static! {
277 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
278}
279
280lazy_static! {
281 static ref TABLE_INFO_KEY_PATTERN: Regex =
282 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
283}
284
285lazy_static! {
286 static ref VIEW_INFO_KEY_PATTERN: Regex =
287 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
288}
289
290lazy_static! {
291 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
292 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
293}
294
295lazy_static! {
296 pub(crate) static ref TABLE_REPART_KEY_PATTERN: Regex =
297 Regex::new(&format!("^{TABLE_REPART_PREFIX}/([0-9]+)$")).unwrap();
298}
299
300lazy_static! {
301 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
302 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
303}
304
305lazy_static! {
306 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
307 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
308 ))
309 .unwrap();
310}
311
312lazy_static! {
313 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
315 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
316 ))
317 .unwrap();
318}
319
320lazy_static! {
321 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
323 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
324 ))
325 .unwrap();
326}
327
328lazy_static! {
329 static ref NODE_ADDRESS_PATTERN: Regex =
330 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
331}
332
333lazy_static! {
334 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
335 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
336}
337
338lazy_static! {
339 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
340 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
341 ))
342 .unwrap();
343}
344
/// A typed metadata key that can be encoded to, and decoded from, raw bytes.
///
/// `'a` is the lifetime of the input bytes, which lets the decoded `T` borrow from them.
pub trait MetadataKey<'a, T> {
    /// Encodes this key into its raw byte form.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key of type `T` from raw bytes.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
351
352#[derive(Debug, Clone, PartialEq)]
353pub struct BytesAdapter(Vec<u8>);
354
355impl From<Vec<u8>> for BytesAdapter {
356 fn from(value: Vec<u8>) -> Self {
357 Self(value)
358 }
359}
360
361impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
362 fn to_bytes(&self) -> Vec<u8> {
363 self.0.clone()
364 }
365
366 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
367 Ok(BytesAdapter(bytes.to_vec()))
368 }
369}
370
/// Builds a transaction "get" operation for a key, plus a way to pick its result out of the
/// transaction's responses.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns the `TxnOp` to include in a transaction, and a closure that extracts this
    /// key's raw value from the transaction's response set (presumably `None` when the key
    /// was not found — confirm in the implementations).
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
379
/// A metadata value with a raw byte encoding.
pub trait MetadataValue {
    /// Decodes a value from its raw bytes.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes this value into raw bytes.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
387
/// Shared handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Entry point for reading and writing table/view metadata.
///
/// Every sub-manager is built over the same key-value backend, which is also kept here for
/// multi-key transactions.
pub struct TableMetadataManager {
    /// Table (and view) name keys.
    table_name_manager: TableNameManager,
    /// Table info values.
    table_info_manager: TableInfoManager,
    /// View info values.
    view_info_manager: ViewInfoManager,
    /// Datanode-table entries of physical tables.
    datanode_table_manager: DatanodeTableManager,
    /// Catalog name keys.
    catalog_manager: CatalogManager,
    /// Schema name keys.
    schema_manager: SchemaManager,
    /// Table route values.
    table_route_manager: TableRouteManager,
    /// Table repartition values.
    table_repart_manager: TableRepartManager,
    /// Tombstones of dropped table metadata.
    tombstone_manager: TombstoneManager,
    /// Topic name keys.
    topic_name_manager: TopicNameManager,
    /// Topic-region mapping keys.
    topic_region_manager: TopicRegionManager,
    /// The backend shared by all sub-managers.
    kv_backend: KvBackendRef,
}
404
/// Returns an `Unexpected` error from the enclosing function unless `$got == $expected_value`.
///
/// This is used on the failure path of metadata transactions: the value read back from the
/// backend (`$got`) is compared with the value that was meant to be written
/// (`$expected_value`). `$name` names the operation in the error message.
///
/// `$got` and `$expected_value` are each evaluated exactly once; `$name` is evaluated only
/// when a mismatch is reported. The error selector is resolved through `$crate`, so call
/// sites do not need `snafu::ensure` or this crate's `error` module in scope. The enclosing
/// function's error type must be convertible (`Into`) from the selector's error.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {{
        let got = &$got;
        let expected = &$expected_value;
        if *got != *expected {
            return $crate::error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    got, $name, expected
                ),
            }
            .fail()
            .map_err(::core::convert::Into::into);
        }
    }};
}
419
/// A decoded metadata value together with the exact bytes it was decoded from.
///
/// The `Serialize`/`Deserialize` impls operate on the raw bytes, not on `inner`. Note that
/// mutating `inner` through `DerefMut` does not update `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    /// The raw encoded bytes.
    bytes: Bytes,
    /// The value decoded from `bytes`.
    inner: T,
}
435
/// A table whose name key is tombstoned, with the table id stored under that key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DroppedTableName {
    /// Id of the dropped table.
    pub table_id: TableId,
    /// Fully qualified name of the dropped table.
    pub table_name: TableName,
}
443
/// Metadata of a dropped table, reassembled from its tombstoned entries.
#[derive(Debug, Clone)]
pub struct DroppedTableMetadata {
    /// Id of the dropped table.
    pub table_id: TableId,
    /// Fully qualified name of the dropped table.
    pub table_name: TableName,
    /// The tombstoned table info.
    pub table_info_value: TableInfoValue,
    /// The tombstoned table route.
    pub table_route_value: TableRouteValue,
    /// WAL options per region, recovered from the tombstoned datanode-table entries.
    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
}
457
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
    type Target = T;

    /// Borrows the decoded value.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
    /// Mutably borrows the decoded value.
    ///
    /// The stored raw bytes are not updated to reflect changes made through this reference.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
471
472impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 write!(
475 f,
476 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
477 self.inner, self.bytes
478 )
479 }
480}
481
482impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
483 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
487 where
488 S: serde::Serializer,
489 {
490 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
493 }
494}
495
496impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
497 for DeserializedValueWithBytes<T>
498{
499 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
503 where
504 D: serde::Deserializer<'de>,
505 {
506 let buf = String::deserialize(deserializer)?;
507 let bytes = Bytes::from(buf);
508
509 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
510 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
511
512 Ok(value)
513 }
514}
515
516impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
517 fn clone(&self) -> Self {
518 Self {
519 bytes: self.bytes.clone(),
520 inner: self.inner.clone(),
521 }
522 }
523}
524
525impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
526 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
529 let inner = T::try_from_raw_value(&bytes)?;
530 Ok(Self { bytes, inner })
531 }
532
533 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
536 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
537 }
538
539 pub fn into_inner(self) -> T {
540 self.inner
541 }
542
543 pub fn get_inner_ref(&self) -> &T {
544 &self.inner
545 }
546
547 pub fn get_raw_bytes(&self) -> Vec<u8> {
549 self.bytes.to_vec()
550 }
551
552 #[cfg(any(test, feature = "testing"))]
553 pub fn from_inner(inner: T) -> Self {
554 let bytes = serde_json::to_vec(&inner).unwrap();
555
556 Self {
557 bytes: Bytes::from(bytes),
558 inner,
559 }
560 }
561}
562
563impl TableMetadataManager {
564 pub fn new(kv_backend: KvBackendRef) -> Self {
565 TableMetadataManager {
566 table_name_manager: TableNameManager::new(kv_backend.clone()),
567 table_info_manager: TableInfoManager::new(kv_backend.clone()),
568 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
569 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
570 catalog_manager: CatalogManager::new(kv_backend.clone()),
571 schema_manager: SchemaManager::new(kv_backend.clone()),
572 table_route_manager: TableRouteManager::new(kv_backend.clone()),
573 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
574 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
575 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
576 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
577 kv_backend,
578 }
579 }
580
581 pub fn new_with_custom_tombstone_prefix(
583 kv_backend: KvBackendRef,
584 tombstone_prefix: &str,
585 ) -> Self {
586 Self {
587 table_name_manager: TableNameManager::new(kv_backend.clone()),
588 table_info_manager: TableInfoManager::new(kv_backend.clone()),
589 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
590 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
591 catalog_manager: CatalogManager::new(kv_backend.clone()),
592 schema_manager: SchemaManager::new(kv_backend.clone()),
593 table_route_manager: TableRouteManager::new(kv_backend.clone()),
594 table_repart_manager: TableRepartManager::new(kv_backend.clone()),
595 tombstone_manager: TombstoneManager::new_with_prefix(
596 kv_backend.clone(),
597 tombstone_prefix,
598 ),
599 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
600 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
601 kv_backend,
602 }
603 }
604
605 pub async fn init(&self) -> Result<()> {
606 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
607
608 self.catalog_manager().create(catalog_name, true).await?;
609
610 let internal_schemas = [
611 DEFAULT_SCHEMA_NAME,
612 INFORMATION_SCHEMA_NAME,
613 DEFAULT_PRIVATE_SCHEMA_NAME,
614 ];
615
616 for schema_name in internal_schemas {
617 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
618
619 self.schema_manager().create(schema_key, None, true).await?;
620 }
621
622 Ok(())
623 }
624
    /// Returns the manager of table (and view) name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager of table info values.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager of view info values.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager of datanode-table entries.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager of catalog name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager of schema name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager of table route values.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager of table repartition values.
    pub fn table_repart_manager(&self) -> &TableRepartManager {
        &self.table_repart_manager
    }

    /// Returns the manager of topic name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager of topic-region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the key-value backend shared by all sub-managers.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
668
669 pub async fn get_full_table_info(
670 &self,
671 table_id: TableId,
672 ) -> Result<(
673 Option<DeserializedValueWithBytes<TableInfoValue>>,
674 Option<DeserializedValueWithBytes<TableRouteValue>>,
675 )> {
676 let table_info_key = TableInfoKey::new(table_id);
677 let table_route_key = TableRouteKey::new(table_id);
678 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
679 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
680
681 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
682 let mut res = self.kv_backend.txn(txn).await?;
683 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
684 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
685 let mut table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
686 if let Some(table_route_value) = &mut table_route_value {
687 self.table_route_manager()
688 .table_route_storage()
689 .remap_table_route(table_route_value)
690 .await?;
691 }
692 Ok((table_info_value, table_route_value))
693 }
694
695 pub async fn create_view_metadata(
705 &self,
706 view_info: TableInfo,
707 raw_logical_plan: Vec<u8>,
708 table_names: HashSet<TableName>,
709 columns: Vec<String>,
710 plan_columns: Vec<String>,
711 definition: String,
712 ) -> Result<()> {
713 let view_id = view_info.ident.table_id;
714
715 let view_name = TableNameKey::new(
717 &view_info.catalog_name,
718 &view_info.schema_name,
719 &view_info.name,
720 );
721 let create_table_name_txn = self
722 .table_name_manager()
723 .build_create_txn(&view_name, view_id)?;
724
725 let table_info_value = TableInfoValue::new(view_info);
727
728 let (create_table_info_txn, on_create_table_info_failure) = self
729 .table_info_manager()
730 .build_create_txn(view_id, &table_info_value)?;
731
732 let view_info_value = ViewInfoValue::new(
734 raw_logical_plan.into(),
735 table_names,
736 columns,
737 plan_columns,
738 definition,
739 );
740 let (create_view_info_txn, on_create_view_info_failure) = self
741 .view_info_manager()
742 .build_create_txn(view_id, &view_info_value)?;
743
744 let txn = Txn::merge_all(vec![
745 create_table_name_txn,
746 create_table_info_txn,
747 create_view_info_txn,
748 ]);
749
750 let mut r = self.kv_backend.txn(txn).await?;
751
752 if !r.succeeded {
754 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
755 let remote_table_info = on_create_table_info_failure(&mut set)?
756 .context(error::UnexpectedSnafu {
757 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
758 })?
759 .into_inner();
760
761 let remote_view_info = on_create_view_info_failure(&mut set)?
762 .context(error::UnexpectedSnafu {
763 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
764 })?
765 .into_inner();
766
767 let op_name = "the creating view metadata";
768 ensure_values!(remote_table_info, table_info_value, op_name);
769 ensure_values!(remote_view_info, view_info_value, op_name);
770 }
771
772 Ok(())
773 }
774
775 pub async fn create_table_metadata(
778 &self,
779 table_info: TableInfo,
780 table_route_value: TableRouteValue,
781 region_wal_options: HashMap<RegionNumber, String>,
782 ) -> Result<()> {
783 let table_id = table_info.ident.table_id;
784 let engine = table_info.meta.engine.clone();
785
786 let table_name = TableNameKey::new(
788 &table_info.catalog_name,
789 &table_info.schema_name,
790 &table_info.name,
791 );
792 let create_table_name_txn = self
793 .table_name_manager()
794 .build_create_txn(&table_name, table_id)?;
795
796 let region_options = table_info.to_region_options();
797 let table_info_value = TableInfoValue::new(table_info);
799 let (create_table_info_txn, on_create_table_info_failure) = self
800 .table_info_manager()
801 .build_create_txn(table_id, &table_info_value)?;
802
803 let (create_table_route_txn, on_create_table_route_failure) = self
804 .table_route_manager()
805 .table_route_storage()
806 .build_create_txn(table_id, &table_route_value)?;
807
808 let create_topic_region_txn = self
809 .topic_region_manager
810 .build_create_txn(table_id, ®ion_wal_options)?;
811
812 let mut txn = Txn::merge_all(vec![
813 create_table_name_txn,
814 create_table_info_txn,
815 create_table_route_txn,
816 create_topic_region_txn,
817 ]);
818
819 if let TableRouteValue::Physical(x) = &table_route_value {
820 let region_storage_path = table_info_value.region_storage_path();
821 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
822 table_id,
823 &engine,
824 ®ion_storage_path,
825 region_options,
826 region_wal_options,
827 region_distribution(&x.region_routes),
828 )?;
829 txn = txn.merge(create_datanode_table_txn);
830 }
831
832 let mut r = self.kv_backend.txn(txn).await?;
833
834 if !r.succeeded {
836 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
837 let remote_table_info = on_create_table_info_failure(&mut set)?
838 .context(error::UnexpectedSnafu {
839 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
840 })?
841 .into_inner();
842
843 let remote_table_route = on_create_table_route_failure(&mut set)?
844 .context(error::UnexpectedSnafu {
845 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
846 })?
847 .into_inner();
848
849 let op_name = "the creating table metadata";
850 ensure_values!(remote_table_info, table_info_value, op_name);
851 ensure_values!(remote_table_route, table_route_value, op_name);
852 }
853
854 Ok(())
855 }
856
857 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
858 self.kv_backend.max_txn_ops() / 3
861 }
862
863 pub async fn create_logical_tables_metadata(
865 &self,
866 tables_data: Vec<(TableInfo, TableRouteValue)>,
867 ) -> Result<()> {
868 let len = tables_data.len();
869 let mut txns = Vec::with_capacity(3 * len);
870 struct OnFailure<F1, R1, F2, R2>
871 where
872 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
873 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
874 {
875 table_info_value: TableInfoValue,
876 on_create_table_info_failure: F1,
877 table_route_value: TableRouteValue,
878 on_create_table_route_failure: F2,
879 }
880 let mut on_failures = Vec::with_capacity(len);
881 for (table_info, table_route_value) in tables_data {
882 let table_id = table_info.ident.table_id;
883
884 let table_name = TableNameKey::new(
886 &table_info.catalog_name,
887 &table_info.schema_name,
888 &table_info.name,
889 );
890 let create_table_name_txn = self
891 .table_name_manager()
892 .build_create_txn(&table_name, table_id)?;
893 txns.push(create_table_name_txn);
894
895 let table_info_value = TableInfoValue::new(table_info);
897 let (create_table_info_txn, on_create_table_info_failure) =
898 self.table_info_manager()
899 .build_create_txn(table_id, &table_info_value)?;
900 txns.push(create_table_info_txn);
901
902 let (create_table_route_txn, on_create_table_route_failure) = self
903 .table_route_manager()
904 .table_route_storage()
905 .build_create_txn(table_id, &table_route_value)?;
906 txns.push(create_table_route_txn);
907
908 on_failures.push(OnFailure {
909 table_info_value,
910 on_create_table_info_failure,
911 table_route_value,
912 on_create_table_route_failure,
913 });
914 }
915
916 let txn = Txn::merge_all(txns);
917 let mut r = self.kv_backend.txn(txn).await?;
918
919 if !r.succeeded {
921 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
922 for on_failure in on_failures {
923 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
924 .context(error::UnexpectedSnafu {
925 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
926 })?
927 .into_inner();
928
929 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
930 .context(error::UnexpectedSnafu {
931 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
932 })?
933 .into_inner();
934
935 let op_name = "the creating logical tables metadata";
936 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
937 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
938 }
939 }
940
941 Ok(())
942 }
943
944 fn table_metadata_keys(
945 &self,
946 table_id: TableId,
947 table_name: &TableName,
948 table_route_value: &TableRouteValue,
949 region_wal_options: &HashMap<RegionNumber, WalOptions>,
950 ) -> Result<Vec<Vec<u8>>> {
951 let datanode_ids = if table_route_value.is_physical() {
953 region_distribution(table_route_value.region_routes()?)
954 .into_keys()
955 .collect()
956 } else {
957 vec![]
958 };
959 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
960 let table_name = TableNameKey::new(
961 &table_name.catalog_name,
962 &table_name.schema_name,
963 &table_name.table_name,
964 );
965 let table_info_key = TableInfoKey::new(table_id);
966 let table_route_key = TableRouteKey::new(table_id);
967 let table_repart_key = TableRepartKey::new(table_id);
968 let datanode_table_keys = datanode_ids
969 .into_iter()
970 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
971 .collect::<HashSet<_>>();
972 let topic_region_map = self
973 .topic_region_manager
974 .get_topic_region_mapping(table_id, region_wal_options);
975 let topic_region_keys = topic_region_map
976 .iter()
977 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
978 .collect::<Vec<_>>();
979 keys.push(table_name.to_bytes());
980 keys.push(table_info_key.to_bytes());
981 keys.push(table_route_key.to_bytes());
982 keys.push(table_repart_key.to_bytes());
983 for key in &datanode_table_keys {
984 keys.push(key.to_bytes());
985 }
986 for key in topic_region_keys {
987 keys.push(key.to_bytes());
988 }
989 Ok(keys)
990 }
991
992 pub async fn delete_table_metadata(
995 &self,
996 table_id: TableId,
997 table_name: &TableName,
998 table_route_value: &TableRouteValue,
999 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1000 ) -> Result<()> {
1001 let keys =
1002 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1003 self.tombstone_manager.create(keys).await.map(|_| ())
1004 }
1005
1006 pub async fn list_dropped_tables(&self) -> Result<Vec<DroppedTableName>> {
1008 let mut stream = self.tombstone_manager.tombstoned_table_names();
1009 let mut dropped_tables = Vec::new();
1010
1011 while let Some(kv) = stream.try_next().await? {
1012 let raw_key = self.tombstone_manager.strip_tombstone_prefix(&kv.key)?;
1013 let table_name = TableNameKey::from_bytes(raw_key)?.into();
1014 let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1015 dropped_tables.push(DroppedTableName {
1016 table_id,
1017 table_name,
1018 });
1019 }
1020
1021 Ok(dropped_tables)
1022 }
1023
1024 pub async fn get_dropped_table(
1026 &self,
1027 table_name: &TableName,
1028 ) -> Result<Option<DroppedTableMetadata>> {
1029 let table_name_key = TableNameKey::from(table_name);
1030 let Some(kv) = self
1031 .tombstone_manager
1032 .get(&table_name_key.to_bytes())
1033 .await?
1034 else {
1035 return Ok(None);
1036 };
1037
1038 let table_id = TableNameValue::try_from_raw_value(&kv.value)?.table_id();
1039 self.get_dropped_table_metadata(table_id, table_name.clone())
1040 .await
1041 }
1042
1043 pub async fn get_dropped_table_by_id(
1045 &self,
1046 table_id: TableId,
1047 ) -> Result<Option<DroppedTableMetadata>> {
1048 self.get_dropped_table_metadata(table_id, None).await
1049 }
1050
1051 pub async fn delete_table_metadata_tombstone(
1054 &self,
1055 table_id: TableId,
1056 table_name: &TableName,
1057 table_route_value: &TableRouteValue,
1058 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1059 ) -> Result<()> {
1060 let table_metadata_keys =
1061 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1062 self.tombstone_manager
1063 .delete(table_metadata_keys)
1064 .await
1065 .map(|_| ())
1066 }
1067
1068 pub async fn restore_table_metadata(
1071 &self,
1072 table_id: TableId,
1073 table_name: &TableName,
1074 table_route_value: &TableRouteValue,
1075 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1076 ) -> Result<()> {
1077 let keys =
1078 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1079 self.tombstone_manager.restore(keys).await.map(|_| ())
1080 }
1081
1082 pub async fn destroy_table_metadata(
1085 &self,
1086 table_id: TableId,
1087 table_name: &TableName,
1088 table_route_value: &TableRouteValue,
1089 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1090 ) -> Result<()> {
1091 let keys =
1092 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1093 let _ = self
1094 .kv_backend
1095 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1096 .await?;
1097 Ok(())
1098 }
1099
1100 async fn get_dropped_table_metadata<T>(
1102 &self,
1103 table_id: TableId,
1104 table_name: T,
1105 ) -> Result<Option<DroppedTableMetadata>>
1106 where
1107 T: Into<Option<TableName>>,
1108 {
1109 let table_info_key = TableInfoKey::new(table_id);
1110 let Some(table_info_kv) = self
1111 .tombstone_manager
1112 .get(&table_info_key.to_bytes())
1113 .await?
1114 else {
1115 return Ok(None);
1116 };
1117
1118 let table_info_value = TableInfoValue::try_from_raw_value(&table_info_kv.value)?;
1119 let table_name = table_name
1120 .into()
1121 .unwrap_or_else(|| table_info_value.table_name());
1122
1123 let table_route_key = TableRouteKey::new(table_id);
1124 let table_route_kv = self
1125 .tombstone_manager
1126 .get(&table_route_key.to_bytes())
1127 .await?
1128 .with_context(|| error::UnexpectedSnafu {
1129 err_msg: format!("Missing tombstoned table route metadata for table id {table_id}"),
1130 })?;
1131 let mut table_route_value = TableRouteValue::try_from_raw_value(&table_route_kv.value)?;
1132 self.table_route_manager
1133 .table_route_storage()
1134 .remap_table_route(&mut table_route_value)
1135 .await?;
1136
1137 let region_wal_options = self
1138 .dropped_region_wal_options(table_id, &table_route_value)
1139 .await?;
1140
1141 Ok(Some(DroppedTableMetadata {
1142 table_id,
1143 table_name,
1144 table_info_value,
1145 table_route_value,
1146 region_wal_options,
1147 }))
1148 }
1149
1150 async fn dropped_region_wal_options(
1152 &self,
1153 table_id: TableId,
1154 table_route_value: &TableRouteValue,
1155 ) -> Result<HashMap<RegionNumber, WalOptions>> {
1156 let mut region_wal_options = HashMap::new();
1157 let datanode_table_keys = region_distribution(table_route_value.region_routes()?)
1158 .into_keys()
1159 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
1160 .collect::<Vec<_>>();
1161 let datanode_table_key_bytes = datanode_table_keys
1162 .iter()
1163 .map(|key| key.to_bytes())
1164 .collect::<Vec<_>>();
1165 let datanode_table_values = self
1166 .tombstone_manager
1167 .batch_get(&datanode_table_key_bytes)
1168 .await?;
1169
1170 for datanode_table_key in datanode_table_keys {
1171 let Some(kv) = datanode_table_values.get(&datanode_table_key.to_bytes()) else {
1172 continue;
1173 };
1174
1175 let datanode_table_value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
1176 for (region_number, wal_options) in &datanode_table_value.region_info.region_wal_options
1177 {
1178 region_wal_options.insert(
1179 *region_number,
1180 serde_json::from_str(wal_options).context(error::SerdeJsonSnafu)?,
1181 );
1182 }
1183 }
1184
1185 Ok(region_wal_options)
1186 }
1187
1188 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1189 let mut keys = Vec::with_capacity(3);
1190 let view_name = TableNameKey::new(
1191 &view_name.catalog_name,
1192 &view_name.schema_name,
1193 &view_name.table_name,
1194 );
1195 let table_info_key = TableInfoKey::new(view_id);
1196 let view_info_key = ViewInfoKey::new(view_id);
1197 keys.push(view_name.to_bytes());
1198 keys.push(table_info_key.to_bytes());
1199 keys.push(view_info_key.to_bytes());
1200
1201 Ok(keys)
1202 }
1203
1204 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1207 let keys = self.view_info_keys(view_id, view_name)?;
1208 let _ = self
1209 .kv_backend
1210 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1211 .await?;
1212 Ok(())
1213 }
1214
1215 pub async fn rename_table(
1219 &self,
1220 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1221 new_table_name: String,
1222 ) -> Result<()> {
1223 let current_table_info = ¤t_table_info_value.table_info;
1224 let table_id = current_table_info.ident.table_id;
1225
1226 let table_name_key = TableNameKey::new(
1227 ¤t_table_info.catalog_name,
1228 ¤t_table_info.schema_name,
1229 ¤t_table_info.name,
1230 );
1231
1232 let new_table_name_key = TableNameKey::new(
1233 ¤t_table_info.catalog_name,
1234 ¤t_table_info.schema_name,
1235 &new_table_name,
1236 );
1237
1238 let update_table_name_txn = self.table_name_manager().build_update_txn(
1240 &table_name_key,
1241 &new_table_name_key,
1242 table_id,
1243 )?;
1244
1245 let new_table_info_value = current_table_info_value
1246 .inner
1247 .with_update(move |table_info| {
1248 table_info.name = new_table_name;
1249 });
1250
1251 let (update_table_info_txn, on_update_table_info_failure) = self
1253 .table_info_manager()
1254 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1255
1256 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1257
1258 let mut r = self.kv_backend.txn(txn).await?;
1259
1260 if !r.succeeded {
1262 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1263 let remote_table_info = on_update_table_info_failure(&mut set)?
1264 .context(error::UnexpectedSnafu {
1265 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1266 })?
1267 .into_inner();
1268
1269 let op_name = "the renaming table metadata";
1270 ensure_values!(remote_table_info, new_table_info_value, op_name);
1271 }
1272
1273 Ok(())
1274 }
1275
1276 pub async fn update_table_info(
1280 &self,
1281 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1282 region_distribution: Option<RegionDistribution>,
1283 new_table_info: TableInfo,
1284 ) -> Result<()> {
1285 let table_id = current_table_info_value.table_info.ident.table_id;
1286 let new_table_info_value = current_table_info_value.update(new_table_info);
1287
1288 let (update_table_info_txn, on_update_table_info_failure) = self
1290 .table_info_manager()
1291 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1292
1293 let txn = if let Some(region_distribution) = region_distribution {
1294 let new_region_options = new_table_info_value.table_info.to_region_options();
1296 let update_datanode_table_options_txn = self
1297 .datanode_table_manager
1298 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1299 .await?;
1300 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1301 } else {
1302 update_table_info_txn
1303 };
1304
1305 let mut r = self.kv_backend.txn(txn).await?;
1306 if !r.succeeded {
1308 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1309 let remote_table_info = on_update_table_info_failure(&mut set)?
1310 .context(error::UnexpectedSnafu {
1311 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1312 })?
1313 .into_inner();
1314
1315 let op_name = "the updating table info";
1316 ensure_values!(remote_table_info, new_table_info_value, op_name);
1317 }
1318 Ok(())
1319 }
1320
1321 #[allow(clippy::too_many_arguments)]
1332 pub async fn update_view_info(
1333 &self,
1334 view_id: TableId,
1335 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1336 new_view_info: Vec<u8>,
1337 table_names: HashSet<TableName>,
1338 columns: Vec<String>,
1339 plan_columns: Vec<String>,
1340 definition: String,
1341 ) -> Result<()> {
1342 let new_view_info_value = current_view_info_value.update(
1343 new_view_info.into(),
1344 table_names,
1345 columns,
1346 plan_columns,
1347 definition,
1348 );
1349
1350 let (update_view_info_txn, on_update_view_info_failure) = self
1352 .view_info_manager()
1353 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1354
1355 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1356
1357 if !r.succeeded {
1359 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1360 let remote_view_info = on_update_view_info_failure(&mut set)?
1361 .context(error::UnexpectedSnafu {
1362 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1363 })?
1364 .into_inner();
1365
1366 let op_name = "the updating view info";
1367 ensure_values!(remote_view_info, new_view_info_value, op_name);
1368 }
1369 Ok(())
1370 }
1371
/// Chunk size for splitting work across `batch_update_table_info_values` calls.
///
/// Currently equal to the backend's `max_txn_ops()`.
// NOTE(review): each per-table update txn may contain more than one op; confirm
// that this bound keeps a merged batch within the backend's op limit.
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
    self.kv_backend.max_txn_ops()
}
1375
1376 pub async fn batch_update_table_info_values(
1377 &self,
1378 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, TableInfo)>,
1379 ) -> Result<()> {
1380 let len = table_info_value_pairs.len();
1381 let mut txns = Vec::with_capacity(len);
1382 struct OnFailure<F, R>
1383 where
1384 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1385 {
1386 table_info_value: TableInfoValue,
1387 on_update_table_info_failure: F,
1388 }
1389 let mut on_failures = Vec::with_capacity(len);
1390
1391 for (table_info_value, new_table_info) in table_info_value_pairs {
1392 let table_id = table_info_value.table_info.ident.table_id;
1393
1394 let new_table_info_value = table_info_value.update(new_table_info);
1395
1396 let (update_table_info_txn, on_update_table_info_failure) =
1397 self.table_info_manager().build_update_txn(
1398 table_id,
1399 &table_info_value,
1400 &new_table_info_value,
1401 )?;
1402
1403 txns.push(update_table_info_txn);
1404
1405 on_failures.push(OnFailure {
1406 table_info_value: new_table_info_value,
1407 on_update_table_info_failure,
1408 });
1409 }
1410
1411 let txn = Txn::merge_all(txns);
1412 let mut r = self.kv_backend.txn(txn).await?;
1413
1414 if !r.succeeded {
1415 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1416 for on_failure in on_failures {
1417 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1418 .context(error::UnexpectedSnafu {
1419 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1420 })?
1421 .into_inner();
1422
1423 let op_name = "the batch updating table info";
1424 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1425 }
1426 }
1427
1428 Ok(())
1429 }
1430
1431 pub async fn update_table_route(
1432 &self,
1433 table_id: TableId,
1434 region_info: RegionInfo,
1435 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1436 new_region_routes: Vec<RegionRoute>,
1437 new_region_options: &HashMap<String, String>,
1438 new_region_wal_options: &HashMap<RegionNumber, String>,
1439 ) -> Result<()> {
1440 let current_region_distribution =
1442 region_distribution(current_table_route_value.region_routes()?);
1443 let new_region_distribution = region_distribution(&new_region_routes);
1444
1445 let update_topic_region_txn = self.topic_region_manager.build_update_txn(
1446 table_id,
1447 ®ion_info.region_wal_options,
1448 new_region_wal_options,
1449 )?;
1450 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1451 table_id,
1452 region_info,
1453 current_region_distribution,
1454 new_region_distribution,
1455 new_region_options,
1456 new_region_wal_options,
1457 )?;
1458
1459 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1461 let (update_table_route_txn, on_update_table_route_failure) = self
1462 .table_route_manager()
1463 .table_route_storage()
1464 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1465
1466 let txn = Txn::merge_all(vec![
1467 update_datanode_table_txn,
1468 update_table_route_txn,
1469 update_topic_region_txn,
1470 ]);
1471
1472 let mut r = self.kv_backend.txn(txn).await?;
1473
1474 if !r.succeeded {
1476 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1477 let remote_table_route = on_update_table_route_failure(&mut set)?
1478 .context(error::UnexpectedSnafu {
1479 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1480 })?
1481 .into_inner();
1482
1483 let op_name = "the updating table route";
1484 ensure_values!(remote_table_route, new_table_route_value, op_name);
1485 }
1486
1487 Ok(())
1488 }
1489
1490 pub async fn update_leader_region_status<F>(
1492 &self,
1493 table_id: TableId,
1494 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1495 next_region_route_status: F,
1496 ) -> Result<()>
1497 where
1498 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1499 {
1500 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1501
1502 let mut updated = 0;
1503 for route in &mut new_region_routes {
1504 if let Some(state) = next_region_route_status(route)
1505 && route.set_leader_state(state)
1506 {
1507 updated += 1;
1508 }
1509 }
1510
1511 if updated == 0 {
1512 warn!("No leader status updated");
1513 return Ok(());
1514 }
1515
1516 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1518
1519 let (update_table_route_txn, on_update_table_route_failure) = self
1520 .table_route_manager()
1521 .table_route_storage()
1522 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1523
1524 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1525
1526 if !r.succeeded {
1528 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1529 let remote_table_route = on_update_table_route_failure(&mut set)?
1530 .context(error::UnexpectedSnafu {
1531 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1532 })?
1533 .into_inner();
1534
1535 let op_name = "the updating leader region status";
1536 ensure_values!(remote_table_route, new_table_route_value, op_name);
1537 }
1538
1539 Ok(())
1540 }
1541}
1542
/// Implements `$crate::key::MetadataValue` for each listed type using
/// `serde_json` encoding.
///
/// Takes a comma-separated list of types (no trailing comma). `Result`,
/// `SerdeJsonSnafu`, `serde_json` and the `context` method are written
/// unqualified, so they resolve at the invocation site. The invoking module
/// must have them, including `snafu::ResultExt`, in scope.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                // Decodes the value from its JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1559
/// Implements `$crate::key::MetadataKeyGetTxnOp` for each listed key type.
///
/// `build_get_op` returns two things:
/// - a `TxnOp::Get` for the key's raw bytes (`to_bytes()`);
/// - the closure produced by `TxnOpGetResponseSet::filter` for the same raw
///   key. Presumably this closure extracts that key's value from a
///   transaction's get responses; confirm against `TxnOpGetResponseSet::filter`.
///
/// `TxnOp` and `TxnOpGetResponseSet` are written unqualified and must be in
/// scope where the macro is invoked. Takes a comma-separated list of types
/// (no trailing comma).
macro_rules! impl_metadata_key_get_txn_op {
    ($($key: ty), *) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let raw_key = self.to_bytes();
                    // The get op and the response filter target the same raw key.
                    (
                        TxnOp::Get(raw_key.clone()),
                        TxnOpGetResponseSet::filter(raw_key),
                    )
                }
            }
        )*
    }
}
1584
1585impl_metadata_key_get_txn_op! {
1586 TableNameKey<'_>,
1587 TableInfoKey,
1588 ViewInfoKey,
1589 TableRouteKey,
1590 DatanodeTableKey
1591}
1592
/// Adds inherent JSON (de)serialization helpers to each listed type.
///
/// This differs from `impl_metadata_value!` in two ways:
/// - the methods are inherent, not a trait impl;
/// - `try_from_raw_value` decodes into `Option<Self>`, and serde maps a JSON
///   `null` to `None`.
///
/// `Result`, `SerdeJsonSnafu`, `serde_json` and the `context` method resolve
/// at the invocation site. Takes a comma-separated list of types (no trailing
/// comma).
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty: ty), *) => {
        $(
            impl $val_ty {
                // Decodes an optional value from its JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                // Encodes the value as JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    }
}
1609
1610impl_metadata_value! {
1611 TableNameValue,
1612 TableInfoValue,
1613 ViewInfoValue,
1614 DatanodeTableValue,
1615 FlowInfoValue,
1616 FlowNameValue,
1617 FlowRouteValue,
1618 TableFlowValue,
1619 NodeAddressValue,
1620 SchemaNameValue,
1621 FlowStateValue,
1622 PoisonValue,
1623 TopicRegionValue
1624}
1625
1626impl_optional_metadata_value! {
1627 CatalogNameValue,
1628 SchemaNameValue
1629}
1630
1631#[cfg(test)]
1632mod tests {
1633 use std::collections::{BTreeMap, HashMap, HashSet};
1634 use std::sync::Arc;
1635
1636 use bytes::Bytes;
1637 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1638 use common_time::util::current_time_millis;
1639 use common_wal::options::{KafkaWalOptions, WalOptions};
1640 use futures::TryStreamExt;
1641 use store_api::storage::{RegionId, RegionNumber};
1642 use table::metadata::TableInfo;
1643 use table::table_name::TableName;
1644
1645 use super::datanode_table::DatanodeTableKey;
1646 use super::test_utils;
1647 use crate::ddl::allocator::wal_options::WalOptionsAllocator;
1648 use crate::ddl::test_util::create_table::test_create_table_task;
1649 use crate::ddl::utils::region_storage_path;
1650 use crate::error::Result;
1651 use crate::key::datanode_table::RegionInfo;
1652 use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
1653 use crate::key::table_info::TableInfoValue;
1654 use crate::key::table_name::TableNameKey;
1655 use crate::key::table_route::TableRouteValue;
1656 use crate::key::topic_region::TopicRegionKey;
1657 use crate::key::{
1658 DeserializedValueWithBytes, MetadataValue, RegionDistribution, RegionRoleSet,
1659 TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue,
1660 };
1661 use crate::kv_backend::KvBackend;
1662 use crate::kv_backend::memory::MemoryKvBackend;
1663 use crate::kv_backend::read_only::ReadOnlyKvBackend;
1664 use crate::peer::Peer;
1665 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1666 use crate::rpc::store::{PutRequest, RangeRequest};
1667 use crate::wal_provider::WalProvider;
1668
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // The cached `bytes` deliberately encode a different value (two routes)
    // from `inner` (one route), so the round trip shows which one is serialized.
    let two_routes = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let two_routes_bytes = serde_json::to_vec(&two_routes).unwrap();

    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(two_routes_bytes.clone()),
    };

    let encoded = serde_json::to_vec(&value).unwrap();
    let decoded: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&encoded).unwrap();

    // Expect the cached bytes to be what was serialized, and `inner` to be
    // rebuilt from them.
    assert_eq!(decoded.inner, two_routes);
    assert_eq!(decoded.bytes, two_routes_bytes);
}
1696
/// Default test route: region id 1 led by datanode 2 (see `new_region_route`).
fn new_test_region_route() -> RegionRoute {
    new_region_route(1, 2)
}
1700
1701 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1702 RegionRoute {
1703 region: Region {
1704 id: region_id.into(),
1705 name: "r1".to_string(),
1706 attrs: BTreeMap::new(),
1707 partition_expr: Default::default(),
1708 },
1709 leader_peer: Some(Peer::new(datanode, "a2")),
1710 follower_peers: vec![],
1711 leader_state: None,
1712 leader_down_since: None,
1713 write_route_policy: None,
1714 }
1715 }
1716
/// Default test table info, built by `test_utils::new_test_table_info(10)`
/// (table id 10).
fn new_test_table_info() -> TableInfo {
    test_utils::new_test_table_info(10)
}
1720
1721 fn new_test_table_names() -> HashSet<TableName> {
1722 let mut set = HashSet::new();
1723 set.insert(TableName {
1724 catalog_name: "greptime".to_string(),
1725 schema_name: "public".to_string(),
1726 table_name: "a_table".to_string(),
1727 });
1728 set.insert(TableName {
1729 catalog_name: "greptime".to_string(),
1730 schema_name: "public".to_string(),
1731 table_name: "b_table".to_string(),
1732 });
1733 set
1734 }
1735
1736 async fn create_physical_table_metadata(
1737 table_metadata_manager: &TableMetadataManager,
1738 table_info: TableInfo,
1739 region_routes: Vec<RegionRoute>,
1740 region_wal_options: HashMap<RegionNumber, String>,
1741 ) -> Result<()> {
1742 table_metadata_manager
1743 .create_table_metadata(
1744 table_info,
1745 TableRouteValue::physical(region_routes),
1746 region_wal_options,
1747 )
1748 .await
1749 }
1750
1751 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1752 let topics = (0..2)
1753 .map(|i| format!("greptimedb_topic{}", i))
1754 .collect::<Vec<_>>();
1755 let wal_options = topics
1756 .iter()
1757 .map(|topic| {
1758 WalOptions::Kafka(KafkaWalOptions {
1759 topic: topic.clone(),
1760 })
1761 })
1762 .collect::<Vec<_>>();
1763
1764 (0..16)
1765 .enumerate()
1766 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1767 .collect()
1768 }
1769
1770 fn create_mixed_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1771 HashMap::from([
1772 (
1773 0,
1774 WalOptions::Kafka(KafkaWalOptions {
1775 topic: "greptimedb_topic0".to_string(),
1776 }),
1777 ),
1778 (1, WalOptions::RaftEngine),
1779 (2, WalOptions::Noop),
1780 (
1781 3,
1782 WalOptions::Kafka(KafkaWalOptions {
1783 topic: "greptimedb_topic1".to_string(),
1784 }),
1785 ),
1786 ])
1787 }
1788
1789 #[tokio::test]
1790 async fn test_raft_engine_topic_region_map() {
1791 let mem_kv = Arc::new(MemoryKvBackend::default());
1792 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1793 let region_route = new_test_region_route();
1794 let region_routes = &vec![region_route.clone()];
1795 let table_info = new_test_table_info();
1796 let wal_provider = WalProvider::RaftEngine;
1797 let regions: Vec<_> = (0..16).collect();
1798 let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap();
1799 create_physical_table_metadata(
1800 &table_metadata_manager,
1801 table_info.clone(),
1802 region_routes.clone(),
1803 region_wal_options.clone(),
1804 )
1805 .await
1806 .unwrap();
1807
1808 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1809 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1810 let resp = mem_kv.range(range_req).await.unwrap();
1811 assert!(resp.kvs.is_empty());
1813 }
1814
1815 #[tokio::test]
1816 async fn test_create_table_metadata() {
1817 let mem_kv = Arc::new(MemoryKvBackend::default());
1818 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1819 let region_route = new_test_region_route();
1820 let region_routes = &vec![region_route.clone()];
1821 let table_info = new_test_table_info();
1822 let region_wal_options = create_mock_region_wal_options()
1823 .into_iter()
1824 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1825 .collect::<HashMap<_, _>>();
1826
1827 create_physical_table_metadata(
1829 &table_metadata_manager,
1830 table_info.clone(),
1831 region_routes.clone(),
1832 region_wal_options.clone(),
1833 )
1834 .await
1835 .unwrap();
1836
1837 assert!(
1839 create_physical_table_metadata(
1840 &table_metadata_manager,
1841 table_info.clone(),
1842 region_routes.clone(),
1843 region_wal_options.clone(),
1844 )
1845 .await
1846 .is_ok()
1847 );
1848
1849 let mut modified_region_routes = region_routes.clone();
1850 modified_region_routes.push(region_route.clone());
1851 assert!(
1853 create_physical_table_metadata(
1854 &table_metadata_manager,
1855 table_info.clone(),
1856 modified_region_routes,
1857 region_wal_options.clone(),
1858 )
1859 .await
1860 .is_err()
1861 );
1862
1863 let (remote_table_info, remote_table_route) = table_metadata_manager
1864 .get_full_table_info(10)
1865 .await
1866 .unwrap();
1867
1868 assert_eq!(
1869 remote_table_info.unwrap().into_inner().table_info,
1870 table_info
1871 );
1872 assert_eq!(
1873 remote_table_route
1874 .unwrap()
1875 .into_inner()
1876 .region_routes()
1877 .unwrap(),
1878 region_routes
1879 );
1880
1881 for i in 0..2 {
1882 let region_number = i as u32;
1883 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1884 let topic = format!("greptimedb_topic{}", i);
1885 let regions = table_metadata_manager
1886 .topic_region_manager
1887 .regions(&topic)
1888 .await
1889 .unwrap()
1890 .into_keys()
1891 .collect::<Vec<_>>();
1892 assert_eq!(regions.len(), 8);
1893 assert!(regions.contains(®ion_id));
1894 }
1895 }
1896
1897 #[tokio::test]
1898 async fn test_get_full_table_info_remaps_route_address() {
1899 let mem_kv = Arc::new(MemoryKvBackend::default());
1900 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1901
1902 let mut region_route = new_test_region_route();
1903 region_route.follower_peers = vec![Peer::empty(3)];
1904 let region_routes = vec![region_route];
1905 let table_info = new_test_table_info();
1906 let table_id = table_info.ident.table_id;
1907
1908 create_physical_table_metadata(
1909 &table_metadata_manager,
1910 table_info,
1911 region_routes,
1912 HashMap::new(),
1913 )
1914 .await
1915 .unwrap();
1916
1917 mem_kv
1918 .put(PutRequest {
1919 key: NodeAddressKey::with_datanode(2).to_string().into_bytes(),
1920 value: NodeAddressValue::new(Peer::new(2, "new-a2"))
1921 .try_as_raw_value()
1922 .unwrap(),
1923 ..Default::default()
1924 })
1925 .await
1926 .unwrap();
1927 mem_kv
1928 .put(PutRequest {
1929 key: NodeAddressKey::with_datanode(3).to_string().into_bytes(),
1930 value: NodeAddressValue::new(Peer::new(3, "new-a3"))
1931 .try_as_raw_value()
1932 .unwrap(),
1933 ..Default::default()
1934 })
1935 .await
1936 .unwrap();
1937
1938 let (_, table_route) = table_metadata_manager
1939 .get_full_table_info(table_id)
1940 .await
1941 .unwrap();
1942 let table_route = table_route.unwrap().into_inner();
1943 let region_routes = table_route.region_routes().unwrap();
1944
1945 assert_eq!(
1946 region_routes[0].leader_peer.as_ref().unwrap().addr,
1947 "new-a2"
1948 );
1949 assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
1950 }
1951
1952 #[tokio::test]
1953 async fn test_get_full_table_info_with_read_only_kv_backend() {
1954 let mem_kv = Arc::new(MemoryKvBackend::default());
1955 let writable_manager = TableMetadataManager::new(mem_kv.clone());
1956
1957 let region_routes = vec![new_test_region_route()];
1958 let table_info = new_test_table_info();
1959 let table_id = table_info.ident.table_id;
1960
1961 create_physical_table_metadata(
1962 &writable_manager,
1963 table_info.clone(),
1964 region_routes.clone(),
1965 HashMap::new(),
1966 )
1967 .await
1968 .unwrap();
1969
1970 let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
1971 let read_only_manager = TableMetadataManager::new(read_only_kv);
1972
1973 let (remote_table_info, remote_table_route) = read_only_manager
1974 .get_full_table_info(table_id)
1975 .await
1976 .unwrap();
1977
1978 assert_eq!(
1979 remote_table_info.unwrap().into_inner().table_info,
1980 table_info
1981 );
1982 assert_eq!(
1983 remote_table_route
1984 .unwrap()
1985 .into_inner()
1986 .region_routes()
1987 .unwrap(),
1988 ®ion_routes
1989 );
1990 }
1991
1992 #[tokio::test]
1993 async fn test_create_logic_tables_metadata() {
1994 let mem_kv = Arc::new(MemoryKvBackend::default());
1995 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1996 let region_route = new_test_region_route();
1997 let region_routes = vec![region_route.clone()];
1998 let table_info = new_test_table_info();
1999 let table_id = table_info.ident.table_id;
2000 let table_route_value = TableRouteValue::physical(region_routes.clone());
2001
2002 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
2003 table_metadata_manager
2005 .create_logical_tables_metadata(tables_data.clone())
2006 .await
2007 .unwrap();
2008
2009 assert!(
2011 table_metadata_manager
2012 .create_logical_tables_metadata(tables_data)
2013 .await
2014 .is_ok()
2015 );
2016
2017 let mut modified_region_routes = region_routes.clone();
2018 modified_region_routes.push(new_region_route(2, 3));
2019 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
2020 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
2021 assert!(
2023 table_metadata_manager
2024 .create_logical_tables_metadata(modified_tables_data)
2025 .await
2026 .is_err()
2027 );
2028
2029 let (remote_table_info, remote_table_route) = table_metadata_manager
2030 .get_full_table_info(table_id)
2031 .await
2032 .unwrap();
2033
2034 assert_eq!(
2035 remote_table_info.unwrap().into_inner().table_info,
2036 table_info
2037 );
2038 assert_eq!(
2039 remote_table_route
2040 .unwrap()
2041 .into_inner()
2042 .region_routes()
2043 .unwrap(),
2044 ®ion_routes
2045 );
2046 }
2047
2048 #[tokio::test]
2049 async fn test_create_many_logical_tables_metadata() {
2050 let kv_backend = Arc::new(MemoryKvBackend::default());
2051 let table_metadata_manager = TableMetadataManager::new(kv_backend);
2052
2053 let mut tables_data = vec![];
2054 for i in 0..128 {
2055 let table_id = i + 1;
2056 let regin_number = table_id * 3;
2057 let region_id = RegionId::new(table_id, regin_number);
2058 let region_route = new_region_route(region_id.as_u64(), 2);
2059 let region_routes = vec![region_route.clone()];
2060 let table_info = test_utils::new_test_table_info_with_name(
2061 table_id,
2062 &format!("my_table_{}", table_id),
2063 );
2064 let table_route_value = TableRouteValue::physical(region_routes.clone());
2065
2066 tables_data.push((table_info, table_route_value));
2067 }
2068
2069 table_metadata_manager
2071 .create_logical_tables_metadata(tables_data)
2072 .await
2073 .unwrap();
2074 }
2075
2076 #[tokio::test]
2077 async fn test_delete_table_metadata() {
2078 let mem_kv = Arc::new(MemoryKvBackend::default());
2079 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2080 let region_route = new_test_region_route();
2081 let region_routes = &vec![region_route.clone()];
2082 let table_info = new_test_table_info();
2083 let table_id = table_info.ident.table_id;
2084 let datanode_id = 2;
2085 let region_wal_options = create_mock_region_wal_options();
2086 let serialized_region_wal_options = region_wal_options
2087 .iter()
2088 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2089 .collect::<HashMap<_, _>>();
2090
2091 create_physical_table_metadata(
2093 &table_metadata_manager,
2094 table_info.clone(),
2095 region_routes.clone(),
2096 serialized_region_wal_options,
2097 )
2098 .await
2099 .unwrap();
2100
2101 let table_name = TableName::new(
2102 table_info.catalog_name,
2103 table_info.schema_name,
2104 table_info.name,
2105 );
2106 let table_route_value = &TableRouteValue::physical(region_routes.clone());
2107 table_metadata_manager
2109 .delete_table_metadata(
2110 table_id,
2111 &table_name,
2112 table_route_value,
2113 ®ion_wal_options,
2114 )
2115 .await
2116 .unwrap();
2117 table_metadata_manager
2119 .delete_table_metadata(
2120 table_id,
2121 &table_name,
2122 table_route_value,
2123 ®ion_wal_options,
2124 )
2125 .await
2126 .unwrap();
2127 assert!(
2128 table_metadata_manager
2129 .table_info_manager()
2130 .get(table_id)
2131 .await
2132 .unwrap()
2133 .is_none()
2134 );
2135 assert!(
2136 table_metadata_manager
2137 .table_route_manager()
2138 .table_route_storage()
2139 .get(table_id)
2140 .await
2141 .unwrap()
2142 .is_none()
2143 );
2144 assert!(
2145 table_metadata_manager
2146 .datanode_table_manager()
2147 .tables(datanode_id)
2148 .try_collect::<Vec<_>>()
2149 .await
2150 .unwrap()
2151 .is_empty()
2152 );
2153 let table_info = table_metadata_manager
2155 .table_info_manager()
2156 .get(table_id)
2157 .await
2158 .unwrap();
2159 assert!(table_info.is_none());
2160 let table_route = table_metadata_manager
2161 .table_route_manager()
2162 .table_route_storage()
2163 .get(table_id)
2164 .await
2165 .unwrap();
2166 assert!(table_route.is_none());
2167 let regions = table_metadata_manager
2169 .topic_region_manager
2170 .regions("greptimedb_topic0")
2171 .await
2172 .unwrap();
2173 assert_eq!(regions.len(), 0);
2174 let regions = table_metadata_manager
2175 .topic_region_manager
2176 .regions("greptimedb_topic1")
2177 .await
2178 .unwrap();
2179 assert_eq!(regions.len(), 0);
2180 }
2181
/// `rename_table` behaviour under repeated and stale input:
/// - it moves the table-name key;
/// - repeating it with the same (now stale) current value succeeds;
/// - it rejects a current value that differs from the stored one.
#[tokio::test]
async fn test_rename_table() {
    let mem_kv = Arc::new(MemoryKvBackend::default());
    let table_metadata_manager = TableMetadataManager::new(mem_kv);
    let region_route = new_test_region_route();
    let region_routes = vec![region_route.clone()];
    let table_info = new_test_table_info();
    let table_id = table_info.ident.table_id;
    create_physical_table_metadata(
        &table_metadata_manager,
        table_info.clone(),
        region_routes.clone(),
        HashMap::new(),
    )
    .await
    .unwrap();

    let new_table_name = "another_name".to_string();
    let table_info_value =
        DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));

    table_metadata_manager
        .rename_table(&table_info_value, new_table_name.clone())
        .await
        .unwrap();
    // Repeating the rename after the stored value already changed is accepted.
    table_metadata_manager
        .rename_table(&table_info_value, new_table_name.clone())
        .await
        .unwrap();
    // A current value whose name ("hi") does not match the stored one is rejected.
    let mut modified_table_info = table_info.clone();
    modified_table_info.name = "hi".to_string();
    let modified_table_info_value =
        DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
    assert!(
        table_metadata_manager
            .rename_table(&modified_table_info_value, new_table_name.clone())
            .await
            .is_err()
    );

    let old_table_name = TableNameKey::new(
        &table_info.catalog_name,
        &table_info.schema_name,
        &table_info.name,
    );
    let new_table_name = TableNameKey::new(
        &table_info.catalog_name,
        &table_info.schema_name,
        &new_table_name,
    );

    // The old name no longer resolves...
    assert!(
        table_metadata_manager
            .table_name_manager()
            .get(old_table_name)
            .await
            .unwrap()
            .is_none()
    );

    // ...and the new name resolves to the same table id.
    assert_eq!(
        table_metadata_manager
            .table_name_manager()
            .get(new_table_name)
            .await
            .unwrap()
            .unwrap()
            .table_id(),
        table_id
    );
}
2257
2258 #[tokio::test]
2259 async fn test_update_table_info() {
2260 let mem_kv = Arc::new(MemoryKvBackend::default());
2261 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2262 let region_route = new_test_region_route();
2263 let region_routes = vec![region_route.clone()];
2264 let table_info = new_test_table_info();
2265 let table_id = table_info.ident.table_id;
2266 create_physical_table_metadata(
2268 &table_metadata_manager,
2269 table_info.clone(),
2270 region_routes.clone(),
2271 HashMap::new(),
2272 )
2273 .await
2274 .unwrap();
2275
2276 let mut new_table_info = table_info.clone();
2277 new_table_info.name = "hi".to_string();
2278 let current_table_info_value =
2279 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
2280 table_metadata_manager
2282 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2283 .await
2284 .unwrap();
2285 table_metadata_manager
2287 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
2288 .await
2289 .unwrap();
2290
2291 let updated_table_info = table_metadata_manager
2293 .table_info_manager()
2294 .get(table_id)
2295 .await
2296 .unwrap()
2297 .unwrap()
2298 .into_inner();
2299 assert_eq!(updated_table_info.table_info, new_table_info);
2300
2301 let mut wrong_table_info = table_info.clone();
2302 wrong_table_info.name = "wrong".to_string();
2303 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2304 current_table_info_value.update(wrong_table_info),
2305 );
2306 assert!(
2309 table_metadata_manager
2310 .update_table_info(&wrong_table_info_value, None, new_table_info)
2311 .await
2312 .is_err()
2313 )
2314 }
2315
2316 #[tokio::test]
2317 async fn test_update_table_leader_region_status() {
2318 let mem_kv = Arc::new(MemoryKvBackend::default());
2319 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2320 let datanode = 1;
2321 let region_routes = vec![
2322 RegionRoute {
2323 region: Region {
2324 id: 1.into(),
2325 name: "r1".to_string(),
2326 attrs: BTreeMap::new(),
2327 partition_expr: Default::default(),
2328 },
2329 leader_peer: Some(Peer::new(datanode, "a2")),
2330 leader_state: Some(LeaderState::Downgrading),
2331 follower_peers: vec![],
2332 leader_down_since: Some(current_time_millis()),
2333 write_route_policy: None,
2334 },
2335 RegionRoute {
2336 region: Region {
2337 id: 2.into(),
2338 name: "r2".to_string(),
2339 attrs: BTreeMap::new(),
2340 partition_expr: Default::default(),
2341 },
2342 leader_peer: Some(Peer::new(datanode, "a1")),
2343 leader_state: None,
2344 follower_peers: vec![],
2345 leader_down_since: None,
2346 write_route_policy: None,
2347 },
2348 ];
2349 let table_info = new_test_table_info();
2350 let table_id = table_info.ident.table_id;
2351 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2352 TableRouteValue::physical(region_routes.clone()),
2353 );
2354
2355 create_physical_table_metadata(
2357 &table_metadata_manager,
2358 table_info.clone(),
2359 region_routes.clone(),
2360 HashMap::new(),
2361 )
2362 .await
2363 .unwrap();
2364
2365 table_metadata_manager
2366 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2367 if region_route.leader_state.is_some() {
2368 None
2369 } else {
2370 Some(Some(LeaderState::Downgrading))
2371 }
2372 })
2373 .await
2374 .unwrap();
2375
2376 let updated_route_value = table_metadata_manager
2377 .table_route_manager()
2378 .table_route_storage()
2379 .get(table_id)
2380 .await
2381 .unwrap()
2382 .unwrap();
2383
2384 assert_eq!(
2385 updated_route_value.region_routes().unwrap()[0].leader_state,
2386 Some(LeaderState::Downgrading)
2387 );
2388
2389 assert!(
2390 updated_route_value.region_routes().unwrap()[0]
2391 .leader_down_since
2392 .is_some()
2393 );
2394
2395 assert_eq!(
2396 updated_route_value.region_routes().unwrap()[1].leader_state,
2397 Some(LeaderState::Downgrading)
2398 );
2399 assert!(
2400 updated_route_value.region_routes().unwrap()[1]
2401 .leader_down_since
2402 .is_some()
2403 );
2404 }
2405
2406 async fn assert_datanode_table(
2407 table_metadata_manager: &TableMetadataManager,
2408 table_id: u32,
2409 region_routes: &[RegionRoute],
2410 ) {
2411 let region_distribution = region_distribution(region_routes);
2412 for (datanode, regions) in region_distribution {
2413 let got = table_metadata_manager
2414 .datanode_table_manager()
2415 .get(&DatanodeTableKey::new(datanode, table_id))
2416 .await
2417 .unwrap()
2418 .unwrap();
2419
2420 assert_eq!(got.regions, regions.leader_regions);
2421 assert_eq!(got.follower_regions, regions.follower_regions);
2422 }
2423 }
2424
2425 #[tokio::test]
2426 async fn test_update_table_route() {
2427 let mem_kv = Arc::new(MemoryKvBackend::default());
2428 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2429 let region_route = new_test_region_route();
2430 let region_routes = vec![region_route.clone()];
2431 let table_info = new_test_table_info();
2432 let table_id = table_info.ident.table_id;
2433 let engine = table_info.meta.engine.as_str();
2434 let region_storage_path =
2435 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2436 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2437 TableRouteValue::physical(region_routes.clone()),
2438 );
2439
2440 create_physical_table_metadata(
2442 &table_metadata_manager,
2443 table_info.clone(),
2444 region_routes.clone(),
2445 HashMap::new(),
2446 )
2447 .await
2448 .unwrap();
2449
2450 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2451 let new_region_routes = vec![
2452 new_region_route(1, 1),
2453 new_region_route(2, 2),
2454 new_region_route(3, 3),
2455 ];
2456 table_metadata_manager
2458 .update_table_route(
2459 table_id,
2460 RegionInfo {
2461 engine: engine.to_string(),
2462 region_storage_path: region_storage_path.clone(),
2463 region_options: HashMap::new(),
2464 region_wal_options: HashMap::new(),
2465 },
2466 ¤t_table_route_value,
2467 new_region_routes.clone(),
2468 &HashMap::new(),
2469 &HashMap::new(),
2470 )
2471 .await
2472 .unwrap();
2473 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2474
2475 table_metadata_manager
2477 .update_table_route(
2478 table_id,
2479 RegionInfo {
2480 engine: engine.to_string(),
2481 region_storage_path: region_storage_path.clone(),
2482 region_options: HashMap::new(),
2483 region_wal_options: HashMap::new(),
2484 },
2485 ¤t_table_route_value,
2486 new_region_routes.clone(),
2487 &HashMap::new(),
2488 &HashMap::new(),
2489 )
2490 .await
2491 .unwrap();
2492
2493 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2494 current_table_route_value
2495 .inner
2496 .update(new_region_routes.clone())
2497 .unwrap(),
2498 );
2499 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2500 table_metadata_manager
2502 .update_table_route(
2503 table_id,
2504 RegionInfo {
2505 engine: engine.to_string(),
2506 region_storage_path: region_storage_path.clone(),
2507 region_options: HashMap::new(),
2508 region_wal_options: HashMap::new(),
2509 },
2510 ¤t_table_route_value,
2511 new_region_routes.clone(),
2512 &HashMap::new(),
2513 &HashMap::new(),
2514 )
2515 .await
2516 .unwrap();
2517 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2518
2519 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2522 current_table_route_value
2523 .update(vec![
2524 new_region_route(1, 1),
2525 new_region_route(2, 2),
2526 new_region_route(3, 3),
2527 new_region_route(4, 4),
2528 ])
2529 .unwrap(),
2530 );
2531 assert!(
2532 table_metadata_manager
2533 .update_table_route(
2534 table_id,
2535 RegionInfo {
2536 engine: engine.to_string(),
2537 region_storage_path: region_storage_path.clone(),
2538 region_options: HashMap::new(),
2539 region_wal_options: HashMap::new(),
2540 },
2541 &wrong_table_route_value,
2542 new_region_routes,
2543 &HashMap::new(),
2544 &HashMap::new(),
2545 )
2546 .await
2547 .is_err()
2548 );
2549 }
2550
2551 #[tokio::test]
2552 async fn test_update_table_route_with_topic_region_mapping() {
2553 let mem_kv = Arc::new(MemoryKvBackend::default());
2554 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2555 let region_route = new_test_region_route();
2556 let region_routes = vec![region_route.clone()];
2557 let table_info = new_test_table_info();
2558 let table_id = table_info.ident.table_id;
2559 let engine = table_info.meta.engine.as_str();
2560 let region_storage_path =
2561 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2562
2563 let old_region_wal_options: HashMap<RegionNumber, String> = vec![
2565 (
2566 1,
2567 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2568 topic: "topic_1".to_string(),
2569 }))
2570 .unwrap(),
2571 ),
2572 (
2573 2,
2574 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2575 topic: "topic_2".to_string(),
2576 }))
2577 .unwrap(),
2578 ),
2579 ]
2580 .into_iter()
2581 .collect();
2582
2583 create_physical_table_metadata(
2584 &table_metadata_manager,
2585 table_info.clone(),
2586 region_routes.clone(),
2587 old_region_wal_options.clone(),
2588 )
2589 .await
2590 .unwrap();
2591
2592 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2593 TableRouteValue::physical(region_routes.clone()),
2594 );
2595
2596 let region_id_1 = RegionId::new(table_id, 1);
2598 let region_id_2 = RegionId::new(table_id, 2);
2599 let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
2600 let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
2601 assert!(
2602 table_metadata_manager
2603 .topic_region_manager
2604 .get(topic_1_key.clone())
2605 .await
2606 .unwrap()
2607 .is_some()
2608 );
2609 assert!(
2610 table_metadata_manager
2611 .topic_region_manager
2612 .get(topic_2_key.clone())
2613 .await
2614 .unwrap()
2615 .is_some()
2616 );
2617
2618 let new_region_routes = vec![
2620 new_region_route(1, 1),
2621 new_region_route(2, 2),
2622 new_region_route(3, 3), ];
2624 let new_region_wal_options: HashMap<RegionNumber, String> = vec![
2625 (
2626 1,
2627 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2628 topic: "topic_1".to_string(), }))
2630 .unwrap(),
2631 ),
2632 (
2633 2,
2634 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2635 topic: "topic_2".to_string(), }))
2637 .unwrap(),
2638 ),
2639 (
2640 3,
2641 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2642 topic: "topic_3".to_string(), }))
2644 .unwrap(),
2645 ),
2646 ]
2647 .into_iter()
2648 .collect();
2649 let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
2650 current_table_route_value
2651 .inner
2652 .update(new_region_routes.clone())
2653 .unwrap(),
2654 );
2655 table_metadata_manager
2656 .update_table_route(
2657 table_id,
2658 RegionInfo {
2659 engine: engine.to_string(),
2660 region_storage_path: region_storage_path.clone(),
2661 region_options: HashMap::new(),
2662 region_wal_options: old_region_wal_options.clone(),
2663 },
2664 ¤t_table_route_value,
2665 new_region_routes.clone(),
2666 &HashMap::new(),
2667 &new_region_wal_options,
2668 )
2669 .await
2670 .unwrap();
2671 let region_id_3 = RegionId::new(table_id, 3);
2673 let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
2674 assert!(
2675 table_metadata_manager
2676 .topic_region_manager
2677 .get(topic_3_key)
2678 .await
2679 .unwrap()
2680 .is_some()
2681 );
2682 let newer_region_routes = vec![
2684 new_region_route(1, 1),
2685 ];
2688 let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
2689 (
2690 1,
2691 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2692 topic: "topic_1".to_string(), }))
2694 .unwrap(),
2695 ),
2696 (
2697 3,
2698 serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
2699 topic: "topic_3_new".to_string(), }))
2701 .unwrap(),
2702 ),
2703 ]
2704 .into_iter()
2705 .collect();
2706 table_metadata_manager
2707 .update_table_route(
2708 table_id,
2709 RegionInfo {
2710 engine: engine.to_string(),
2711 region_storage_path: region_storage_path.clone(),
2712 region_options: HashMap::new(),
2713 region_wal_options: new_region_wal_options.clone(),
2714 },
2715 ¤t_table_route_value_updated,
2716 newer_region_routes.clone(),
2717 &HashMap::new(),
2718 &newer_region_wal_options,
2719 )
2720 .await
2721 .unwrap();
2722 let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
2724 assert!(
2725 table_metadata_manager
2726 .topic_region_manager
2727 .get(topic_2_key_new)
2728 .await
2729 .unwrap()
2730 .is_none()
2731 );
2732 let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
2734 assert!(
2735 table_metadata_manager
2736 .topic_region_manager
2737 .get(topic_3_key_old)
2738 .await
2739 .unwrap()
2740 .is_none()
2741 );
2742 let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
2744 assert!(
2745 table_metadata_manager
2746 .topic_region_manager
2747 .get(topic_3_key_new)
2748 .await
2749 .unwrap()
2750 .is_some()
2751 );
2752 assert!(
2754 table_metadata_manager
2755 .topic_region_manager
2756 .get(topic_1_key)
2757 .await
2758 .unwrap()
2759 .is_some()
2760 );
2761 }
2762
2763 #[tokio::test]
2764 async fn test_destroy_table_metadata() {
2765 let mem_kv = Arc::new(MemoryKvBackend::default());
2766 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2767 let table_id = 1025;
2768 let table_name = "foo";
2769 let task = test_create_table_task(table_name, table_id);
2770 let options = create_mixed_region_wal_options();
2771 let serialized_options = options
2772 .iter()
2773 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2774 .collect::<HashMap<_, _>>();
2775 table_metadata_manager
2776 .create_table_metadata(
2777 task.table_info,
2778 TableRouteValue::physical(vec![
2779 RegionRoute {
2780 region: Region::new_test(RegionId::new(table_id, 1)),
2781 leader_peer: Some(Peer::empty(1)),
2782 follower_peers: vec![Peer::empty(5)],
2783 leader_state: None,
2784 leader_down_since: None,
2785 write_route_policy: None,
2786 },
2787 RegionRoute {
2788 region: Region::new_test(RegionId::new(table_id, 2)),
2789 leader_peer: Some(Peer::empty(2)),
2790 follower_peers: vec![Peer::empty(4)],
2791 leader_state: None,
2792 leader_down_since: None,
2793 write_route_policy: None,
2794 },
2795 RegionRoute {
2796 region: Region::new_test(RegionId::new(table_id, 3)),
2797 leader_peer: Some(Peer::empty(3)),
2798 follower_peers: vec![],
2799 leader_state: None,
2800 leader_down_since: None,
2801 write_route_policy: None,
2802 },
2803 ]),
2804 serialized_options,
2805 )
2806 .await
2807 .unwrap();
2808 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2809 let table_route_value = table_metadata_manager
2810 .table_route_manager
2811 .table_route_storage()
2812 .get_with_raw_bytes(table_id)
2813 .await
2814 .unwrap()
2815 .unwrap();
2816 table_metadata_manager
2817 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2818 .await
2819 .unwrap();
2820 assert!(mem_kv.is_empty());
2821 }
2822
2823 #[tokio::test]
2824 async fn test_restore_table_metadata() {
2825 let mem_kv = Arc::new(MemoryKvBackend::default());
2826 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2827 let table_id = 1025;
2828 let table_name = "foo";
2829 let task = test_create_table_task(table_name, table_id);
2830 let options = create_mixed_region_wal_options();
2831 let serialized_options = options
2832 .iter()
2833 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2834 .collect::<HashMap<_, _>>();
2835 table_metadata_manager
2836 .create_table_metadata(
2837 task.table_info,
2838 TableRouteValue::physical(vec![
2839 RegionRoute {
2840 region: Region::new_test(RegionId::new(table_id, 1)),
2841 leader_peer: Some(Peer::empty(1)),
2842 follower_peers: vec![Peer::empty(5)],
2843 leader_state: None,
2844 leader_down_since: None,
2845 write_route_policy: None,
2846 },
2847 RegionRoute {
2848 region: Region::new_test(RegionId::new(table_id, 2)),
2849 leader_peer: Some(Peer::empty(2)),
2850 follower_peers: vec![Peer::empty(4)],
2851 leader_state: None,
2852 leader_down_since: None,
2853 write_route_policy: None,
2854 },
2855 RegionRoute {
2856 region: Region::new_test(RegionId::new(table_id, 3)),
2857 leader_peer: Some(Peer::empty(3)),
2858 follower_peers: vec![],
2859 leader_state: None,
2860 leader_down_since: None,
2861 write_route_policy: None,
2862 },
2863 ]),
2864 serialized_options,
2865 )
2866 .await
2867 .unwrap();
2868 let expected_result = mem_kv.dump();
2869 let table_route_value = table_metadata_manager
2870 .table_route_manager
2871 .table_route_storage()
2872 .get_with_raw_bytes(table_id)
2873 .await
2874 .unwrap()
2875 .unwrap();
2876 let region_routes = table_route_value.region_routes().unwrap();
2877 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2878 let table_route_value = TableRouteValue::physical(region_routes.clone());
2879 table_metadata_manager
2880 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2881 .await
2882 .unwrap();
2883 table_metadata_manager
2884 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2885 .await
2886 .unwrap();
2887 let kvs = mem_kv.dump();
2888 assert_eq!(kvs, expected_result);
2889 table_metadata_manager
2891 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2892 .await
2893 .unwrap();
2894 let kvs = mem_kv.dump();
2895 assert_eq!(kvs, expected_result);
2896 }
2897
2898 #[tokio::test]
2899 async fn test_dropped_table_metadata_enumeration_and_lookup() {
2900 let mem_kv = Arc::new(MemoryKvBackend::default());
2901 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2902 let table_id = 1025;
2903 let table_name = "foo";
2904 let task = test_create_table_task(table_name, table_id);
2905 let table_info = task.table_info.clone();
2906 let options = create_mixed_region_wal_options();
2907 let serialized_options = options
2908 .iter()
2909 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2910 .collect::<HashMap<_, _>>();
2911 table_metadata_manager
2912 .create_table_metadata(
2913 table_info.clone(),
2914 TableRouteValue::physical(vec![
2915 RegionRoute {
2916 region: Region::new_test(RegionId::new(table_id, 1)),
2917 leader_peer: Some(Peer::empty(1)),
2918 follower_peers: vec![Peer::empty(5)],
2919 leader_state: None,
2920 leader_down_since: None,
2921 write_route_policy: None,
2922 },
2923 RegionRoute {
2924 region: Region::new_test(RegionId::new(table_id, 2)),
2925 leader_peer: Some(Peer::empty(2)),
2926 follower_peers: vec![Peer::empty(4)],
2927 leader_state: None,
2928 leader_down_since: None,
2929 write_route_policy: None,
2930 },
2931 RegionRoute {
2932 region: Region::new_test(RegionId::new(table_id, 3)),
2933 leader_peer: Some(Peer::empty(3)),
2934 follower_peers: vec![],
2935 leader_state: None,
2936 leader_down_since: None,
2937 write_route_policy: None,
2938 },
2939 ]),
2940 serialized_options,
2941 )
2942 .await
2943 .unwrap();
2944 let table_route_value = table_metadata_manager
2945 .table_route_manager
2946 .table_route_storage()
2947 .get_with_raw_bytes(table_id)
2948 .await
2949 .unwrap()
2950 .unwrap();
2951 let region_routes = table_route_value.region_routes().unwrap();
2952 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2953 let table_route_value = TableRouteValue::physical(region_routes.clone());
2954
2955 table_metadata_manager
2956 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2957 .await
2958 .unwrap();
2959
2960 let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
2961 assert_eq!(dropped_tables.len(), 1);
2962 assert_eq!(dropped_tables[0].table_id, table_id);
2963 assert_eq!(dropped_tables[0].table_name, table_name);
2964
2965 let dropped_table = table_metadata_manager
2966 .get_dropped_table(&table_name)
2967 .await
2968 .unwrap()
2969 .unwrap();
2970 assert_eq!(dropped_table.table_id, table_id);
2971 assert_eq!(dropped_table.table_name, table_name);
2972 assert_eq!(dropped_table.table_info_value.table_info, table_info);
2973 assert_eq!(
2974 dropped_table.table_route_value.region_routes().unwrap(),
2975 region_routes
2976 );
2977 assert_eq!(dropped_table.region_wal_options, options);
2978
2979 let dropped_table_by_id = table_metadata_manager
2980 .get_dropped_table_by_id(table_id)
2981 .await
2982 .unwrap()
2983 .unwrap();
2984 assert_eq!(dropped_table_by_id.table_id, table_id);
2985 assert_eq!(dropped_table_by_id.table_name, table_name);
2986 assert_eq!(dropped_table_by_id.table_info_value.table_info, table_info);
2987 assert_eq!(
2988 dropped_table_by_id
2989 .table_route_value
2990 .region_routes()
2991 .unwrap(),
2992 region_routes
2993 );
2994 assert_eq!(dropped_table_by_id.region_wal_options, options);
2995 }
2996
2997 #[tokio::test]
2998 async fn test_dropped_table_lookup_survives_live_name_recreation() {
2999 let mem_kv = Arc::new(MemoryKvBackend::default());
3000 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3001 let dropped_table_id = 1025;
3002 let recreated_table_id = 1026;
3003 let table_name = "foo";
3004 let dropped_task = test_create_table_task(table_name, dropped_table_id);
3005 let dropped_table_info = dropped_task.table_info.clone();
3006 let options = create_mock_region_wal_options();
3007 let serialized_options = options
3008 .iter()
3009 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
3010 .collect::<HashMap<_, _>>();
3011 table_metadata_manager
3012 .create_table_metadata(
3013 dropped_table_info.clone(),
3014 TableRouteValue::physical(vec![
3015 RegionRoute {
3016 region: Region::new_test(RegionId::new(dropped_table_id, 1)),
3017 leader_peer: Some(Peer::empty(1)),
3018 follower_peers: vec![Peer::empty(5)],
3019 leader_state: None,
3020 leader_down_since: None,
3021 write_route_policy: None,
3022 },
3023 RegionRoute {
3024 region: Region::new_test(RegionId::new(dropped_table_id, 2)),
3025 leader_peer: Some(Peer::empty(2)),
3026 follower_peers: vec![Peer::empty(4)],
3027 leader_state: None,
3028 leader_down_since: None,
3029 write_route_policy: None,
3030 },
3031 ]),
3032 serialized_options.clone(),
3033 )
3034 .await
3035 .unwrap();
3036
3037 let dropped_table_name =
3038 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3039 let dropped_table_route = table_metadata_manager
3040 .table_route_manager
3041 .table_route_storage()
3042 .get_with_raw_bytes(dropped_table_id)
3043 .await
3044 .unwrap()
3045 .unwrap();
3046 let dropped_table_route =
3047 TableRouteValue::physical(dropped_table_route.region_routes().unwrap().clone());
3048 table_metadata_manager
3049 .delete_table_metadata(
3050 dropped_table_id,
3051 &dropped_table_name,
3052 &dropped_table_route,
3053 &options,
3054 )
3055 .await
3056 .unwrap();
3057
3058 let recreated_task = test_create_table_task(table_name, recreated_table_id);
3059 table_metadata_manager
3060 .create_table_metadata(
3061 recreated_task.table_info,
3062 TableRouteValue::physical(vec![RegionRoute {
3063 region: Region::new_test(RegionId::new(recreated_table_id, 1)),
3064 leader_peer: Some(Peer::empty(4)),
3065 follower_peers: vec![],
3066 leader_state: None,
3067 leader_down_since: None,
3068 write_route_policy: None,
3069 }]),
3070 serialized_options,
3071 )
3072 .await
3073 .unwrap();
3074
3075 assert_eq!(
3076 table_metadata_manager
3077 .table_name_manager()
3078 .get(TableNameKey::from(&dropped_table_name))
3079 .await
3080 .unwrap()
3081 .unwrap()
3082 .table_id(),
3083 recreated_table_id
3084 );
3085
3086 let dropped_table = table_metadata_manager
3087 .get_dropped_table(&dropped_table_name)
3088 .await
3089 .unwrap()
3090 .unwrap();
3091 assert_eq!(dropped_table.table_id, dropped_table_id);
3092 assert_eq!(dropped_table.table_name, dropped_table_name);
3093 assert_eq!(
3094 dropped_table.table_info_value.table_info,
3095 dropped_table_info
3096 );
3097
3098 let dropped_tables = table_metadata_manager.list_dropped_tables().await.unwrap();
3099 assert_eq!(dropped_tables.len(), 1);
3100 assert_eq!(dropped_tables[0].table_id, dropped_table_id);
3101 assert_eq!(dropped_tables[0].table_name, dropped_table_name);
3102 }
3103
3104 #[tokio::test]
3105 async fn test_dropped_table_lookup_ignores_unrelated_malformed_datanode_tombstones() {
3106 let mem_kv = Arc::new(MemoryKvBackend::default());
3107 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
3108 let table_id = 1025;
3109 let table_name = "foo";
3110 let task = test_create_table_task(table_name, table_id);
3111 let table_info = task.table_info.clone();
3112 let options = create_mixed_region_wal_options();
3113 let serialized_options = options
3114 .iter()
3115 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
3116 .collect::<HashMap<_, _>>();
3117 table_metadata_manager
3118 .create_table_metadata(
3119 table_info.clone(),
3120 TableRouteValue::physical(vec![
3121 RegionRoute {
3122 region: Region::new_test(RegionId::new(table_id, 1)),
3123 leader_peer: Some(Peer::empty(1)),
3124 follower_peers: vec![Peer::empty(5)],
3125 leader_state: None,
3126 leader_down_since: None,
3127 write_route_policy: None,
3128 },
3129 RegionRoute {
3130 region: Region::new_test(RegionId::new(table_id, 2)),
3131 leader_peer: Some(Peer::empty(2)),
3132 follower_peers: vec![Peer::empty(4)],
3133 leader_state: None,
3134 leader_down_since: None,
3135 write_route_policy: None,
3136 },
3137 ]),
3138 serialized_options,
3139 )
3140 .await
3141 .unwrap();
3142
3143 let table_route_value = table_metadata_manager
3144 .table_route_manager
3145 .table_route_storage()
3146 .get_with_raw_bytes(table_id)
3147 .await
3148 .unwrap()
3149 .unwrap();
3150 let region_routes = table_route_value.region_routes().unwrap();
3151 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
3152 let table_route_value = TableRouteValue::physical(region_routes.clone());
3153
3154 table_metadata_manager
3155 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
3156 .await
3157 .unwrap();
3158
3159 mem_kv
3160 .put(
3161 PutRequest::new()
3162 .with_key("__tombstone/__dn_table/not-a-datanode-table-key")
3163 .with_value("malformed"),
3164 )
3165 .await
3166 .unwrap();
3167
3168 let dropped_table = table_metadata_manager
3169 .get_dropped_table(&table_name)
3170 .await
3171 .unwrap()
3172 .unwrap();
3173 assert_eq!(dropped_table.table_id, table_id);
3174 assert_eq!(dropped_table.table_name, table_name);
3175 assert_eq!(dropped_table.table_info_value.table_info, table_info);
3176 assert_eq!(dropped_table.region_wal_options, options);
3177 }
3178
3179 #[tokio::test]
3180 async fn test_create_update_view_info() {
3181 let mem_kv = Arc::new(MemoryKvBackend::default());
3182 let table_metadata_manager = TableMetadataManager::new(mem_kv);
3183
3184 let view_info = new_test_table_info();
3185
3186 let view_id = view_info.ident.table_id;
3187
3188 let logical_plan: Vec<u8> = vec![1, 2, 3];
3189 let columns = vec!["a".to_string()];
3190 let plan_columns = vec!["number".to_string()];
3191 let table_names = new_test_table_names();
3192 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
3193
3194 table_metadata_manager
3196 .create_view_metadata(
3197 view_info.clone(),
3198 logical_plan.clone(),
3199 table_names.clone(),
3200 columns.clone(),
3201 plan_columns.clone(),
3202 definition.to_string(),
3203 )
3204 .await
3205 .unwrap();
3206
3207 {
3208 let current_view_info = table_metadata_manager
3210 .view_info_manager()
3211 .get(view_id)
3212 .await
3213 .unwrap()
3214 .unwrap()
3215 .into_inner();
3216 assert_eq!(current_view_info.view_info, logical_plan);
3217 assert_eq!(current_view_info.table_names, table_names);
3218 assert_eq!(current_view_info.definition, definition);
3219 assert_eq!(current_view_info.columns, columns);
3220 assert_eq!(current_view_info.plan_columns, plan_columns);
3221 let current_table_info = table_metadata_manager
3223 .table_info_manager()
3224 .get(view_id)
3225 .await
3226 .unwrap()
3227 .unwrap()
3228 .into_inner();
3229 assert_eq!(current_table_info.table_info, view_info);
3230 }
3231
3232 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
3233 let new_table_names = {
3234 let mut set = HashSet::new();
3235 set.insert(TableName {
3236 catalog_name: "greptime".to_string(),
3237 schema_name: "public".to_string(),
3238 table_name: "b_table".to_string(),
3239 });
3240 set.insert(TableName {
3241 catalog_name: "greptime".to_string(),
3242 schema_name: "public".to_string(),
3243 table_name: "c_table".to_string(),
3244 });
3245 set
3246 };
3247 let new_columns = vec!["b".to_string()];
3248 let new_plan_columns = vec!["number2".to_string()];
3249 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
3250
3251 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
3252 logical_plan.clone().into(),
3253 table_names,
3254 columns,
3255 plan_columns,
3256 definition.to_string(),
3257 ));
3258 table_metadata_manager
3260 .update_view_info(
3261 view_id,
3262 ¤t_view_info_value,
3263 new_logical_plan.clone(),
3264 new_table_names.clone(),
3265 new_columns.clone(),
3266 new_plan_columns.clone(),
3267 new_definition.to_string(),
3268 )
3269 .await
3270 .unwrap();
3271 table_metadata_manager
3273 .update_view_info(
3274 view_id,
3275 ¤t_view_info_value,
3276 new_logical_plan.clone(),
3277 new_table_names.clone(),
3278 new_columns.clone(),
3279 new_plan_columns.clone(),
3280 new_definition.to_string(),
3281 )
3282 .await
3283 .unwrap();
3284
3285 let updated_view_info = table_metadata_manager
3287 .view_info_manager()
3288 .get(view_id)
3289 .await
3290 .unwrap()
3291 .unwrap()
3292 .into_inner();
3293 assert_eq!(updated_view_info.view_info, new_logical_plan);
3294 assert_eq!(updated_view_info.table_names, new_table_names);
3295 assert_eq!(updated_view_info.definition, new_definition);
3296 assert_eq!(updated_view_info.columns, new_columns);
3297 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
3298
3299 let wrong_view_info = logical_plan.clone();
3300 let wrong_definition = "wrong_definition";
3301 let wrong_view_info_value =
3302 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
3303 wrong_view_info.into(),
3304 new_table_names.clone(),
3305 new_columns.clone(),
3306 new_plan_columns.clone(),
3307 wrong_definition.to_string(),
3308 ));
3309 assert!(
3312 table_metadata_manager
3313 .update_view_info(
3314 view_id,
3315 &wrong_view_info_value,
3316 new_logical_plan.clone(),
3317 new_table_names.clone(),
3318 vec!["c".to_string()],
3319 vec!["number3".to_string()],
3320 wrong_definition.to_string(),
3321 )
3322 .await
3323 .is_err()
3324 );
3325
3326 let current_view_info = table_metadata_manager
3328 .view_info_manager()
3329 .get(view_id)
3330 .await
3331 .unwrap()
3332 .unwrap()
3333 .into_inner();
3334 assert_eq!(current_view_info.view_info, new_logical_plan);
3335 assert_eq!(current_view_info.table_names, new_table_names);
3336 assert_eq!(current_view_info.definition, new_definition);
3337 assert_eq!(current_view_info.columns, new_columns);
3338 assert_eq!(current_view_info.plan_columns, new_plan_columns);
3339 }
3340
3341 #[test]
3342 fn test_region_role_set_deserialize() {
3343 let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
3344 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3345 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3346 assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
3347
3348 let s = r#"[1, 2, 3]"#;
3349 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
3350 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
3351 assert!(region_role_set.follower_regions.is_empty());
3352 }
3353
3354 #[test]
3355 fn test_region_distribution_deserialize() {
3356 let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
3357 let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
3358 assert_eq!(region_distribution.len(), 2);
3359 assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
3360 assert!(region_distribution[&1].follower_regions.is_empty());
3361 assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
3362 assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
3363 }
3364}