1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{OptionExt, ResultExt, ensure};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::DatanodeId;
155use crate::error::{self, Result, SerdeJsonSnafu};
156use crate::key::flow::flow_state::FlowStateValue;
157use crate::key::node_address::NodeAddressValue;
158use crate::key::table_route::TableRouteKey;
159use crate::key::topic_region::TopicRegionValue;
160use crate::key::txn_helper::TxnOpGetResponseSet;
161use crate::kv_backend::KvBackendRef;
162use crate::kv_backend::txn::{Txn, TxnOp};
163use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
164use crate::rpc::store::BatchDeleteRequest;
165use crate::state_store::PoisonValue;
166
167pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
168pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
169pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
170pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
171pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
172pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";
173
174pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
175pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
176pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
177pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
178pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
179pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
180pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
181pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
182pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
183pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
185pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
186
187pub const ELECTION_KEY: &str = "__metasrv_election";
189pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
191
192pub const CACHE_KEY_PREFIXES: [&str; 5] = [
194 TABLE_NAME_KEY_PREFIX,
195 CATALOG_NAME_KEY_PREFIX,
196 SCHEMA_NAME_KEY_PREFIX,
197 TABLE_ROUTE_PREFIX,
198 NODE_ADDRESS_PREFIX,
199];
200
201#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
203pub struct RegionRoleSet {
204 pub leader_regions: Vec<RegionNumber>,
206 pub follower_regions: Vec<RegionNumber>,
208}
209
210impl<'de> Deserialize<'de> for RegionRoleSet {
211 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
212 where
213 D: serde::Deserializer<'de>,
214 {
215 #[derive(Deserialize)]
216 #[serde(untagged)]
217 enum RegionRoleSetOrLeaderOnly {
218 Full {
219 leader_regions: Vec<RegionNumber>,
220 follower_regions: Vec<RegionNumber>,
221 },
222 LeaderOnly(Vec<RegionNumber>),
223 }
224 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
225 RegionRoleSetOrLeaderOnly::Full {
226 leader_regions,
227 follower_regions,
228 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
229 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
230 Ok(RegionRoleSet::new(leader_regions, vec![]))
231 }
232 }
233 }
234}
235
236impl RegionRoleSet {
237 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
239 Self {
240 leader_regions,
241 follower_regions,
242 }
243 }
244
245 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
247 self.leader_regions.push(region_number);
248 }
249
250 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
252 self.follower_regions.push(region_number);
253 }
254
255 pub fn sort(&mut self) {
257 self.follower_regions.sort();
258 self.leader_regions.sort();
259 }
260}
261
262pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
266
267pub type FlowId = u32;
269pub type FlowPartitionId = u32;
271
272lazy_static! {
273 pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
274}
275
276lazy_static! {
277 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
278}
279
280lazy_static! {
281 static ref TABLE_INFO_KEY_PATTERN: Regex =
282 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
283}
284
285lazy_static! {
286 static ref VIEW_INFO_KEY_PATTERN: Regex =
287 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
288}
289
290lazy_static! {
291 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
292 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
293}
294
295lazy_static! {
296 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
297 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
298}
299
300lazy_static! {
301 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
302 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
303 ))
304 .unwrap();
305}
306
307lazy_static! {
308 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
310 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
311 ))
312 .unwrap();
313}
314
315lazy_static! {
316 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
318 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
319 ))
320 .unwrap();
321}
322
323lazy_static! {
324 static ref NODE_ADDRESS_PATTERN: Regex =
325 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
326}
327
328lazy_static! {
329 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
330 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
331}
332
333lazy_static! {
334 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
335 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
336 ))
337 .unwrap();
338}
339
340pub trait MetadataKey<'a, T> {
342 fn to_bytes(&self) -> Vec<u8>;
343
344 fn from_bytes(bytes: &'a [u8]) -> Result<T>;
345}
346
347#[derive(Debug, Clone, PartialEq)]
348pub struct BytesAdapter(Vec<u8>);
349
350impl From<Vec<u8>> for BytesAdapter {
351 fn from(value: Vec<u8>) -> Self {
352 Self(value)
353 }
354}
355
356impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
357 fn to_bytes(&self) -> Vec<u8> {
358 self.0.clone()
359 }
360
361 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
362 Ok(BytesAdapter(bytes.to_vec()))
363 }
364}
365
366pub(crate) trait MetadataKeyGetTxnOp {
367 fn build_get_op(
368 &self,
369 ) -> (
370 TxnOp,
371 impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
372 );
373}
374
375pub trait MetadataValue {
376 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
377 where
378 Self: Sized;
379
380 fn try_as_raw_value(&self) -> Result<Vec<u8>>;
381}
382
383pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
384
385pub struct TableMetadataManager {
386 table_name_manager: TableNameManager,
387 table_info_manager: TableInfoManager,
388 view_info_manager: ViewInfoManager,
389 datanode_table_manager: DatanodeTableManager,
390 catalog_manager: CatalogManager,
391 schema_manager: SchemaManager,
392 table_route_manager: TableRouteManager,
393 tombstone_manager: TombstoneManager,
394 topic_name_manager: TopicNameManager,
395 topic_region_manager: TopicRegionManager,
396 kv_backend: KvBackendRef,
397}
398
399#[macro_export]
400macro_rules! ensure_values {
401 ($got:expr, $expected_value:expr, $name:expr) => {
402 ensure!(
403 $got == $expected_value,
404 error::UnexpectedSnafu {
405 err_msg: format!(
406 "Reads the different value: {:?} during {}, expected: {:?}",
407 $got, $name, $expected_value
408 )
409 }
410 );
411 };
412}
413
414pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
424 bytes: Bytes,
426 inner: T,
428}
429
430impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
431 type Target = T;
432
433 fn deref(&self) -> &Self::Target {
434 &self.inner
435 }
436}
437
438impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
439 fn deref_mut(&mut self) -> &mut Self::Target {
440 &mut self.inner
441 }
442}
443
444impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
445 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446 write!(
447 f,
448 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
449 self.inner, self.bytes
450 )
451 }
452}
453
454impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
455 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
459 where
460 S: serde::Serializer,
461 {
462 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
465 }
466}
467
468impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
469 for DeserializedValueWithBytes<T>
470{
471 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
475 where
476 D: serde::Deserializer<'de>,
477 {
478 let buf = String::deserialize(deserializer)?;
479 let bytes = Bytes::from(buf);
480
481 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
482 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
483
484 Ok(value)
485 }
486}
487
488impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
489 fn clone(&self) -> Self {
490 Self {
491 bytes: self.bytes.clone(),
492 inner: self.inner.clone(),
493 }
494 }
495}
496
497impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
498 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
501 let inner = T::try_from_raw_value(&bytes)?;
502 Ok(Self { bytes, inner })
503 }
504
505 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
508 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
509 }
510
511 pub fn into_inner(self) -> T {
512 self.inner
513 }
514
515 pub fn get_inner_ref(&self) -> &T {
516 &self.inner
517 }
518
519 pub fn get_raw_bytes(&self) -> Vec<u8> {
521 self.bytes.to_vec()
522 }
523
524 #[cfg(any(test, feature = "testing"))]
525 pub fn from_inner(inner: T) -> Self {
526 let bytes = serde_json::to_vec(&inner).unwrap();
527
528 Self {
529 bytes: Bytes::from(bytes),
530 inner,
531 }
532 }
533}
534
535impl TableMetadataManager {
536 pub fn new(kv_backend: KvBackendRef) -> Self {
537 TableMetadataManager {
538 table_name_manager: TableNameManager::new(kv_backend.clone()),
539 table_info_manager: TableInfoManager::new(kv_backend.clone()),
540 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
541 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
542 catalog_manager: CatalogManager::new(kv_backend.clone()),
543 schema_manager: SchemaManager::new(kv_backend.clone()),
544 table_route_manager: TableRouteManager::new(kv_backend.clone()),
545 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
546 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
547 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
548 kv_backend,
549 }
550 }
551
552 pub fn new_with_custom_tombstone_prefix(
554 kv_backend: KvBackendRef,
555 tombstone_prefix: &str,
556 ) -> Self {
557 Self {
558 table_name_manager: TableNameManager::new(kv_backend.clone()),
559 table_info_manager: TableInfoManager::new(kv_backend.clone()),
560 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
561 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
562 catalog_manager: CatalogManager::new(kv_backend.clone()),
563 schema_manager: SchemaManager::new(kv_backend.clone()),
564 table_route_manager: TableRouteManager::new(kv_backend.clone()),
565 tombstone_manager: TombstoneManager::new_with_prefix(
566 kv_backend.clone(),
567 tombstone_prefix,
568 ),
569 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
570 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
571 kv_backend,
572 }
573 }
574
575 pub async fn init(&self) -> Result<()> {
576 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
577
578 self.catalog_manager().create(catalog_name, true).await?;
579
580 let internal_schemas = [
581 DEFAULT_SCHEMA_NAME,
582 INFORMATION_SCHEMA_NAME,
583 DEFAULT_PRIVATE_SCHEMA_NAME,
584 ];
585
586 for schema_name in internal_schemas {
587 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
588
589 self.schema_manager().create(schema_key, None, true).await?;
590 }
591
592 Ok(())
593 }
594
595 pub fn table_name_manager(&self) -> &TableNameManager {
596 &self.table_name_manager
597 }
598
599 pub fn table_info_manager(&self) -> &TableInfoManager {
600 &self.table_info_manager
601 }
602
603 pub fn view_info_manager(&self) -> &ViewInfoManager {
604 &self.view_info_manager
605 }
606
607 pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
608 &self.datanode_table_manager
609 }
610
611 pub fn catalog_manager(&self) -> &CatalogManager {
612 &self.catalog_manager
613 }
614
615 pub fn schema_manager(&self) -> &SchemaManager {
616 &self.schema_manager
617 }
618
619 pub fn table_route_manager(&self) -> &TableRouteManager {
620 &self.table_route_manager
621 }
622
623 pub fn topic_name_manager(&self) -> &TopicNameManager {
624 &self.topic_name_manager
625 }
626
627 pub fn topic_region_manager(&self) -> &TopicRegionManager {
628 &self.topic_region_manager
629 }
630
631 pub fn kv_backend(&self) -> &KvBackendRef {
632 &self.kv_backend
633 }
634
635 pub async fn get_full_table_info(
636 &self,
637 table_id: TableId,
638 ) -> Result<(
639 Option<DeserializedValueWithBytes<TableInfoValue>>,
640 Option<DeserializedValueWithBytes<TableRouteValue>>,
641 )> {
642 let table_info_key = TableInfoKey::new(table_id);
643 let table_route_key = TableRouteKey::new(table_id);
644 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
645 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
646
647 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
648 let mut res = self.kv_backend.txn(txn).await?;
649 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
650 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
651 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
652 Ok((table_info_value, table_route_value))
653 }
654
655 pub async fn create_view_metadata(
665 &self,
666 view_info: RawTableInfo,
667 raw_logical_plan: Vec<u8>,
668 table_names: HashSet<TableName>,
669 columns: Vec<String>,
670 plan_columns: Vec<String>,
671 definition: String,
672 ) -> Result<()> {
673 let view_id = view_info.ident.table_id;
674
675 let view_name = TableNameKey::new(
677 &view_info.catalog_name,
678 &view_info.schema_name,
679 &view_info.name,
680 );
681 let create_table_name_txn = self
682 .table_name_manager()
683 .build_create_txn(&view_name, view_id)?;
684
685 let table_info_value = TableInfoValue::new(view_info);
687
688 let (create_table_info_txn, on_create_table_info_failure) = self
689 .table_info_manager()
690 .build_create_txn(view_id, &table_info_value)?;
691
692 let view_info_value = ViewInfoValue::new(
694 raw_logical_plan,
695 table_names,
696 columns,
697 plan_columns,
698 definition,
699 );
700 let (create_view_info_txn, on_create_view_info_failure) = self
701 .view_info_manager()
702 .build_create_txn(view_id, &view_info_value)?;
703
704 let txn = Txn::merge_all(vec![
705 create_table_name_txn,
706 create_table_info_txn,
707 create_view_info_txn,
708 ]);
709
710 let mut r = self.kv_backend.txn(txn).await?;
711
712 if !r.succeeded {
714 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
715 let remote_table_info = on_create_table_info_failure(&mut set)?
716 .context(error::UnexpectedSnafu {
717 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
718 })?
719 .into_inner();
720
721 let remote_view_info = on_create_view_info_failure(&mut set)?
722 .context(error::UnexpectedSnafu {
723 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
724 })?
725 .into_inner();
726
727 let op_name = "the creating view metadata";
728 ensure_values!(remote_table_info, table_info_value, op_name);
729 ensure_values!(remote_view_info, view_info_value, op_name);
730 }
731
732 Ok(())
733 }
734
735 pub async fn create_table_metadata(
738 &self,
739 mut table_info: RawTableInfo,
740 table_route_value: TableRouteValue,
741 region_wal_options: HashMap<RegionNumber, String>,
742 ) -> Result<()> {
743 let region_numbers = table_route_value.region_numbers();
744 table_info.meta.region_numbers = region_numbers;
745 let table_id = table_info.ident.table_id;
746 let engine = table_info.meta.engine.clone();
747
748 let table_name = TableNameKey::new(
750 &table_info.catalog_name,
751 &table_info.schema_name,
752 &table_info.name,
753 );
754 let create_table_name_txn = self
755 .table_name_manager()
756 .build_create_txn(&table_name, table_id)?;
757
758 let region_options = table_info.to_region_options();
759 let table_info_value = TableInfoValue::new(table_info);
761 let (create_table_info_txn, on_create_table_info_failure) = self
762 .table_info_manager()
763 .build_create_txn(table_id, &table_info_value)?;
764
765 let (create_table_route_txn, on_create_table_route_failure) = self
766 .table_route_manager()
767 .table_route_storage()
768 .build_create_txn(table_id, &table_route_value)?;
769
770 let create_topic_region_txn = self
771 .topic_region_manager
772 .build_create_txn(table_id, ®ion_wal_options)?;
773
774 let mut txn = Txn::merge_all(vec![
775 create_table_name_txn,
776 create_table_info_txn,
777 create_table_route_txn,
778 create_topic_region_txn,
779 ]);
780
781 if let TableRouteValue::Physical(x) = &table_route_value {
782 let region_storage_path = table_info_value.region_storage_path();
783 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
784 table_id,
785 &engine,
786 ®ion_storage_path,
787 region_options,
788 region_wal_options,
789 region_distribution(&x.region_routes),
790 )?;
791 txn = txn.merge(create_datanode_table_txn);
792 }
793
794 let mut r = self.kv_backend.txn(txn).await?;
795
796 if !r.succeeded {
798 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
799 let remote_table_info = on_create_table_info_failure(&mut set)?
800 .context(error::UnexpectedSnafu {
801 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
802 })?
803 .into_inner();
804
805 let remote_table_route = on_create_table_route_failure(&mut set)?
806 .context(error::UnexpectedSnafu {
807 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
808 })?
809 .into_inner();
810
811 let op_name = "the creating table metadata";
812 ensure_values!(remote_table_info, table_info_value, op_name);
813 ensure_values!(remote_table_route, table_route_value, op_name);
814 }
815
816 Ok(())
817 }
818
819 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
820 self.kv_backend.max_txn_ops() / 3
823 }
824
825 pub async fn create_logical_tables_metadata(
827 &self,
828 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
829 ) -> Result<()> {
830 let len = tables_data.len();
831 let mut txns = Vec::with_capacity(3 * len);
832 struct OnFailure<F1, R1, F2, R2>
833 where
834 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
835 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
836 {
837 table_info_value: TableInfoValue,
838 on_create_table_info_failure: F1,
839 table_route_value: TableRouteValue,
840 on_create_table_route_failure: F2,
841 }
842 let mut on_failures = Vec::with_capacity(len);
843 for (mut table_info, table_route_value) in tables_data {
844 table_info.meta.region_numbers = table_route_value.region_numbers();
845 let table_id = table_info.ident.table_id;
846
847 let table_name = TableNameKey::new(
849 &table_info.catalog_name,
850 &table_info.schema_name,
851 &table_info.name,
852 );
853 let create_table_name_txn = self
854 .table_name_manager()
855 .build_create_txn(&table_name, table_id)?;
856 txns.push(create_table_name_txn);
857
858 let table_info_value = TableInfoValue::new(table_info);
860 let (create_table_info_txn, on_create_table_info_failure) =
861 self.table_info_manager()
862 .build_create_txn(table_id, &table_info_value)?;
863 txns.push(create_table_info_txn);
864
865 let (create_table_route_txn, on_create_table_route_failure) = self
866 .table_route_manager()
867 .table_route_storage()
868 .build_create_txn(table_id, &table_route_value)?;
869 txns.push(create_table_route_txn);
870
871 on_failures.push(OnFailure {
872 table_info_value,
873 on_create_table_info_failure,
874 table_route_value,
875 on_create_table_route_failure,
876 });
877 }
878
879 let txn = Txn::merge_all(txns);
880 let mut r = self.kv_backend.txn(txn).await?;
881
882 if !r.succeeded {
884 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
885 for on_failure in on_failures {
886 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
887 .context(error::UnexpectedSnafu {
888 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
889 })?
890 .into_inner();
891
892 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
893 .context(error::UnexpectedSnafu {
894 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
895 })?
896 .into_inner();
897
898 let op_name = "the creating logical tables metadata";
899 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
900 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
901 }
902 }
903
904 Ok(())
905 }
906
907 fn table_metadata_keys(
908 &self,
909 table_id: TableId,
910 table_name: &TableName,
911 table_route_value: &TableRouteValue,
912 region_wal_options: &HashMap<RegionNumber, WalOptions>,
913 ) -> Result<Vec<Vec<u8>>> {
914 let datanode_ids = if table_route_value.is_physical() {
916 region_distribution(table_route_value.region_routes()?)
917 .into_keys()
918 .collect()
919 } else {
920 vec![]
921 };
922 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
923 let table_name = TableNameKey::new(
924 &table_name.catalog_name,
925 &table_name.schema_name,
926 &table_name.table_name,
927 );
928 let table_info_key = TableInfoKey::new(table_id);
929 let table_route_key = TableRouteKey::new(table_id);
930 let datanode_table_keys = datanode_ids
931 .into_iter()
932 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
933 .collect::<HashSet<_>>();
934 let topic_region_map = self
935 .topic_region_manager
936 .get_topic_region_mapping(table_id, region_wal_options);
937 let topic_region_keys = topic_region_map
938 .iter()
939 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
940 .collect::<Vec<_>>();
941 keys.push(table_name.to_bytes());
942 keys.push(table_info_key.to_bytes());
943 keys.push(table_route_key.to_bytes());
944 for key in &datanode_table_keys {
945 keys.push(key.to_bytes());
946 }
947 for key in topic_region_keys {
948 keys.push(key.to_bytes());
949 }
950 Ok(keys)
951 }
952
953 pub async fn delete_table_metadata(
956 &self,
957 table_id: TableId,
958 table_name: &TableName,
959 table_route_value: &TableRouteValue,
960 region_wal_options: &HashMap<RegionNumber, WalOptions>,
961 ) -> Result<()> {
962 let keys =
963 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
964 self.tombstone_manager.create(keys).await.map(|_| ())
965 }
966
967 pub async fn delete_table_metadata_tombstone(
970 &self,
971 table_id: TableId,
972 table_name: &TableName,
973 table_route_value: &TableRouteValue,
974 region_wal_options: &HashMap<RegionNumber, WalOptions>,
975 ) -> Result<()> {
976 let table_metadata_keys =
977 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
978 self.tombstone_manager
979 .delete(table_metadata_keys)
980 .await
981 .map(|_| ())
982 }
983
984 pub async fn restore_table_metadata(
987 &self,
988 table_id: TableId,
989 table_name: &TableName,
990 table_route_value: &TableRouteValue,
991 region_wal_options: &HashMap<RegionNumber, WalOptions>,
992 ) -> Result<()> {
993 let keys =
994 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
995 self.tombstone_manager.restore(keys).await.map(|_| ())
996 }
997
998 pub async fn destroy_table_metadata(
1001 &self,
1002 table_id: TableId,
1003 table_name: &TableName,
1004 table_route_value: &TableRouteValue,
1005 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1006 ) -> Result<()> {
1007 let keys =
1008 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1009 let _ = self
1010 .kv_backend
1011 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1012 .await?;
1013 Ok(())
1014 }
1015
1016 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1017 let mut keys = Vec::with_capacity(3);
1018 let view_name = TableNameKey::new(
1019 &view_name.catalog_name,
1020 &view_name.schema_name,
1021 &view_name.table_name,
1022 );
1023 let table_info_key = TableInfoKey::new(view_id);
1024 let view_info_key = ViewInfoKey::new(view_id);
1025 keys.push(view_name.to_bytes());
1026 keys.push(table_info_key.to_bytes());
1027 keys.push(view_info_key.to_bytes());
1028
1029 Ok(keys)
1030 }
1031
1032 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1035 let keys = self.view_info_keys(view_id, view_name)?;
1036 let _ = self
1037 .kv_backend
1038 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1039 .await?;
1040 Ok(())
1041 }
1042
1043 pub async fn rename_table(
1047 &self,
1048 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1049 new_table_name: String,
1050 ) -> Result<()> {
1051 let current_table_info = ¤t_table_info_value.table_info;
1052 let table_id = current_table_info.ident.table_id;
1053
1054 let table_name_key = TableNameKey::new(
1055 ¤t_table_info.catalog_name,
1056 ¤t_table_info.schema_name,
1057 ¤t_table_info.name,
1058 );
1059
1060 let new_table_name_key = TableNameKey::new(
1061 ¤t_table_info.catalog_name,
1062 ¤t_table_info.schema_name,
1063 &new_table_name,
1064 );
1065
1066 let update_table_name_txn = self.table_name_manager().build_update_txn(
1068 &table_name_key,
1069 &new_table_name_key,
1070 table_id,
1071 )?;
1072
1073 let new_table_info_value = current_table_info_value
1074 .inner
1075 .with_update(move |table_info| {
1076 table_info.name = new_table_name;
1077 });
1078
1079 let (update_table_info_txn, on_update_table_info_failure) = self
1081 .table_info_manager()
1082 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1083
1084 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1085
1086 let mut r = self.kv_backend.txn(txn).await?;
1087
1088 if !r.succeeded {
1090 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1091 let remote_table_info = on_update_table_info_failure(&mut set)?
1092 .context(error::UnexpectedSnafu {
1093 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1094 })?
1095 .into_inner();
1096
1097 let op_name = "the renaming table metadata";
1098 ensure_values!(remote_table_info, new_table_info_value, op_name);
1099 }
1100
1101 Ok(())
1102 }
1103
1104 pub async fn update_table_info(
1108 &self,
1109 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1110 region_distribution: Option<RegionDistribution>,
1111 new_table_info: RawTableInfo,
1112 ) -> Result<()> {
1113 let table_id = current_table_info_value.table_info.ident.table_id;
1114 let new_table_info_value = current_table_info_value.update(new_table_info);
1115
1116 let (update_table_info_txn, on_update_table_info_failure) = self
1118 .table_info_manager()
1119 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1120
1121 let txn = if let Some(region_distribution) = region_distribution {
1122 let new_region_options = new_table_info_value.table_info.to_region_options();
1124 let update_datanode_table_options_txn = self
1125 .datanode_table_manager
1126 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1127 .await?;
1128 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1129 } else {
1130 update_table_info_txn
1131 };
1132
1133 let mut r = self.kv_backend.txn(txn).await?;
1134 if !r.succeeded {
1136 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1137 let remote_table_info = on_update_table_info_failure(&mut set)?
1138 .context(error::UnexpectedSnafu {
1139 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1140 })?
1141 .into_inner();
1142
1143 let op_name = "the updating table info";
1144 ensure_values!(remote_table_info, new_table_info_value, op_name);
1145 }
1146 Ok(())
1147 }
1148
1149 #[allow(clippy::too_many_arguments)]
1160 pub async fn update_view_info(
1161 &self,
1162 view_id: TableId,
1163 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1164 new_view_info: Vec<u8>,
1165 table_names: HashSet<TableName>,
1166 columns: Vec<String>,
1167 plan_columns: Vec<String>,
1168 definition: String,
1169 ) -> Result<()> {
1170 let new_view_info_value = current_view_info_value.update(
1171 new_view_info,
1172 table_names,
1173 columns,
1174 plan_columns,
1175 definition,
1176 );
1177
1178 let (update_view_info_txn, on_update_view_info_failure) = self
1180 .view_info_manager()
1181 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1182
1183 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1184
1185 if !r.succeeded {
1187 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1188 let remote_view_info = on_update_view_info_failure(&mut set)?
1189 .context(error::UnexpectedSnafu {
1190 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1191 })?
1192 .into_inner();
1193
1194 let op_name = "the updating view info";
1195 ensure_values!(remote_view_info, new_view_info_value, op_name);
1196 }
1197 Ok(())
1198 }
1199
1200 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1201 self.kv_backend.max_txn_ops()
1202 }
1203
1204 pub async fn batch_update_table_info_values(
1205 &self,
1206 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1207 ) -> Result<()> {
1208 let len = table_info_value_pairs.len();
1209 let mut txns = Vec::with_capacity(len);
1210 struct OnFailure<F, R>
1211 where
1212 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1213 {
1214 table_info_value: TableInfoValue,
1215 on_update_table_info_failure: F,
1216 }
1217 let mut on_failures = Vec::with_capacity(len);
1218
1219 for (table_info_value, new_table_info) in table_info_value_pairs {
1220 let table_id = table_info_value.table_info.ident.table_id;
1221
1222 let new_table_info_value = table_info_value.update(new_table_info);
1223
1224 let (update_table_info_txn, on_update_table_info_failure) =
1225 self.table_info_manager().build_update_txn(
1226 table_id,
1227 &table_info_value,
1228 &new_table_info_value,
1229 )?;
1230
1231 txns.push(update_table_info_txn);
1232
1233 on_failures.push(OnFailure {
1234 table_info_value: new_table_info_value,
1235 on_update_table_info_failure,
1236 });
1237 }
1238
1239 let txn = Txn::merge_all(txns);
1240 let mut r = self.kv_backend.txn(txn).await?;
1241
1242 if !r.succeeded {
1243 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1244 for on_failure in on_failures {
1245 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1246 .context(error::UnexpectedSnafu {
1247 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1248 })?
1249 .into_inner();
1250
1251 let op_name = "the batch updating table info";
1252 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1253 }
1254 }
1255
1256 Ok(())
1257 }
1258
1259 pub async fn update_table_route(
1260 &self,
1261 table_id: TableId,
1262 region_info: RegionInfo,
1263 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1264 new_region_routes: Vec<RegionRoute>,
1265 new_region_options: &HashMap<String, String>,
1266 new_region_wal_options: &HashMap<RegionNumber, String>,
1267 ) -> Result<()> {
1268 let current_region_distribution =
1270 region_distribution(current_table_route_value.region_routes()?);
1271 let new_region_distribution = region_distribution(&new_region_routes);
1272
1273 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1274 table_id,
1275 region_info,
1276 current_region_distribution,
1277 new_region_distribution,
1278 new_region_options,
1279 new_region_wal_options,
1280 )?;
1281
1282 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1284
1285 let (update_table_route_txn, on_update_table_route_failure) = self
1286 .table_route_manager()
1287 .table_route_storage()
1288 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1289
1290 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1291
1292 let mut r = self.kv_backend.txn(txn).await?;
1293
1294 if !r.succeeded {
1296 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1297 let remote_table_route = on_update_table_route_failure(&mut set)?
1298 .context(error::UnexpectedSnafu {
1299 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1300 })?
1301 .into_inner();
1302
1303 let op_name = "the updating table route";
1304 ensure_values!(remote_table_route, new_table_route_value, op_name);
1305 }
1306
1307 Ok(())
1308 }
1309
1310 pub async fn update_leader_region_status<F>(
1312 &self,
1313 table_id: TableId,
1314 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1315 next_region_route_status: F,
1316 ) -> Result<()>
1317 where
1318 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1319 {
1320 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1321
1322 let mut updated = 0;
1323 for route in &mut new_region_routes {
1324 if let Some(state) = next_region_route_status(route)
1325 && route.set_leader_state(state)
1326 {
1327 updated += 1;
1328 }
1329 }
1330
1331 if updated == 0 {
1332 warn!("No leader status updated");
1333 return Ok(());
1334 }
1335
1336 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1338
1339 let (update_table_route_txn, on_update_table_route_failure) = self
1340 .table_route_manager()
1341 .table_route_storage()
1342 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1343
1344 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1345
1346 if !r.succeeded {
1348 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1349 let remote_table_route = on_update_table_route_failure(&mut set)?
1350 .context(error::UnexpectedSnafu {
1351 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1352 })?
1353 .into_inner();
1354
1355 let op_name = "the updating leader region status";
1356 ensure_values!(remote_table_route, new_table_route_value, op_name);
1357 }
1358
1359 Ok(())
1360 }
1361}
1362
1363#[macro_export]
1364macro_rules! impl_metadata_value {
1365 ($($val_ty: ty), *) => {
1366 $(
1367 impl $crate::key::MetadataValue for $val_ty {
1368 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
1369 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1370 }
1371
1372 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1373 serde_json::to_vec(self).context(SerdeJsonSnafu)
1374 }
1375 }
1376 )*
1377 }
1378}
1379
1380macro_rules! impl_metadata_key_get_txn_op {
1381 ($($key: ty), *) => {
1382 $(
1383 impl $crate::key::MetadataKeyGetTxnOp for $key {
1384 fn build_get_op(
1387 &self,
1388 ) -> (
1389 TxnOp,
1390 impl for<'a> FnMut(
1391 &'a mut TxnOpGetResponseSet,
1392 ) -> Option<Vec<u8>>,
1393 ) {
1394 let raw_key = self.to_bytes();
1395 (
1396 TxnOp::Get(raw_key.clone()),
1397 TxnOpGetResponseSet::filter(raw_key),
1398 )
1399 }
1400 }
1401 )*
1402 }
1403}
1404
1405impl_metadata_key_get_txn_op! {
1406 TableNameKey<'_>,
1407 TableInfoKey,
1408 ViewInfoKey,
1409 TableRouteKey,
1410 DatanodeTableKey
1411}
1412
1413#[macro_export]
1414macro_rules! impl_optional_metadata_value {
1415 ($($val_ty: ty), *) => {
1416 $(
1417 impl $val_ty {
1418 pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
1419 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1420 }
1421
1422 pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1423 serde_json::to_vec(self).context(SerdeJsonSnafu)
1424 }
1425 }
1426 )*
1427 }
1428}
1429
1430impl_metadata_value! {
1431 TableNameValue,
1432 TableInfoValue,
1433 ViewInfoValue,
1434 DatanodeTableValue,
1435 FlowInfoValue,
1436 FlowNameValue,
1437 FlowRouteValue,
1438 TableFlowValue,
1439 NodeAddressValue,
1440 SchemaNameValue,
1441 FlowStateValue,
1442 PoisonValue,
1443 TopicRegionValue
1444}
1445
1446impl_optional_metadata_value! {
1447 CatalogNameValue,
1448 SchemaNameValue
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453 use std::collections::{BTreeMap, HashMap, HashSet};
1454 use std::sync::Arc;
1455
1456 use bytes::Bytes;
1457 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1458 use common_time::util::current_time_millis;
1459 use common_wal::options::{KafkaWalOptions, WalOptions};
1460 use futures::TryStreamExt;
1461 use store_api::storage::{RegionId, RegionNumber};
1462 use table::metadata::{RawTableInfo, TableInfo};
1463 use table::table_name::TableName;
1464
1465 use super::datanode_table::DatanodeTableKey;
1466 use super::test_utils;
1467 use crate::ddl::test_util::create_table::test_create_table_task;
1468 use crate::ddl::utils::region_storage_path;
1469 use crate::error::Result;
1470 use crate::key::datanode_table::RegionInfo;
1471 use crate::key::table_info::TableInfoValue;
1472 use crate::key::table_name::TableNameKey;
1473 use crate::key::table_route::TableRouteValue;
1474 use crate::key::{
1475 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1476 TableMetadataManager, ViewInfoValue,
1477 };
1478 use crate::kv_backend::KvBackend;
1479 use crate::kv_backend::memory::MemoryKvBackend;
1480 use crate::peer::Peer;
1481 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1482 use crate::rpc::store::RangeRequest;
1483 use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
1484
1485 #[test]
1486 fn test_deserialized_value_with_bytes() {
1487 let region_route = new_test_region_route();
1488 let region_routes = vec![region_route.clone()];
1489
1490 let expected_region_routes =
1491 TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1492 let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1493
1494 let value = DeserializedValueWithBytes {
1497 inner: TableRouteValue::physical(region_routes.clone()),
1499 bytes: Bytes::from(expected.clone()),
1500 };
1501
1502 let encoded = serde_json::to_vec(&value).unwrap();
1503
1504 let decoded: DeserializedValueWithBytes<TableRouteValue> =
1507 serde_json::from_slice(&encoded).unwrap();
1508
1509 assert_eq!(decoded.inner, expected_region_routes);
1510 assert_eq!(decoded.bytes, expected);
1511 }
1512
1513 fn new_test_region_route() -> RegionRoute {
1514 new_region_route(1, 2)
1515 }
1516
1517 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1518 RegionRoute {
1519 region: Region {
1520 id: region_id.into(),
1521 name: "r1".to_string(),
1522 partition: None,
1523 attrs: BTreeMap::new(),
1524 partition_expr: Default::default(),
1525 },
1526 leader_peer: Some(Peer::new(datanode, "a2")),
1527 follower_peers: vec![],
1528 leader_state: None,
1529 leader_down_since: None,
1530 }
1531 }
1532
1533 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1534 test_utils::new_test_table_info(10, region_numbers)
1535 }
1536
1537 fn new_test_table_names() -> HashSet<TableName> {
1538 let mut set = HashSet::new();
1539 set.insert(TableName {
1540 catalog_name: "greptime".to_string(),
1541 schema_name: "public".to_string(),
1542 table_name: "a_table".to_string(),
1543 });
1544 set.insert(TableName {
1545 catalog_name: "greptime".to_string(),
1546 schema_name: "public".to_string(),
1547 table_name: "b_table".to_string(),
1548 });
1549 set
1550 }
1551
1552 async fn create_physical_table_metadata(
1553 table_metadata_manager: &TableMetadataManager,
1554 table_info: RawTableInfo,
1555 region_routes: Vec<RegionRoute>,
1556 region_wal_options: HashMap<RegionNumber, String>,
1557 ) -> Result<()> {
1558 table_metadata_manager
1559 .create_table_metadata(
1560 table_info,
1561 TableRouteValue::physical(region_routes),
1562 region_wal_options,
1563 )
1564 .await
1565 }
1566
1567 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1568 let topics = (0..2)
1569 .map(|i| format!("greptimedb_topic{}", i))
1570 .collect::<Vec<_>>();
1571 let wal_options = topics
1572 .iter()
1573 .map(|topic| {
1574 WalOptions::Kafka(KafkaWalOptions {
1575 topic: topic.clone(),
1576 })
1577 })
1578 .collect::<Vec<_>>();
1579
1580 (0..16)
1581 .enumerate()
1582 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1583 .collect()
1584 }
1585
1586 #[tokio::test]
1587 async fn test_raft_engine_topic_region_map() {
1588 let mem_kv = Arc::new(MemoryKvBackend::default());
1589 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1590 let region_route = new_test_region_route();
1591 let region_routes = &vec![region_route.clone()];
1592 let table_info: RawTableInfo =
1593 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1594 let wal_allocator = WalOptionsAllocator::RaftEngine;
1595 let regions = (0..16).collect();
1596 let region_wal_options =
1597 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1598 create_physical_table_metadata(
1599 &table_metadata_manager,
1600 table_info.clone(),
1601 region_routes.clone(),
1602 region_wal_options.clone(),
1603 )
1604 .await
1605 .unwrap();
1606
1607 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1608 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1609 let resp = mem_kv.range(range_req).await.unwrap();
1610 assert!(resp.kvs.is_empty());
1612 }
1613
1614 #[tokio::test]
1615 async fn test_create_table_metadata() {
1616 let mem_kv = Arc::new(MemoryKvBackend::default());
1617 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1618 let region_route = new_test_region_route();
1619 let region_routes = &vec![region_route.clone()];
1620 let table_info: RawTableInfo =
1621 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1622 let region_wal_options = create_mock_region_wal_options()
1623 .into_iter()
1624 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1625 .collect::<HashMap<_, _>>();
1626
1627 create_physical_table_metadata(
1629 &table_metadata_manager,
1630 table_info.clone(),
1631 region_routes.clone(),
1632 region_wal_options.clone(),
1633 )
1634 .await
1635 .unwrap();
1636
1637 assert!(
1639 create_physical_table_metadata(
1640 &table_metadata_manager,
1641 table_info.clone(),
1642 region_routes.clone(),
1643 region_wal_options.clone(),
1644 )
1645 .await
1646 .is_ok()
1647 );
1648
1649 let mut modified_region_routes = region_routes.clone();
1650 modified_region_routes.push(region_route.clone());
1651 assert!(
1653 create_physical_table_metadata(
1654 &table_metadata_manager,
1655 table_info.clone(),
1656 modified_region_routes,
1657 region_wal_options.clone(),
1658 )
1659 .await
1660 .is_err()
1661 );
1662
1663 let (remote_table_info, remote_table_route) = table_metadata_manager
1664 .get_full_table_info(10)
1665 .await
1666 .unwrap();
1667
1668 assert_eq!(
1669 remote_table_info.unwrap().into_inner().table_info,
1670 table_info
1671 );
1672 assert_eq!(
1673 remote_table_route
1674 .unwrap()
1675 .into_inner()
1676 .region_routes()
1677 .unwrap(),
1678 region_routes
1679 );
1680
1681 for i in 0..2 {
1682 let region_number = i as u32;
1683 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1684 let topic = format!("greptimedb_topic{}", i);
1685 let regions = table_metadata_manager
1686 .topic_region_manager
1687 .regions(&topic)
1688 .await
1689 .unwrap()
1690 .into_keys()
1691 .collect::<Vec<_>>();
1692 assert_eq!(regions.len(), 8);
1693 assert!(regions.contains(®ion_id));
1694 }
1695 }
1696
1697 #[tokio::test]
1698 async fn test_create_logic_tables_metadata() {
1699 let mem_kv = Arc::new(MemoryKvBackend::default());
1700 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1701 let region_route = new_test_region_route();
1702 let region_routes = vec![region_route.clone()];
1703 let table_info: RawTableInfo =
1704 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1705 let table_id = table_info.ident.table_id;
1706 let table_route_value = TableRouteValue::physical(region_routes.clone());
1707
1708 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1709 table_metadata_manager
1711 .create_logical_tables_metadata(tables_data.clone())
1712 .await
1713 .unwrap();
1714
1715 assert!(
1717 table_metadata_manager
1718 .create_logical_tables_metadata(tables_data)
1719 .await
1720 .is_ok()
1721 );
1722
1723 let mut modified_region_routes = region_routes.clone();
1724 modified_region_routes.push(new_region_route(2, 3));
1725 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1726 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1727 assert!(
1729 table_metadata_manager
1730 .create_logical_tables_metadata(modified_tables_data)
1731 .await
1732 .is_err()
1733 );
1734
1735 let (remote_table_info, remote_table_route) = table_metadata_manager
1736 .get_full_table_info(table_id)
1737 .await
1738 .unwrap();
1739
1740 assert_eq!(
1741 remote_table_info.unwrap().into_inner().table_info,
1742 table_info
1743 );
1744 assert_eq!(
1745 remote_table_route
1746 .unwrap()
1747 .into_inner()
1748 .region_routes()
1749 .unwrap(),
1750 ®ion_routes
1751 );
1752 }
1753
1754 #[tokio::test]
1755 async fn test_create_many_logical_tables_metadata() {
1756 let kv_backend = Arc::new(MemoryKvBackend::default());
1757 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1758
1759 let mut tables_data = vec![];
1760 for i in 0..128 {
1761 let table_id = i + 1;
1762 let regin_number = table_id * 3;
1763 let region_id = RegionId::new(table_id, regin_number);
1764 let region_route = new_region_route(region_id.as_u64(), 2);
1765 let region_routes = vec![region_route.clone()];
1766 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1767 table_id,
1768 &format!("my_table_{}", table_id),
1769 region_routes.iter().map(|r| r.region.id.region_number()),
1770 )
1771 .into();
1772 let table_route_value = TableRouteValue::physical(region_routes.clone());
1773
1774 tables_data.push((table_info, table_route_value));
1775 }
1776
1777 table_metadata_manager
1779 .create_logical_tables_metadata(tables_data)
1780 .await
1781 .unwrap();
1782 }
1783
1784 #[tokio::test]
1785 async fn test_delete_table_metadata() {
1786 let mem_kv = Arc::new(MemoryKvBackend::default());
1787 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1788 let region_route = new_test_region_route();
1789 let region_routes = &vec![region_route.clone()];
1790 let table_info: RawTableInfo =
1791 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1792 let table_id = table_info.ident.table_id;
1793 let datanode_id = 2;
1794 let region_wal_options = create_mock_region_wal_options();
1795 let serialized_region_wal_options = region_wal_options
1796 .iter()
1797 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1798 .collect::<HashMap<_, _>>();
1799
1800 create_physical_table_metadata(
1802 &table_metadata_manager,
1803 table_info.clone(),
1804 region_routes.clone(),
1805 serialized_region_wal_options,
1806 )
1807 .await
1808 .unwrap();
1809
1810 let table_name = TableName::new(
1811 table_info.catalog_name,
1812 table_info.schema_name,
1813 table_info.name,
1814 );
1815 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1816 table_metadata_manager
1818 .delete_table_metadata(
1819 table_id,
1820 &table_name,
1821 table_route_value,
1822 ®ion_wal_options,
1823 )
1824 .await
1825 .unwrap();
1826 table_metadata_manager
1828 .delete_table_metadata(
1829 table_id,
1830 &table_name,
1831 table_route_value,
1832 ®ion_wal_options,
1833 )
1834 .await
1835 .unwrap();
1836 assert!(
1837 table_metadata_manager
1838 .table_info_manager()
1839 .get(table_id)
1840 .await
1841 .unwrap()
1842 .is_none()
1843 );
1844 assert!(
1845 table_metadata_manager
1846 .table_route_manager()
1847 .table_route_storage()
1848 .get(table_id)
1849 .await
1850 .unwrap()
1851 .is_none()
1852 );
1853 assert!(
1854 table_metadata_manager
1855 .datanode_table_manager()
1856 .tables(datanode_id)
1857 .try_collect::<Vec<_>>()
1858 .await
1859 .unwrap()
1860 .is_empty()
1861 );
1862 let table_info = table_metadata_manager
1864 .table_info_manager()
1865 .get(table_id)
1866 .await
1867 .unwrap();
1868 assert!(table_info.is_none());
1869 let table_route = table_metadata_manager
1870 .table_route_manager()
1871 .table_route_storage()
1872 .get(table_id)
1873 .await
1874 .unwrap();
1875 assert!(table_route.is_none());
1876 let regions = table_metadata_manager
1878 .topic_region_manager
1879 .regions("greptimedb_topic0")
1880 .await
1881 .unwrap();
1882 assert_eq!(regions.len(), 0);
1883 let regions = table_metadata_manager
1884 .topic_region_manager
1885 .regions("greptimedb_topic1")
1886 .await
1887 .unwrap();
1888 assert_eq!(regions.len(), 0);
1889 }
1890
1891 #[tokio::test]
1892 async fn test_rename_table() {
1893 let mem_kv = Arc::new(MemoryKvBackend::default());
1894 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1895 let region_route = new_test_region_route();
1896 let region_routes = vec![region_route.clone()];
1897 let table_info: RawTableInfo =
1898 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1899 let table_id = table_info.ident.table_id;
1900 create_physical_table_metadata(
1902 &table_metadata_manager,
1903 table_info.clone(),
1904 region_routes.clone(),
1905 HashMap::new(),
1906 )
1907 .await
1908 .unwrap();
1909
1910 let new_table_name = "another_name".to_string();
1911 let table_info_value =
1912 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1913
1914 table_metadata_manager
1915 .rename_table(&table_info_value, new_table_name.clone())
1916 .await
1917 .unwrap();
1918 table_metadata_manager
1920 .rename_table(&table_info_value, new_table_name.clone())
1921 .await
1922 .unwrap();
1923 let mut modified_table_info = table_info.clone();
1924 modified_table_info.name = "hi".to_string();
1925 let modified_table_info_value =
1926 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1927 assert!(
1930 table_metadata_manager
1931 .rename_table(&modified_table_info_value, new_table_name.clone())
1932 .await
1933 .is_err()
1934 );
1935
1936 let old_table_name = TableNameKey::new(
1937 &table_info.catalog_name,
1938 &table_info.schema_name,
1939 &table_info.name,
1940 );
1941 let new_table_name = TableNameKey::new(
1942 &table_info.catalog_name,
1943 &table_info.schema_name,
1944 &new_table_name,
1945 );
1946
1947 assert!(
1948 table_metadata_manager
1949 .table_name_manager()
1950 .get(old_table_name)
1951 .await
1952 .unwrap()
1953 .is_none()
1954 );
1955
1956 assert_eq!(
1957 table_metadata_manager
1958 .table_name_manager()
1959 .get(new_table_name)
1960 .await
1961 .unwrap()
1962 .unwrap()
1963 .table_id(),
1964 table_id
1965 );
1966 }
1967
1968 #[tokio::test]
1969 async fn test_update_table_info() {
1970 let mem_kv = Arc::new(MemoryKvBackend::default());
1971 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1972 let region_route = new_test_region_route();
1973 let region_routes = vec![region_route.clone()];
1974 let table_info: RawTableInfo =
1975 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1976 let table_id = table_info.ident.table_id;
1977 create_physical_table_metadata(
1979 &table_metadata_manager,
1980 table_info.clone(),
1981 region_routes.clone(),
1982 HashMap::new(),
1983 )
1984 .await
1985 .unwrap();
1986
1987 let mut new_table_info = table_info.clone();
1988 new_table_info.name = "hi".to_string();
1989 let current_table_info_value =
1990 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1991 table_metadata_manager
1993 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1994 .await
1995 .unwrap();
1996 table_metadata_manager
1998 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1999 .await
2000 .unwrap();
2001
2002 let updated_table_info = table_metadata_manager
2004 .table_info_manager()
2005 .get(table_id)
2006 .await
2007 .unwrap()
2008 .unwrap()
2009 .into_inner();
2010 assert_eq!(updated_table_info.table_info, new_table_info);
2011
2012 let mut wrong_table_info = table_info.clone();
2013 wrong_table_info.name = "wrong".to_string();
2014 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2015 current_table_info_value.update(wrong_table_info),
2016 );
2017 assert!(
2020 table_metadata_manager
2021 .update_table_info(&wrong_table_info_value, None, new_table_info)
2022 .await
2023 .is_err()
2024 )
2025 }
2026
2027 #[tokio::test]
2028 async fn test_update_table_leader_region_status() {
2029 let mem_kv = Arc::new(MemoryKvBackend::default());
2030 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2031 let datanode = 1;
2032 let region_routes = vec![
2033 RegionRoute {
2034 region: Region {
2035 id: 1.into(),
2036 name: "r1".to_string(),
2037 partition: None,
2038 attrs: BTreeMap::new(),
2039 partition_expr: Default::default(),
2040 },
2041 leader_peer: Some(Peer::new(datanode, "a2")),
2042 leader_state: Some(LeaderState::Downgrading),
2043 follower_peers: vec![],
2044 leader_down_since: Some(current_time_millis()),
2045 },
2046 RegionRoute {
2047 region: Region {
2048 id: 2.into(),
2049 name: "r2".to_string(),
2050 partition: None,
2051 attrs: BTreeMap::new(),
2052 partition_expr: Default::default(),
2053 },
2054 leader_peer: Some(Peer::new(datanode, "a1")),
2055 leader_state: None,
2056 follower_peers: vec![],
2057 leader_down_since: None,
2058 },
2059 ];
2060 let table_info: RawTableInfo =
2061 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2062 let table_id = table_info.ident.table_id;
2063 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2064 TableRouteValue::physical(region_routes.clone()),
2065 );
2066
2067 create_physical_table_metadata(
2069 &table_metadata_manager,
2070 table_info.clone(),
2071 region_routes.clone(),
2072 HashMap::new(),
2073 )
2074 .await
2075 .unwrap();
2076
2077 table_metadata_manager
2078 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2079 if region_route.leader_state.is_some() {
2080 None
2081 } else {
2082 Some(Some(LeaderState::Downgrading))
2083 }
2084 })
2085 .await
2086 .unwrap();
2087
2088 let updated_route_value = table_metadata_manager
2089 .table_route_manager()
2090 .table_route_storage()
2091 .get(table_id)
2092 .await
2093 .unwrap()
2094 .unwrap();
2095
2096 assert_eq!(
2097 updated_route_value.region_routes().unwrap()[0].leader_state,
2098 Some(LeaderState::Downgrading)
2099 );
2100
2101 assert!(
2102 updated_route_value.region_routes().unwrap()[0]
2103 .leader_down_since
2104 .is_some()
2105 );
2106
2107 assert_eq!(
2108 updated_route_value.region_routes().unwrap()[1].leader_state,
2109 Some(LeaderState::Downgrading)
2110 );
2111 assert!(
2112 updated_route_value.region_routes().unwrap()[1]
2113 .leader_down_since
2114 .is_some()
2115 );
2116 }
2117
2118 async fn assert_datanode_table(
2119 table_metadata_manager: &TableMetadataManager,
2120 table_id: u32,
2121 region_routes: &[RegionRoute],
2122 ) {
2123 let region_distribution = region_distribution(region_routes);
2124 for (datanode, regions) in region_distribution {
2125 let got = table_metadata_manager
2126 .datanode_table_manager()
2127 .get(&DatanodeTableKey::new(datanode, table_id))
2128 .await
2129 .unwrap()
2130 .unwrap();
2131
2132 assert_eq!(got.regions, regions.leader_regions);
2133 assert_eq!(got.follower_regions, regions.follower_regions);
2134 }
2135 }
2136
2137 #[tokio::test]
2138 async fn test_update_table_route() {
2139 let mem_kv = Arc::new(MemoryKvBackend::default());
2140 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2141 let region_route = new_test_region_route();
2142 let region_routes = vec![region_route.clone()];
2143 let table_info: RawTableInfo =
2144 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2145 let table_id = table_info.ident.table_id;
2146 let engine = table_info.meta.engine.as_str();
2147 let region_storage_path =
2148 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2149 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2150 TableRouteValue::physical(region_routes.clone()),
2151 );
2152
2153 create_physical_table_metadata(
2155 &table_metadata_manager,
2156 table_info.clone(),
2157 region_routes.clone(),
2158 HashMap::new(),
2159 )
2160 .await
2161 .unwrap();
2162
2163 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2164 let new_region_routes = vec![
2165 new_region_route(1, 1),
2166 new_region_route(2, 2),
2167 new_region_route(3, 3),
2168 ];
2169 table_metadata_manager
2171 .update_table_route(
2172 table_id,
2173 RegionInfo {
2174 engine: engine.to_string(),
2175 region_storage_path: region_storage_path.to_string(),
2176 region_options: HashMap::new(),
2177 region_wal_options: HashMap::new(),
2178 },
2179 ¤t_table_route_value,
2180 new_region_routes.clone(),
2181 &HashMap::new(),
2182 &HashMap::new(),
2183 )
2184 .await
2185 .unwrap();
2186 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2187
2188 table_metadata_manager
2190 .update_table_route(
2191 table_id,
2192 RegionInfo {
2193 engine: engine.to_string(),
2194 region_storage_path: region_storage_path.to_string(),
2195 region_options: HashMap::new(),
2196 region_wal_options: HashMap::new(),
2197 },
2198 ¤t_table_route_value,
2199 new_region_routes.clone(),
2200 &HashMap::new(),
2201 &HashMap::new(),
2202 )
2203 .await
2204 .unwrap();
2205
2206 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2207 current_table_route_value
2208 .inner
2209 .update(new_region_routes.clone())
2210 .unwrap(),
2211 );
2212 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2213 table_metadata_manager
2215 .update_table_route(
2216 table_id,
2217 RegionInfo {
2218 engine: engine.to_string(),
2219 region_storage_path: region_storage_path.to_string(),
2220 region_options: HashMap::new(),
2221 region_wal_options: HashMap::new(),
2222 },
2223 ¤t_table_route_value,
2224 new_region_routes.clone(),
2225 &HashMap::new(),
2226 &HashMap::new(),
2227 )
2228 .await
2229 .unwrap();
2230 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2231
2232 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2235 current_table_route_value
2236 .update(vec![
2237 new_region_route(1, 1),
2238 new_region_route(2, 2),
2239 new_region_route(3, 3),
2240 new_region_route(4, 4),
2241 ])
2242 .unwrap(),
2243 );
2244 assert!(
2245 table_metadata_manager
2246 .update_table_route(
2247 table_id,
2248 RegionInfo {
2249 engine: engine.to_string(),
2250 region_storage_path: region_storage_path.to_string(),
2251 region_options: HashMap::new(),
2252 region_wal_options: HashMap::new(),
2253 },
2254 &wrong_table_route_value,
2255 new_region_routes,
2256 &HashMap::new(),
2257 &HashMap::new(),
2258 )
2259 .await
2260 .is_err()
2261 );
2262 }
2263
2264 #[tokio::test]
2265 async fn test_destroy_table_metadata() {
2266 let mem_kv = Arc::new(MemoryKvBackend::default());
2267 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2268 let table_id = 1025;
2269 let table_name = "foo";
2270 let task = test_create_table_task(table_name, table_id);
2271 let options = create_mock_region_wal_options();
2272 let serialized_options = options
2273 .iter()
2274 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2275 .collect::<HashMap<_, _>>();
2276 table_metadata_manager
2277 .create_table_metadata(
2278 task.table_info,
2279 TableRouteValue::physical(vec![
2280 RegionRoute {
2281 region: Region::new_test(RegionId::new(table_id, 1)),
2282 leader_peer: Some(Peer::empty(1)),
2283 follower_peers: vec![Peer::empty(5)],
2284 leader_state: None,
2285 leader_down_since: None,
2286 },
2287 RegionRoute {
2288 region: Region::new_test(RegionId::new(table_id, 2)),
2289 leader_peer: Some(Peer::empty(2)),
2290 follower_peers: vec![Peer::empty(4)],
2291 leader_state: None,
2292 leader_down_since: None,
2293 },
2294 RegionRoute {
2295 region: Region::new_test(RegionId::new(table_id, 3)),
2296 leader_peer: Some(Peer::empty(3)),
2297 follower_peers: vec![],
2298 leader_state: None,
2299 leader_down_since: None,
2300 },
2301 ]),
2302 serialized_options,
2303 )
2304 .await
2305 .unwrap();
2306 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2307 let table_route_value = table_metadata_manager
2308 .table_route_manager
2309 .table_route_storage()
2310 .get_with_raw_bytes(table_id)
2311 .await
2312 .unwrap()
2313 .unwrap();
2314 table_metadata_manager
2315 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2316 .await
2317 .unwrap();
2318 assert!(mem_kv.is_empty());
2319 }
2320
2321 #[tokio::test]
2322 async fn test_restore_table_metadata() {
2323 let mem_kv = Arc::new(MemoryKvBackend::default());
2324 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2325 let table_id = 1025;
2326 let table_name = "foo";
2327 let task = test_create_table_task(table_name, table_id);
2328 let options = create_mock_region_wal_options();
2329 let serialized_options = options
2330 .iter()
2331 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2332 .collect::<HashMap<_, _>>();
2333 table_metadata_manager
2334 .create_table_metadata(
2335 task.table_info,
2336 TableRouteValue::physical(vec![
2337 RegionRoute {
2338 region: Region::new_test(RegionId::new(table_id, 1)),
2339 leader_peer: Some(Peer::empty(1)),
2340 follower_peers: vec![Peer::empty(5)],
2341 leader_state: None,
2342 leader_down_since: None,
2343 },
2344 RegionRoute {
2345 region: Region::new_test(RegionId::new(table_id, 2)),
2346 leader_peer: Some(Peer::empty(2)),
2347 follower_peers: vec![Peer::empty(4)],
2348 leader_state: None,
2349 leader_down_since: None,
2350 },
2351 RegionRoute {
2352 region: Region::new_test(RegionId::new(table_id, 3)),
2353 leader_peer: Some(Peer::empty(3)),
2354 follower_peers: vec![],
2355 leader_state: None,
2356 leader_down_since: None,
2357 },
2358 ]),
2359 serialized_options,
2360 )
2361 .await
2362 .unwrap();
2363 let expected_result = mem_kv.dump();
2364 let table_route_value = table_metadata_manager
2365 .table_route_manager
2366 .table_route_storage()
2367 .get_with_raw_bytes(table_id)
2368 .await
2369 .unwrap()
2370 .unwrap();
2371 let region_routes = table_route_value.region_routes().unwrap();
2372 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2373 let table_route_value = TableRouteValue::physical(region_routes.clone());
2374 table_metadata_manager
2375 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2376 .await
2377 .unwrap();
2378 table_metadata_manager
2379 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2380 .await
2381 .unwrap();
2382 let kvs = mem_kv.dump();
2383 assert_eq!(kvs, expected_result);
2384 table_metadata_manager
2386 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2387 .await
2388 .unwrap();
2389 let kvs = mem_kv.dump();
2390 assert_eq!(kvs, expected_result);
2391 }
2392
2393 #[tokio::test]
2394 async fn test_create_update_view_info() {
2395 let mem_kv = Arc::new(MemoryKvBackend::default());
2396 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2397
2398 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2399
2400 let view_id = view_info.ident.table_id;
2401
2402 let logical_plan: Vec<u8> = vec![1, 2, 3];
2403 let columns = vec!["a".to_string()];
2404 let plan_columns = vec!["number".to_string()];
2405 let table_names = new_test_table_names();
2406 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2407
2408 table_metadata_manager
2410 .create_view_metadata(
2411 view_info.clone(),
2412 logical_plan.clone(),
2413 table_names.clone(),
2414 columns.clone(),
2415 plan_columns.clone(),
2416 definition.to_string(),
2417 )
2418 .await
2419 .unwrap();
2420
2421 {
2422 let current_view_info = table_metadata_manager
2424 .view_info_manager()
2425 .get(view_id)
2426 .await
2427 .unwrap()
2428 .unwrap()
2429 .into_inner();
2430 assert_eq!(current_view_info.view_info, logical_plan);
2431 assert_eq!(current_view_info.table_names, table_names);
2432 assert_eq!(current_view_info.definition, definition);
2433 assert_eq!(current_view_info.columns, columns);
2434 assert_eq!(current_view_info.plan_columns, plan_columns);
2435 let current_table_info = table_metadata_manager
2437 .table_info_manager()
2438 .get(view_id)
2439 .await
2440 .unwrap()
2441 .unwrap()
2442 .into_inner();
2443 assert_eq!(current_table_info.table_info, view_info);
2444 }
2445
2446 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2447 let new_table_names = {
2448 let mut set = HashSet::new();
2449 set.insert(TableName {
2450 catalog_name: "greptime".to_string(),
2451 schema_name: "public".to_string(),
2452 table_name: "b_table".to_string(),
2453 });
2454 set.insert(TableName {
2455 catalog_name: "greptime".to_string(),
2456 schema_name: "public".to_string(),
2457 table_name: "c_table".to_string(),
2458 });
2459 set
2460 };
2461 let new_columns = vec!["b".to_string()];
2462 let new_plan_columns = vec!["number2".to_string()];
2463 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2464
2465 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2466 logical_plan.clone(),
2467 table_names,
2468 columns,
2469 plan_columns,
2470 definition.to_string(),
2471 ));
2472 table_metadata_manager
2474 .update_view_info(
2475 view_id,
2476 ¤t_view_info_value,
2477 new_logical_plan.clone(),
2478 new_table_names.clone(),
2479 new_columns.clone(),
2480 new_plan_columns.clone(),
2481 new_definition.to_string(),
2482 )
2483 .await
2484 .unwrap();
2485 table_metadata_manager
2487 .update_view_info(
2488 view_id,
2489 ¤t_view_info_value,
2490 new_logical_plan.clone(),
2491 new_table_names.clone(),
2492 new_columns.clone(),
2493 new_plan_columns.clone(),
2494 new_definition.to_string(),
2495 )
2496 .await
2497 .unwrap();
2498
2499 let updated_view_info = table_metadata_manager
2501 .view_info_manager()
2502 .get(view_id)
2503 .await
2504 .unwrap()
2505 .unwrap()
2506 .into_inner();
2507 assert_eq!(updated_view_info.view_info, new_logical_plan);
2508 assert_eq!(updated_view_info.table_names, new_table_names);
2509 assert_eq!(updated_view_info.definition, new_definition);
2510 assert_eq!(updated_view_info.columns, new_columns);
2511 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2512
2513 let wrong_view_info = logical_plan.clone();
2514 let wrong_definition = "wrong_definition";
2515 let wrong_view_info_value =
2516 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2517 wrong_view_info,
2518 new_table_names.clone(),
2519 new_columns.clone(),
2520 new_plan_columns.clone(),
2521 wrong_definition.to_string(),
2522 ));
2523 assert!(
2526 table_metadata_manager
2527 .update_view_info(
2528 view_id,
2529 &wrong_view_info_value,
2530 new_logical_plan.clone(),
2531 new_table_names.clone(),
2532 vec!["c".to_string()],
2533 vec!["number3".to_string()],
2534 wrong_definition.to_string(),
2535 )
2536 .await
2537 .is_err()
2538 );
2539
2540 let current_view_info = table_metadata_manager
2542 .view_info_manager()
2543 .get(view_id)
2544 .await
2545 .unwrap()
2546 .unwrap()
2547 .into_inner();
2548 assert_eq!(current_view_info.view_info, new_logical_plan);
2549 assert_eq!(current_view_info.table_names, new_table_names);
2550 assert_eq!(current_view_info.definition, new_definition);
2551 assert_eq!(current_view_info.columns, new_columns);
2552 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2553 }
2554
2555 #[test]
2556 fn test_region_role_set_deserialize() {
2557 let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2558 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2559 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2560 assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2561
2562 let s = r#"[1, 2, 3]"#;
2563 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2564 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2565 assert!(region_role_set.follower_regions.is_empty());
2566 }
2567
2568 #[test]
2569 fn test_region_distribution_deserialize() {
2570 let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2571 let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2572 assert_eq!(region_distribution.len(), 2);
2573 assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2574 assert!(region_distribution[&1].follower_regions.is_empty());
2575 assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2576 assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2577 }
2578}