1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_base::regex_pattern::NAME_PATTERN;
125use common_catalog::consts::{
126 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
127};
128use common_telemetry::warn;
129use common_wal::options::WalOptions;
130use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
131use flow::flow_route::FlowRouteValue;
132use flow::table_flow::TableFlowValue;
133use lazy_static::lazy_static;
134use regex::Regex;
135pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
136use serde::de::DeserializeOwned;
137use serde::{Deserialize, Serialize};
138use snafu::{OptionExt, ResultExt, ensure};
139use store_api::storage::RegionNumber;
140use table::metadata::{RawTableInfo, TableId};
141use table::table_name::TableName;
142use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
143use table_name::{TableNameKey, TableNameManager, TableNameValue};
144use topic_name::TopicNameManager;
145use topic_region::{TopicRegionKey, TopicRegionManager};
146use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
147
148use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
149use self::datanode_table::RegionInfo;
150use self::flow::flow_info::FlowInfoValue;
151use self::flow::flow_name::FlowNameValue;
152use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
153use self::table_route::{TableRouteManager, TableRouteValue};
154use self::tombstone::TombstoneManager;
155use crate::DatanodeId;
156use crate::error::{self, Result, SerdeJsonSnafu};
157use crate::key::flow::flow_state::FlowStateValue;
158use crate::key::node_address::NodeAddressValue;
159use crate::key::table_route::TableRouteKey;
160use crate::key::topic_region::TopicRegionValue;
161use crate::key::txn_helper::TxnOpGetResponseSet;
162use crate::kv_backend::KvBackendRef;
163use crate::kv_backend::txn::{Txn, TxnOp};
164use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
165use crate::rpc::store::BatchDeleteRequest;
166use crate::state_store::PoisonValue;
167
/// Regex fragment for a topic name: one character out of `[a-zA-Z0-9_:-]` followed by any
/// number of characters out of `[a-zA-Z0-9_:\-\.@#]`. The fragment carries no `^`/`$` anchors.
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
/// Legacy key of the maintenance switch (judging by the name, superseded by
/// [`MAINTENANCE_KEY`] — TODO confirm).
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
/// Key of the maintenance switch, stored under the `__switches/` prefix.
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
/// Key of the pause-procedure switch, stored under the `__switches/` prefix.
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
/// Key of the recovery-mode switch, stored under the `__switches/` prefix.
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";

/// Prefix of datanode table keys (`{prefix}/{digits}/{digits}`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table info keys (`{prefix}/{digits}`).
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view info keys (`{prefix}/{digits}`).
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table name keys (`{prefix}/{name}/{name}/{name}`).
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog name keys (`{prefix}/{name}`).
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema name keys (`{prefix}/{name}/{name}`).
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table route keys (`{prefix}/{digits}`).
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node address keys (`{prefix}/{digits}/{digits}`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic name keys (`{prefix}/{anything}`).
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Older prefix for Kafka topic keys (presumably kept for migration/compatibility — confirm).
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys (`{prefix}/{topic}/{digits}`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";

/// Key used for metasrv election (NOTE(review): exact usage lives outside this file).
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root (with trailing `/`) under which metasrv election candidates are stored
/// (NOTE(review): exact usage lives outside this file).
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// The five key prefixes treated as cache keys: table name, catalog name, schema name,
/// table route and node address (NOTE(review): the consumer is not visible here).
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
200
/// Region numbers grouped by role (leader / follower).
///
/// `Serialize` is derived, so the value is written as a struct with both fields;
/// `Deserialize` is implemented by hand in this file and also accepts a bare list of
/// region numbers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Region numbers held in the leader role.
    pub leader_regions: Vec<RegionNumber>,
    /// Region numbers held in the follower role.
    pub follower_regions: Vec<RegionNumber>,
}
209
210impl<'de> Deserialize<'de> for RegionRoleSet {
211 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
212 where
213 D: serde::Deserializer<'de>,
214 {
215 #[derive(Deserialize)]
216 #[serde(untagged)]
217 enum RegionRoleSetOrLeaderOnly {
218 Full {
219 leader_regions: Vec<RegionNumber>,
220 follower_regions: Vec<RegionNumber>,
221 },
222 LeaderOnly(Vec<RegionNumber>),
223 }
224 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
225 RegionRoleSetOrLeaderOnly::Full {
226 leader_regions,
227 follower_regions,
228 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
229 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
230 Ok(RegionRoleSet::new(leader_regions, vec![]))
231 }
232 }
233 }
234}
235
236impl RegionRoleSet {
237 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
239 Self {
240 leader_regions,
241 follower_regions,
242 }
243 }
244
245 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
247 self.leader_regions.push(region_number);
248 }
249
250 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
252 self.follower_regions.push(region_number);
253 }
254
255 pub fn sort(&mut self) {
257 self.follower_regions.sort();
258 self.leader_regions.sort();
259 }
260}
261
/// Maps a datanode id to the regions (by role) associated with it. Being a `BTreeMap`,
/// iteration is ordered by datanode id.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
271
272lazy_static! {
273 pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
274}
275
276lazy_static! {
277 static ref TABLE_INFO_KEY_PATTERN: Regex =
278 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
279}
280
281lazy_static! {
282 static ref VIEW_INFO_KEY_PATTERN: Regex =
283 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
288 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
289}
290
291lazy_static! {
292 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
293 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
294}
295
296lazy_static! {
297 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
298 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
299 ))
300 .unwrap();
301}
302
303lazy_static! {
304 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
306 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
314 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
315 ))
316 .unwrap();
317}
318
319lazy_static! {
320 static ref NODE_ADDRESS_PATTERN: Regex =
321 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
322}
323
324lazy_static! {
325 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
326 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
327}
328
329lazy_static! {
330 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
331 "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
332 ))
333 .unwrap();
334}
335
/// Conversion between a metadata key and its byte representation.
///
/// `T` is the type produced by decoding; `'a` is the lifetime of the byte slice being
/// decoded, which lets implementations return keys that borrow from it.
pub trait MetadataKey<'a, T> {
    /// Encodes the key as bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from `bytes`, returning an error if decoding fails.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
342
/// Newtype over a raw byte vector. The [`MetadataKey`] implementation in this file
/// encodes and decodes it as the bytes themselves.
#[derive(Debug, Clone, PartialEq)]
pub struct BytesAdapter(Vec<u8>);
345
346impl From<Vec<u8>> for BytesAdapter {
347 fn from(value: Vec<u8>) -> Self {
348 Self(value)
349 }
350}
351
352impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
353 fn to_bytes(&self) -> Vec<u8> {
354 self.0.clone()
355 }
356
357 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
358 Ok(BytesAdapter(bytes.to_vec()))
359 }
360}
361
/// Builds a transactional "get" for a metadata key.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a [`TxnOp`] together with a closure that, given the
    /// [`TxnOpGetResponseSet`] of the executed transaction, yields the raw value bytes
    /// (`None` when the set yields nothing for it).
    ///
    /// NOTE(review): presumably the op is a get on this key and the closure picks that
    /// key's response out of the set — confirm against the implementations.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
370
/// Conversion between a metadata value and the raw bytes stored for it.
pub trait MetadataValue {
    /// Decodes a value from its raw stored bytes.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into the raw bytes to store.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
378
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Facade over the individual metadata managers (table name/info/route, view info,
/// datanode table, catalog, schema, tombstone, topic name/region). Its constructors
/// build every sub-manager from clones of the same `kv_backend`.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Also kept directly: the methods below run their multi-key transactions on it.
    kv_backend: KvBackendRef,
}
394
/// Fails with an `error::UnexpectedSnafu` error unless `$got == $expected_value`.
///
/// * `$got` — the value that was read.
/// * `$expected_value` — the value it must equal.
/// * `$name` — description of the operation, embedded in the error message.
///
/// The expansion refers to `ensure!` and `error::UnexpectedSnafu` by those bare paths,
/// so both must be in scope at the call site. `$got` and `$expected_value` appear in the
/// comparison and again in the message, and both must implement `Debug`.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
409
/// A decoded value `T` kept together with the raw bytes it was decoded from.
///
/// The `Serialize` impl in this file writes out `bytes`, not `inner`, so changes made to
/// `inner` through `DerefMut` are not reflected in the serialized form.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    /// The raw bytes `inner` was decoded from.
    bytes: Bytes,
    /// The decoded value.
    inner: T,
}
425
426impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
427 type Target = T;
428
429 fn deref(&self) -> &Self::Target {
430 &self.inner
431 }
432}
433
434impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
435 fn deref_mut(&mut self) -> &mut Self::Target {
436 &mut self.inner
437 }
438}
439
440impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442 write!(
443 f,
444 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
445 self.inner, self.bytes
446 )
447 }
448}
449
450impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
451 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
455 where
456 S: serde::Serializer,
457 {
458 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
461 }
462}
463
464impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
465 for DeserializedValueWithBytes<T>
466{
467 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
471 where
472 D: serde::Deserializer<'de>,
473 {
474 let buf = String::deserialize(deserializer)?;
475 let bytes = Bytes::from(buf);
476
477 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
478 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480 Ok(value)
481 }
482}
483
484impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
485 fn clone(&self) -> Self {
486 Self {
487 bytes: self.bytes.clone(),
488 inner: self.inner.clone(),
489 }
490 }
491}
492
493impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
494 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
497 let inner = T::try_from_raw_value(&bytes)?;
498 Ok(Self { bytes, inner })
499 }
500
501 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
504 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
505 }
506
507 pub fn into_inner(self) -> T {
508 self.inner
509 }
510
511 pub fn get_inner_ref(&self) -> &T {
512 &self.inner
513 }
514
515 pub fn get_raw_bytes(&self) -> Vec<u8> {
517 self.bytes.to_vec()
518 }
519
520 #[cfg(any(test, feature = "testing"))]
521 pub fn from_inner(inner: T) -> Self {
522 let bytes = serde_json::to_vec(&inner).unwrap();
523
524 Self {
525 bytes: Bytes::from(bytes),
526 inner,
527 }
528 }
529}
530
531impl TableMetadataManager {
532 pub fn new(kv_backend: KvBackendRef) -> Self {
533 TableMetadataManager {
534 table_name_manager: TableNameManager::new(kv_backend.clone()),
535 table_info_manager: TableInfoManager::new(kv_backend.clone()),
536 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
537 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
538 catalog_manager: CatalogManager::new(kv_backend.clone()),
539 schema_manager: SchemaManager::new(kv_backend.clone()),
540 table_route_manager: TableRouteManager::new(kv_backend.clone()),
541 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
542 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
543 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
544 kv_backend,
545 }
546 }
547
548 pub fn new_with_custom_tombstone_prefix(
550 kv_backend: KvBackendRef,
551 tombstone_prefix: &str,
552 ) -> Self {
553 Self {
554 table_name_manager: TableNameManager::new(kv_backend.clone()),
555 table_info_manager: TableInfoManager::new(kv_backend.clone()),
556 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
557 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
558 catalog_manager: CatalogManager::new(kv_backend.clone()),
559 schema_manager: SchemaManager::new(kv_backend.clone()),
560 table_route_manager: TableRouteManager::new(kv_backend.clone()),
561 tombstone_manager: TombstoneManager::new_with_prefix(
562 kv_backend.clone(),
563 tombstone_prefix,
564 ),
565 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
566 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
567 kv_backend,
568 }
569 }
570
571 pub async fn init(&self) -> Result<()> {
572 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
573
574 self.catalog_manager().create(catalog_name, true).await?;
575
576 let internal_schemas = [
577 DEFAULT_SCHEMA_NAME,
578 INFORMATION_SCHEMA_NAME,
579 DEFAULT_PRIVATE_SCHEMA_NAME,
580 ];
581
582 for schema_name in internal_schemas {
583 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
584
585 self.schema_manager().create(schema_key, None, true).await?;
586 }
587
588 Ok(())
589 }
590
/// Returns the table name manager.
pub fn table_name_manager(&self) -> &TableNameManager {
    &self.table_name_manager
}

/// Returns the table info manager.
pub fn table_info_manager(&self) -> &TableInfoManager {
    &self.table_info_manager
}

/// Returns the view info manager.
pub fn view_info_manager(&self) -> &ViewInfoManager {
    &self.view_info_manager
}

/// Returns the datanode table manager.
pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
    &self.datanode_table_manager
}

/// Returns the catalog manager.
pub fn catalog_manager(&self) -> &CatalogManager {
    &self.catalog_manager
}

/// Returns the schema manager.
pub fn schema_manager(&self) -> &SchemaManager {
    &self.schema_manager
}

/// Returns the table route manager.
pub fn table_route_manager(&self) -> &TableRouteManager {
    &self.table_route_manager
}

/// Returns the topic name manager.
pub fn topic_name_manager(&self) -> &TopicNameManager {
    &self.topic_name_manager
}

/// Returns the topic region manager.
pub fn topic_region_manager(&self) -> &TopicRegionManager {
    &self.topic_region_manager
}

/// Returns the key-value backend shared by all sub-managers.
pub fn kv_backend(&self) -> &KvBackendRef {
    &self.kv_backend
}
630
631 pub async fn get_full_table_info(
632 &self,
633 table_id: TableId,
634 ) -> Result<(
635 Option<DeserializedValueWithBytes<TableInfoValue>>,
636 Option<DeserializedValueWithBytes<TableRouteValue>>,
637 )> {
638 let table_info_key = TableInfoKey::new(table_id);
639 let table_route_key = TableRouteKey::new(table_id);
640 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
641 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
642
643 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
644 let mut res = self.kv_backend.txn(txn).await?;
645 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
646 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
647 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
648 Ok((table_info_value, table_route_value))
649 }
650
651 pub async fn create_view_metadata(
661 &self,
662 view_info: RawTableInfo,
663 raw_logical_plan: Vec<u8>,
664 table_names: HashSet<TableName>,
665 columns: Vec<String>,
666 plan_columns: Vec<String>,
667 definition: String,
668 ) -> Result<()> {
669 let view_id = view_info.ident.table_id;
670
671 let view_name = TableNameKey::new(
673 &view_info.catalog_name,
674 &view_info.schema_name,
675 &view_info.name,
676 );
677 let create_table_name_txn = self
678 .table_name_manager()
679 .build_create_txn(&view_name, view_id)?;
680
681 let table_info_value = TableInfoValue::new(view_info);
683
684 let (create_table_info_txn, on_create_table_info_failure) = self
685 .table_info_manager()
686 .build_create_txn(view_id, &table_info_value)?;
687
688 let view_info_value = ViewInfoValue::new(
690 raw_logical_plan,
691 table_names,
692 columns,
693 plan_columns,
694 definition,
695 );
696 let (create_view_info_txn, on_create_view_info_failure) = self
697 .view_info_manager()
698 .build_create_txn(view_id, &view_info_value)?;
699
700 let txn = Txn::merge_all(vec![
701 create_table_name_txn,
702 create_table_info_txn,
703 create_view_info_txn,
704 ]);
705
706 let mut r = self.kv_backend.txn(txn).await?;
707
708 if !r.succeeded {
710 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
711 let remote_table_info = on_create_table_info_failure(&mut set)?
712 .context(error::UnexpectedSnafu {
713 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
714 })?
715 .into_inner();
716
717 let remote_view_info = on_create_view_info_failure(&mut set)?
718 .context(error::UnexpectedSnafu {
719 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
720 })?
721 .into_inner();
722
723 let op_name = "the creating view metadata";
724 ensure_values!(remote_table_info, table_info_value, op_name);
725 ensure_values!(remote_view_info, view_info_value, op_name);
726 }
727
728 Ok(())
729 }
730
731 pub async fn create_table_metadata(
734 &self,
735 mut table_info: RawTableInfo,
736 table_route_value: TableRouteValue,
737 region_wal_options: HashMap<RegionNumber, String>,
738 ) -> Result<()> {
739 let region_numbers = table_route_value.region_numbers();
740 table_info.meta.region_numbers = region_numbers;
741 let table_id = table_info.ident.table_id;
742 let engine = table_info.meta.engine.clone();
743
744 let table_name = TableNameKey::new(
746 &table_info.catalog_name,
747 &table_info.schema_name,
748 &table_info.name,
749 );
750 let create_table_name_txn = self
751 .table_name_manager()
752 .build_create_txn(&table_name, table_id)?;
753
754 let region_options = table_info.to_region_options();
755 let table_info_value = TableInfoValue::new(table_info);
757 let (create_table_info_txn, on_create_table_info_failure) = self
758 .table_info_manager()
759 .build_create_txn(table_id, &table_info_value)?;
760
761 let (create_table_route_txn, on_create_table_route_failure) = self
762 .table_route_manager()
763 .table_route_storage()
764 .build_create_txn(table_id, &table_route_value)?;
765
766 let create_topic_region_txn = self
767 .topic_region_manager
768 .build_create_txn(table_id, ®ion_wal_options)?;
769
770 let mut txn = Txn::merge_all(vec![
771 create_table_name_txn,
772 create_table_info_txn,
773 create_table_route_txn,
774 create_topic_region_txn,
775 ]);
776
777 if let TableRouteValue::Physical(x) = &table_route_value {
778 let region_storage_path = table_info_value.region_storage_path();
779 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
780 table_id,
781 &engine,
782 ®ion_storage_path,
783 region_options,
784 region_wal_options,
785 region_distribution(&x.region_routes),
786 )?;
787 txn = txn.merge(create_datanode_table_txn);
788 }
789
790 let mut r = self.kv_backend.txn(txn).await?;
791
792 if !r.succeeded {
794 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
795 let remote_table_info = on_create_table_info_failure(&mut set)?
796 .context(error::UnexpectedSnafu {
797 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
798 })?
799 .into_inner();
800
801 let remote_table_route = on_create_table_route_failure(&mut set)?
802 .context(error::UnexpectedSnafu {
803 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
804 })?
805 .into_inner();
806
807 let op_name = "the creating table metadata";
808 ensure_values!(remote_table_info, table_info_value, op_name);
809 ensure_values!(remote_table_route, table_route_value, op_name);
810 }
811
812 Ok(())
813 }
814
815 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
816 self.kv_backend.max_txn_ops() / 3
819 }
820
821 pub async fn create_logical_tables_metadata(
823 &self,
824 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
825 ) -> Result<()> {
826 let len = tables_data.len();
827 let mut txns = Vec::with_capacity(3 * len);
828 struct OnFailure<F1, R1, F2, R2>
829 where
830 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
831 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
832 {
833 table_info_value: TableInfoValue,
834 on_create_table_info_failure: F1,
835 table_route_value: TableRouteValue,
836 on_create_table_route_failure: F2,
837 }
838 let mut on_failures = Vec::with_capacity(len);
839 for (mut table_info, table_route_value) in tables_data {
840 table_info.meta.region_numbers = table_route_value.region_numbers();
841 let table_id = table_info.ident.table_id;
842
843 let table_name = TableNameKey::new(
845 &table_info.catalog_name,
846 &table_info.schema_name,
847 &table_info.name,
848 );
849 let create_table_name_txn = self
850 .table_name_manager()
851 .build_create_txn(&table_name, table_id)?;
852 txns.push(create_table_name_txn);
853
854 let table_info_value = TableInfoValue::new(table_info);
856 let (create_table_info_txn, on_create_table_info_failure) =
857 self.table_info_manager()
858 .build_create_txn(table_id, &table_info_value)?;
859 txns.push(create_table_info_txn);
860
861 let (create_table_route_txn, on_create_table_route_failure) = self
862 .table_route_manager()
863 .table_route_storage()
864 .build_create_txn(table_id, &table_route_value)?;
865 txns.push(create_table_route_txn);
866
867 on_failures.push(OnFailure {
868 table_info_value,
869 on_create_table_info_failure,
870 table_route_value,
871 on_create_table_route_failure,
872 });
873 }
874
875 let txn = Txn::merge_all(txns);
876 let mut r = self.kv_backend.txn(txn).await?;
877
878 if !r.succeeded {
880 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
881 for on_failure in on_failures {
882 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
883 .context(error::UnexpectedSnafu {
884 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
885 })?
886 .into_inner();
887
888 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
889 .context(error::UnexpectedSnafu {
890 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
891 })?
892 .into_inner();
893
894 let op_name = "the creating logical tables metadata";
895 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
896 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
897 }
898 }
899
900 Ok(())
901 }
902
903 fn table_metadata_keys(
904 &self,
905 table_id: TableId,
906 table_name: &TableName,
907 table_route_value: &TableRouteValue,
908 region_wal_options: &HashMap<RegionNumber, WalOptions>,
909 ) -> Result<Vec<Vec<u8>>> {
910 let datanode_ids = if table_route_value.is_physical() {
912 region_distribution(table_route_value.region_routes()?)
913 .into_keys()
914 .collect()
915 } else {
916 vec![]
917 };
918 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
919 let table_name = TableNameKey::new(
920 &table_name.catalog_name,
921 &table_name.schema_name,
922 &table_name.table_name,
923 );
924 let table_info_key = TableInfoKey::new(table_id);
925 let table_route_key = TableRouteKey::new(table_id);
926 let datanode_table_keys = datanode_ids
927 .into_iter()
928 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
929 .collect::<HashSet<_>>();
930 let topic_region_map = self
931 .topic_region_manager
932 .get_topic_region_mapping(table_id, region_wal_options);
933 let topic_region_keys = topic_region_map
934 .iter()
935 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
936 .collect::<Vec<_>>();
937 keys.push(table_name.to_bytes());
938 keys.push(table_info_key.to_bytes());
939 keys.push(table_route_key.to_bytes());
940 for key in &datanode_table_keys {
941 keys.push(key.to_bytes());
942 }
943 for key in topic_region_keys {
944 keys.push(key.to_bytes());
945 }
946 Ok(keys)
947 }
948
949 pub async fn delete_table_metadata(
952 &self,
953 table_id: TableId,
954 table_name: &TableName,
955 table_route_value: &TableRouteValue,
956 region_wal_options: &HashMap<RegionNumber, WalOptions>,
957 ) -> Result<()> {
958 let keys =
959 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
960 self.tombstone_manager.create(keys).await.map(|_| ())
961 }
962
963 pub async fn delete_table_metadata_tombstone(
966 &self,
967 table_id: TableId,
968 table_name: &TableName,
969 table_route_value: &TableRouteValue,
970 region_wal_options: &HashMap<RegionNumber, WalOptions>,
971 ) -> Result<()> {
972 let table_metadata_keys =
973 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
974 self.tombstone_manager
975 .delete(table_metadata_keys)
976 .await
977 .map(|_| ())
978 }
979
980 pub async fn restore_table_metadata(
983 &self,
984 table_id: TableId,
985 table_name: &TableName,
986 table_route_value: &TableRouteValue,
987 region_wal_options: &HashMap<RegionNumber, WalOptions>,
988 ) -> Result<()> {
989 let keys =
990 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
991 self.tombstone_manager.restore(keys).await.map(|_| ())
992 }
993
994 pub async fn destroy_table_metadata(
997 &self,
998 table_id: TableId,
999 table_name: &TableName,
1000 table_route_value: &TableRouteValue,
1001 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1002 ) -> Result<()> {
1003 let keys =
1004 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1005 let _ = self
1006 .kv_backend
1007 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1008 .await?;
1009 Ok(())
1010 }
1011
1012 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1013 let mut keys = Vec::with_capacity(3);
1014 let view_name = TableNameKey::new(
1015 &view_name.catalog_name,
1016 &view_name.schema_name,
1017 &view_name.table_name,
1018 );
1019 let table_info_key = TableInfoKey::new(view_id);
1020 let view_info_key = ViewInfoKey::new(view_id);
1021 keys.push(view_name.to_bytes());
1022 keys.push(table_info_key.to_bytes());
1023 keys.push(view_info_key.to_bytes());
1024
1025 Ok(keys)
1026 }
1027
1028 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1031 let keys = self.view_info_keys(view_id, view_name)?;
1032 let _ = self
1033 .kv_backend
1034 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1035 .await?;
1036 Ok(())
1037 }
1038
1039 pub async fn rename_table(
1043 &self,
1044 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1045 new_table_name: String,
1046 ) -> Result<()> {
1047 let current_table_info = ¤t_table_info_value.table_info;
1048 let table_id = current_table_info.ident.table_id;
1049
1050 let table_name_key = TableNameKey::new(
1051 ¤t_table_info.catalog_name,
1052 ¤t_table_info.schema_name,
1053 ¤t_table_info.name,
1054 );
1055
1056 let new_table_name_key = TableNameKey::new(
1057 ¤t_table_info.catalog_name,
1058 ¤t_table_info.schema_name,
1059 &new_table_name,
1060 );
1061
1062 let update_table_name_txn = self.table_name_manager().build_update_txn(
1064 &table_name_key,
1065 &new_table_name_key,
1066 table_id,
1067 )?;
1068
1069 let new_table_info_value = current_table_info_value
1070 .inner
1071 .with_update(move |table_info| {
1072 table_info.name = new_table_name;
1073 });
1074
1075 let (update_table_info_txn, on_update_table_info_failure) = self
1077 .table_info_manager()
1078 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1079
1080 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1081
1082 let mut r = self.kv_backend.txn(txn).await?;
1083
1084 if !r.succeeded {
1086 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1087 let remote_table_info = on_update_table_info_failure(&mut set)?
1088 .context(error::UnexpectedSnafu {
1089 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1090 })?
1091 .into_inner();
1092
1093 let op_name = "the renaming table metadata";
1094 ensure_values!(remote_table_info, new_table_info_value, op_name);
1095 }
1096
1097 Ok(())
1098 }
1099
1100 pub async fn update_table_info(
1104 &self,
1105 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1106 region_distribution: Option<RegionDistribution>,
1107 new_table_info: RawTableInfo,
1108 ) -> Result<()> {
1109 let table_id = current_table_info_value.table_info.ident.table_id;
1110 let new_table_info_value = current_table_info_value.update(new_table_info);
1111
1112 let (update_table_info_txn, on_update_table_info_failure) = self
1114 .table_info_manager()
1115 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1116
1117 let txn = if let Some(region_distribution) = region_distribution {
1118 let new_region_options = new_table_info_value.table_info.to_region_options();
1120 let update_datanode_table_options_txn = self
1121 .datanode_table_manager
1122 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1123 .await?;
1124 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1125 } else {
1126 update_table_info_txn
1127 };
1128
1129 let mut r = self.kv_backend.txn(txn).await?;
1130 if !r.succeeded {
1132 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1133 let remote_table_info = on_update_table_info_failure(&mut set)?
1134 .context(error::UnexpectedSnafu {
1135 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1136 })?
1137 .into_inner();
1138
1139 let op_name = "the updating table info";
1140 ensure_values!(remote_table_info, new_table_info_value, op_name);
1141 }
1142 Ok(())
1143 }
1144
1145 #[allow(clippy::too_many_arguments)]
1156 pub async fn update_view_info(
1157 &self,
1158 view_id: TableId,
1159 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1160 new_view_info: Vec<u8>,
1161 table_names: HashSet<TableName>,
1162 columns: Vec<String>,
1163 plan_columns: Vec<String>,
1164 definition: String,
1165 ) -> Result<()> {
1166 let new_view_info_value = current_view_info_value.update(
1167 new_view_info,
1168 table_names,
1169 columns,
1170 plan_columns,
1171 definition,
1172 );
1173
1174 let (update_view_info_txn, on_update_view_info_failure) = self
1176 .view_info_manager()
1177 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1178
1179 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1180
1181 if !r.succeeded {
1183 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1184 let remote_view_info = on_update_view_info_failure(&mut set)?
1185 .context(error::UnexpectedSnafu {
1186 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1187 })?
1188 .into_inner();
1189
1190 let op_name = "the updating view info";
1191 ensure_values!(remote_view_info, new_view_info_value, op_name);
1192 }
1193 Ok(())
1194 }
1195
1196 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1197 self.kv_backend.max_txn_ops()
1198 }
1199
1200 pub async fn batch_update_table_info_values(
1201 &self,
1202 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1203 ) -> Result<()> {
1204 let len = table_info_value_pairs.len();
1205 let mut txns = Vec::with_capacity(len);
1206 struct OnFailure<F, R>
1207 where
1208 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1209 {
1210 table_info_value: TableInfoValue,
1211 on_update_table_info_failure: F,
1212 }
1213 let mut on_failures = Vec::with_capacity(len);
1214
1215 for (table_info_value, new_table_info) in table_info_value_pairs {
1216 let table_id = table_info_value.table_info.ident.table_id;
1217
1218 let new_table_info_value = table_info_value.update(new_table_info);
1219
1220 let (update_table_info_txn, on_update_table_info_failure) =
1221 self.table_info_manager().build_update_txn(
1222 table_id,
1223 &table_info_value,
1224 &new_table_info_value,
1225 )?;
1226
1227 txns.push(update_table_info_txn);
1228
1229 on_failures.push(OnFailure {
1230 table_info_value: new_table_info_value,
1231 on_update_table_info_failure,
1232 });
1233 }
1234
1235 let txn = Txn::merge_all(txns);
1236 let mut r = self.kv_backend.txn(txn).await?;
1237
1238 if !r.succeeded {
1239 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1240 for on_failure in on_failures {
1241 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1242 .context(error::UnexpectedSnafu {
1243 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1244 })?
1245 .into_inner();
1246
1247 let op_name = "the batch updating table info";
1248 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1249 }
1250 }
1251
1252 Ok(())
1253 }
1254
1255 pub async fn update_table_route(
1256 &self,
1257 table_id: TableId,
1258 region_info: RegionInfo,
1259 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1260 new_region_routes: Vec<RegionRoute>,
1261 new_region_options: &HashMap<String, String>,
1262 new_region_wal_options: &HashMap<RegionNumber, String>,
1263 ) -> Result<()> {
1264 let current_region_distribution =
1266 region_distribution(current_table_route_value.region_routes()?);
1267 let new_region_distribution = region_distribution(&new_region_routes);
1268
1269 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1270 table_id,
1271 region_info,
1272 current_region_distribution,
1273 new_region_distribution,
1274 new_region_options,
1275 new_region_wal_options,
1276 )?;
1277
1278 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1280
1281 let (update_table_route_txn, on_update_table_route_failure) = self
1282 .table_route_manager()
1283 .table_route_storage()
1284 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1285
1286 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1287
1288 let mut r = self.kv_backend.txn(txn).await?;
1289
1290 if !r.succeeded {
1292 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1293 let remote_table_route = on_update_table_route_failure(&mut set)?
1294 .context(error::UnexpectedSnafu {
1295 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1296 })?
1297 .into_inner();
1298
1299 let op_name = "the updating table route";
1300 ensure_values!(remote_table_route, new_table_route_value, op_name);
1301 }
1302
1303 Ok(())
1304 }
1305
1306 pub async fn update_leader_region_status<F>(
1308 &self,
1309 table_id: TableId,
1310 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1311 next_region_route_status: F,
1312 ) -> Result<()>
1313 where
1314 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1315 {
1316 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1317
1318 let mut updated = 0;
1319 for route in &mut new_region_routes {
1320 if let Some(state) = next_region_route_status(route)
1321 && route.set_leader_state(state)
1322 {
1323 updated += 1;
1324 }
1325 }
1326
1327 if updated == 0 {
1328 warn!("No leader status updated");
1329 return Ok(());
1330 }
1331
1332 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1334
1335 let (update_table_route_txn, on_update_table_route_failure) = self
1336 .table_route_manager()
1337 .table_route_storage()
1338 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1339
1340 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1341
1342 if !r.succeeded {
1344 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1345 let remote_table_route = on_update_table_route_failure(&mut set)?
1346 .context(error::UnexpectedSnafu {
1347 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1348 })?
1349 .into_inner();
1350
1351 let op_name = "the updating leader region status";
1352 ensure_values!(remote_table_route, new_table_route_value, op_name);
1353 }
1354
1355 Ok(())
1356 }
1357}
1358
/// Implements `MetadataValue` for each listed type using JSON (de)serialization.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu`, `serde_json` and the
/// `context` method by their bare names, so these must be in scope at the call site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($value_type:ty),*) => {
        $(
            impl $crate::key::MetadataValue for $value_type {
                /// Decodes the value from its JSON bytes.
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                /// Encodes the value as JSON bytes.
                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1375
/// Implements `MetadataKeyGetTxnOp` for each listed key type.
///
/// `build_get_op` serializes the key with `to_bytes()` and returns a `TxnOp::Get`
/// for those bytes together with the closure produced by
/// `TxnOpGetResponseSet::filter` for the same bytes.
macro_rules! impl_metadata_key_get_txn_op {
    ($($key_type:ty),*) => {
        $(
            impl $crate::key::MetadataKeyGetTxnOp for $key_type {
                fn build_get_op(
                    &self,
                ) -> (
                    TxnOp,
                    impl for<'a> FnMut(
                        &'a mut TxnOpGetResponseSet,
                    ) -> Option<Vec<u8>>,
                ) {
                    let key_bytes = self.to_bytes();
                    let get_op = TxnOp::Get(key_bytes.clone());
                    let read_response = TxnOpGetResponseSet::filter(key_bytes);
                    (get_op, read_response)
                }
            }
        )*
    };
}
1400
1401impl_metadata_key_get_txn_op! {
1402 TableNameKey<'_>,
1403 TableInfoKey,
1404 ViewInfoKey,
1405 TableRouteKey,
1406 DatanodeTableKey
1407}
1408
/// Adds inherent JSON helpers to each listed type.
///
/// Unlike the `MetadataValue` implementation, `try_from_raw_value` here deserializes
/// into `Option<Self>`. The expansion refers to `Result`, `SerdeJsonSnafu`,
/// `serde_json` and the `context` method by their bare names, so these must be in
/// scope at the call site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($value_type:ty),*) => {
        $(
            impl $value_type {
                /// Decodes an optional value from its JSON bytes.
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                /// Encodes the value as JSON bytes.
                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1425
1426impl_metadata_value! {
1427 TableNameValue,
1428 TableInfoValue,
1429 ViewInfoValue,
1430 DatanodeTableValue,
1431 FlowInfoValue,
1432 FlowNameValue,
1433 FlowRouteValue,
1434 TableFlowValue,
1435 NodeAddressValue,
1436 SchemaNameValue,
1437 FlowStateValue,
1438 PoisonValue,
1439 TopicRegionValue
1440}
1441
1442impl_optional_metadata_value! {
1443 CatalogNameValue,
1444 SchemaNameValue
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449 use std::collections::{BTreeMap, HashMap, HashSet};
1450 use std::sync::Arc;
1451
1452 use bytes::Bytes;
1453 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1454 use common_time::util::current_time_millis;
1455 use common_wal::options::{KafkaWalOptions, WalOptions};
1456 use futures::TryStreamExt;
1457 use store_api::storage::{RegionId, RegionNumber};
1458 use table::metadata::{RawTableInfo, TableInfo};
1459 use table::table_name::TableName;
1460
1461 use super::datanode_table::DatanodeTableKey;
1462 use super::test_utils;
1463 use crate::ddl::test_util::create_table::test_create_table_task;
1464 use crate::ddl::utils::region_storage_path;
1465 use crate::error::Result;
1466 use crate::key::datanode_table::RegionInfo;
1467 use crate::key::table_info::TableInfoValue;
1468 use crate::key::table_name::TableNameKey;
1469 use crate::key::table_route::TableRouteValue;
1470 use crate::key::{
1471 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
1472 TableMetadataManager, ViewInfoValue,
1473 };
1474 use crate::kv_backend::KvBackend;
1475 use crate::kv_backend::memory::MemoryKvBackend;
1476 use crate::peer::Peer;
1477 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
1478 use crate::rpc::store::RangeRequest;
1479 use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
1480
/// Round-trips a `DeserializedValueWithBytes` whose `inner` and `bytes` disagree and
/// expects the decoded value to follow `bytes`.
#[test]
fn test_deserialized_value_with_bytes() {
    let route = new_test_region_route();

    // The raw bytes encode a value holding two routes.
    let value_in_bytes = TableRouteValue::physical(vec![route.clone(), route.clone()]);
    let raw = serde_json::to_vec(&value_in_bytes).unwrap();

    // `inner` deliberately holds a different value (a single route).
    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![route]),
        bytes: Bytes::from(raw.clone()),
    };

    let serialized = serde_json::to_vec(&value).unwrap();
    let round_tripped: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&serialized).unwrap();

    // Both fields of the decoded value are expected to reflect the raw bytes.
    assert_eq!(round_tripped.inner, value_in_bytes);
    assert_eq!(round_tripped.bytes, raw);
}
1508
1509 fn new_test_region_route() -> RegionRoute {
1510 new_region_route(1, 2)
1511 }
1512
1513 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1514 RegionRoute {
1515 region: Region {
1516 id: region_id.into(),
1517 name: "r1".to_string(),
1518 partition: None,
1519 attrs: BTreeMap::new(),
1520 partition_expr: Default::default(),
1521 },
1522 leader_peer: Some(Peer::new(datanode, "a2")),
1523 follower_peers: vec![],
1524 leader_state: None,
1525 leader_down_since: None,
1526 }
1527 }
1528
1529 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1530 test_utils::new_test_table_info(10, region_numbers)
1531 }
1532
1533 fn new_test_table_names() -> HashSet<TableName> {
1534 let mut set = HashSet::new();
1535 set.insert(TableName {
1536 catalog_name: "greptime".to_string(),
1537 schema_name: "public".to_string(),
1538 table_name: "a_table".to_string(),
1539 });
1540 set.insert(TableName {
1541 catalog_name: "greptime".to_string(),
1542 schema_name: "public".to_string(),
1543 table_name: "b_table".to_string(),
1544 });
1545 set
1546 }
1547
1548 async fn create_physical_table_metadata(
1549 table_metadata_manager: &TableMetadataManager,
1550 table_info: RawTableInfo,
1551 region_routes: Vec<RegionRoute>,
1552 region_wal_options: HashMap<RegionNumber, String>,
1553 ) -> Result<()> {
1554 table_metadata_manager
1555 .create_table_metadata(
1556 table_info,
1557 TableRouteValue::physical(region_routes),
1558 region_wal_options,
1559 )
1560 .await
1561 }
1562
1563 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1564 let topics = (0..2)
1565 .map(|i| format!("greptimedb_topic{}", i))
1566 .collect::<Vec<_>>();
1567 let wal_options = topics
1568 .iter()
1569 .map(|topic| {
1570 WalOptions::Kafka(KafkaWalOptions {
1571 topic: topic.clone(),
1572 })
1573 })
1574 .collect::<Vec<_>>();
1575
1576 (0..16)
1577 .enumerate()
1578 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1579 .collect()
1580 }
1581
1582 #[tokio::test]
1583 async fn test_raft_engine_topic_region_map() {
1584 let mem_kv = Arc::new(MemoryKvBackend::default());
1585 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1586 let region_route = new_test_region_route();
1587 let region_routes = &vec![region_route.clone()];
1588 let table_info: RawTableInfo =
1589 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1590 let wal_allocator = WalOptionsAllocator::RaftEngine;
1591 let regions = (0..16).collect();
1592 let region_wal_options =
1593 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1594 create_physical_table_metadata(
1595 &table_metadata_manager,
1596 table_info.clone(),
1597 region_routes.clone(),
1598 region_wal_options.clone(),
1599 )
1600 .await
1601 .unwrap();
1602
1603 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1604 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1605 let resp = mem_kv.range(range_req).await.unwrap();
1606 assert!(resp.kvs.is_empty());
1608 }
1609
1610 #[tokio::test]
1611 async fn test_create_table_metadata() {
1612 let mem_kv = Arc::new(MemoryKvBackend::default());
1613 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1614 let region_route = new_test_region_route();
1615 let region_routes = &vec![region_route.clone()];
1616 let table_info: RawTableInfo =
1617 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1618 let region_wal_options = create_mock_region_wal_options()
1619 .into_iter()
1620 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1621 .collect::<HashMap<_, _>>();
1622
1623 create_physical_table_metadata(
1625 &table_metadata_manager,
1626 table_info.clone(),
1627 region_routes.clone(),
1628 region_wal_options.clone(),
1629 )
1630 .await
1631 .unwrap();
1632
1633 assert!(
1635 create_physical_table_metadata(
1636 &table_metadata_manager,
1637 table_info.clone(),
1638 region_routes.clone(),
1639 region_wal_options.clone(),
1640 )
1641 .await
1642 .is_ok()
1643 );
1644
1645 let mut modified_region_routes = region_routes.clone();
1646 modified_region_routes.push(region_route.clone());
1647 assert!(
1649 create_physical_table_metadata(
1650 &table_metadata_manager,
1651 table_info.clone(),
1652 modified_region_routes,
1653 region_wal_options.clone(),
1654 )
1655 .await
1656 .is_err()
1657 );
1658
1659 let (remote_table_info, remote_table_route) = table_metadata_manager
1660 .get_full_table_info(10)
1661 .await
1662 .unwrap();
1663
1664 assert_eq!(
1665 remote_table_info.unwrap().into_inner().table_info,
1666 table_info
1667 );
1668 assert_eq!(
1669 remote_table_route
1670 .unwrap()
1671 .into_inner()
1672 .region_routes()
1673 .unwrap(),
1674 region_routes
1675 );
1676
1677 for i in 0..2 {
1678 let region_number = i as u32;
1679 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1680 let topic = format!("greptimedb_topic{}", i);
1681 let regions = table_metadata_manager
1682 .topic_region_manager
1683 .regions(&topic)
1684 .await
1685 .unwrap()
1686 .into_keys()
1687 .collect::<Vec<_>>();
1688 assert_eq!(regions.len(), 8);
1689 assert!(regions.contains(®ion_id));
1690 }
1691 }
1692
1693 #[tokio::test]
1694 async fn test_create_logic_tables_metadata() {
1695 let mem_kv = Arc::new(MemoryKvBackend::default());
1696 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1697 let region_route = new_test_region_route();
1698 let region_routes = vec![region_route.clone()];
1699 let table_info: RawTableInfo =
1700 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1701 let table_id = table_info.ident.table_id;
1702 let table_route_value = TableRouteValue::physical(region_routes.clone());
1703
1704 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1705 table_metadata_manager
1707 .create_logical_tables_metadata(tables_data.clone())
1708 .await
1709 .unwrap();
1710
1711 assert!(
1713 table_metadata_manager
1714 .create_logical_tables_metadata(tables_data)
1715 .await
1716 .is_ok()
1717 );
1718
1719 let mut modified_region_routes = region_routes.clone();
1720 modified_region_routes.push(new_region_route(2, 3));
1721 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1722 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1723 assert!(
1725 table_metadata_manager
1726 .create_logical_tables_metadata(modified_tables_data)
1727 .await
1728 .is_err()
1729 );
1730
1731 let (remote_table_info, remote_table_route) = table_metadata_manager
1732 .get_full_table_info(table_id)
1733 .await
1734 .unwrap();
1735
1736 assert_eq!(
1737 remote_table_info.unwrap().into_inner().table_info,
1738 table_info
1739 );
1740 assert_eq!(
1741 remote_table_route
1742 .unwrap()
1743 .into_inner()
1744 .region_routes()
1745 .unwrap(),
1746 ®ion_routes
1747 );
1748 }
1749
1750 #[tokio::test]
1751 async fn test_create_many_logical_tables_metadata() {
1752 let kv_backend = Arc::new(MemoryKvBackend::default());
1753 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1754
1755 let mut tables_data = vec![];
1756 for i in 0..128 {
1757 let table_id = i + 1;
1758 let regin_number = table_id * 3;
1759 let region_id = RegionId::new(table_id, regin_number);
1760 let region_route = new_region_route(region_id.as_u64(), 2);
1761 let region_routes = vec![region_route.clone()];
1762 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1763 table_id,
1764 &format!("my_table_{}", table_id),
1765 region_routes.iter().map(|r| r.region.id.region_number()),
1766 )
1767 .into();
1768 let table_route_value = TableRouteValue::physical(region_routes.clone());
1769
1770 tables_data.push((table_info, table_route_value));
1771 }
1772
1773 table_metadata_manager
1775 .create_logical_tables_metadata(tables_data)
1776 .await
1777 .unwrap();
1778 }
1779
1780 #[tokio::test]
1781 async fn test_delete_table_metadata() {
1782 let mem_kv = Arc::new(MemoryKvBackend::default());
1783 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1784 let region_route = new_test_region_route();
1785 let region_routes = &vec![region_route.clone()];
1786 let table_info: RawTableInfo =
1787 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1788 let table_id = table_info.ident.table_id;
1789 let datanode_id = 2;
1790 let region_wal_options = create_mock_region_wal_options();
1791 let serialized_region_wal_options = region_wal_options
1792 .iter()
1793 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1794 .collect::<HashMap<_, _>>();
1795
1796 create_physical_table_metadata(
1798 &table_metadata_manager,
1799 table_info.clone(),
1800 region_routes.clone(),
1801 serialized_region_wal_options,
1802 )
1803 .await
1804 .unwrap();
1805
1806 let table_name = TableName::new(
1807 table_info.catalog_name,
1808 table_info.schema_name,
1809 table_info.name,
1810 );
1811 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1812 table_metadata_manager
1814 .delete_table_metadata(
1815 table_id,
1816 &table_name,
1817 table_route_value,
1818 ®ion_wal_options,
1819 )
1820 .await
1821 .unwrap();
1822 table_metadata_manager
1824 .delete_table_metadata(
1825 table_id,
1826 &table_name,
1827 table_route_value,
1828 ®ion_wal_options,
1829 )
1830 .await
1831 .unwrap();
1832 assert!(
1833 table_metadata_manager
1834 .table_info_manager()
1835 .get(table_id)
1836 .await
1837 .unwrap()
1838 .is_none()
1839 );
1840 assert!(
1841 table_metadata_manager
1842 .table_route_manager()
1843 .table_route_storage()
1844 .get(table_id)
1845 .await
1846 .unwrap()
1847 .is_none()
1848 );
1849 assert!(
1850 table_metadata_manager
1851 .datanode_table_manager()
1852 .tables(datanode_id)
1853 .try_collect::<Vec<_>>()
1854 .await
1855 .unwrap()
1856 .is_empty()
1857 );
1858 let table_info = table_metadata_manager
1860 .table_info_manager()
1861 .get(table_id)
1862 .await
1863 .unwrap();
1864 assert!(table_info.is_none());
1865 let table_route = table_metadata_manager
1866 .table_route_manager()
1867 .table_route_storage()
1868 .get(table_id)
1869 .await
1870 .unwrap();
1871 assert!(table_route.is_none());
1872 let regions = table_metadata_manager
1874 .topic_region_manager
1875 .regions("greptimedb_topic0")
1876 .await
1877 .unwrap();
1878 assert_eq!(regions.len(), 0);
1879 let regions = table_metadata_manager
1880 .topic_region_manager
1881 .regions("greptimedb_topic1")
1882 .await
1883 .unwrap();
1884 assert_eq!(regions.len(), 0);
1885 }
1886
1887 #[tokio::test]
1888 async fn test_rename_table() {
1889 let mem_kv = Arc::new(MemoryKvBackend::default());
1890 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1891 let region_route = new_test_region_route();
1892 let region_routes = vec![region_route.clone()];
1893 let table_info: RawTableInfo =
1894 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1895 let table_id = table_info.ident.table_id;
1896 create_physical_table_metadata(
1898 &table_metadata_manager,
1899 table_info.clone(),
1900 region_routes.clone(),
1901 HashMap::new(),
1902 )
1903 .await
1904 .unwrap();
1905
1906 let new_table_name = "another_name".to_string();
1907 let table_info_value =
1908 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1909
1910 table_metadata_manager
1911 .rename_table(&table_info_value, new_table_name.clone())
1912 .await
1913 .unwrap();
1914 table_metadata_manager
1916 .rename_table(&table_info_value, new_table_name.clone())
1917 .await
1918 .unwrap();
1919 let mut modified_table_info = table_info.clone();
1920 modified_table_info.name = "hi".to_string();
1921 let modified_table_info_value =
1922 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1923 assert!(
1926 table_metadata_manager
1927 .rename_table(&modified_table_info_value, new_table_name.clone())
1928 .await
1929 .is_err()
1930 );
1931
1932 let old_table_name = TableNameKey::new(
1933 &table_info.catalog_name,
1934 &table_info.schema_name,
1935 &table_info.name,
1936 );
1937 let new_table_name = TableNameKey::new(
1938 &table_info.catalog_name,
1939 &table_info.schema_name,
1940 &new_table_name,
1941 );
1942
1943 assert!(
1944 table_metadata_manager
1945 .table_name_manager()
1946 .get(old_table_name)
1947 .await
1948 .unwrap()
1949 .is_none()
1950 );
1951
1952 assert_eq!(
1953 table_metadata_manager
1954 .table_name_manager()
1955 .get(new_table_name)
1956 .await
1957 .unwrap()
1958 .unwrap()
1959 .table_id(),
1960 table_id
1961 );
1962 }
1963
1964 #[tokio::test]
1965 async fn test_update_table_info() {
1966 let mem_kv = Arc::new(MemoryKvBackend::default());
1967 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1968 let region_route = new_test_region_route();
1969 let region_routes = vec![region_route.clone()];
1970 let table_info: RawTableInfo =
1971 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1972 let table_id = table_info.ident.table_id;
1973 create_physical_table_metadata(
1975 &table_metadata_manager,
1976 table_info.clone(),
1977 region_routes.clone(),
1978 HashMap::new(),
1979 )
1980 .await
1981 .unwrap();
1982
1983 let mut new_table_info = table_info.clone();
1984 new_table_info.name = "hi".to_string();
1985 let current_table_info_value =
1986 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1987 table_metadata_manager
1989 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1990 .await
1991 .unwrap();
1992 table_metadata_manager
1994 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1995 .await
1996 .unwrap();
1997
1998 let updated_table_info = table_metadata_manager
2000 .table_info_manager()
2001 .get(table_id)
2002 .await
2003 .unwrap()
2004 .unwrap()
2005 .into_inner();
2006 assert_eq!(updated_table_info.table_info, new_table_info);
2007
2008 let mut wrong_table_info = table_info.clone();
2009 wrong_table_info.name = "wrong".to_string();
2010 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
2011 current_table_info_value.update(wrong_table_info),
2012 );
2013 assert!(
2016 table_metadata_manager
2017 .update_table_info(&wrong_table_info_value, None, new_table_info)
2018 .await
2019 .is_err()
2020 )
2021 }
2022
2023 #[tokio::test]
2024 async fn test_update_table_leader_region_status() {
2025 let mem_kv = Arc::new(MemoryKvBackend::default());
2026 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2027 let datanode = 1;
2028 let region_routes = vec![
2029 RegionRoute {
2030 region: Region {
2031 id: 1.into(),
2032 name: "r1".to_string(),
2033 partition: None,
2034 attrs: BTreeMap::new(),
2035 partition_expr: Default::default(),
2036 },
2037 leader_peer: Some(Peer::new(datanode, "a2")),
2038 leader_state: Some(LeaderState::Downgrading),
2039 follower_peers: vec![],
2040 leader_down_since: Some(current_time_millis()),
2041 },
2042 RegionRoute {
2043 region: Region {
2044 id: 2.into(),
2045 name: "r2".to_string(),
2046 partition: None,
2047 attrs: BTreeMap::new(),
2048 partition_expr: Default::default(),
2049 },
2050 leader_peer: Some(Peer::new(datanode, "a1")),
2051 leader_state: None,
2052 follower_peers: vec![],
2053 leader_down_since: None,
2054 },
2055 ];
2056 let table_info: RawTableInfo =
2057 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2058 let table_id = table_info.ident.table_id;
2059 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2060 TableRouteValue::physical(region_routes.clone()),
2061 );
2062
2063 create_physical_table_metadata(
2065 &table_metadata_manager,
2066 table_info.clone(),
2067 region_routes.clone(),
2068 HashMap::new(),
2069 )
2070 .await
2071 .unwrap();
2072
2073 table_metadata_manager
2074 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2075 if region_route.leader_state.is_some() {
2076 None
2077 } else {
2078 Some(Some(LeaderState::Downgrading))
2079 }
2080 })
2081 .await
2082 .unwrap();
2083
2084 let updated_route_value = table_metadata_manager
2085 .table_route_manager()
2086 .table_route_storage()
2087 .get(table_id)
2088 .await
2089 .unwrap()
2090 .unwrap();
2091
2092 assert_eq!(
2093 updated_route_value.region_routes().unwrap()[0].leader_state,
2094 Some(LeaderState::Downgrading)
2095 );
2096
2097 assert!(
2098 updated_route_value.region_routes().unwrap()[0]
2099 .leader_down_since
2100 .is_some()
2101 );
2102
2103 assert_eq!(
2104 updated_route_value.region_routes().unwrap()[1].leader_state,
2105 Some(LeaderState::Downgrading)
2106 );
2107 assert!(
2108 updated_route_value.region_routes().unwrap()[1]
2109 .leader_down_since
2110 .is_some()
2111 );
2112 }
2113
2114 async fn assert_datanode_table(
2115 table_metadata_manager: &TableMetadataManager,
2116 table_id: u32,
2117 region_routes: &[RegionRoute],
2118 ) {
2119 let region_distribution = region_distribution(region_routes);
2120 for (datanode, regions) in region_distribution {
2121 let got = table_metadata_manager
2122 .datanode_table_manager()
2123 .get(&DatanodeTableKey::new(datanode, table_id))
2124 .await
2125 .unwrap()
2126 .unwrap();
2127
2128 assert_eq!(got.regions, regions.leader_regions);
2129 assert_eq!(got.follower_regions, regions.follower_regions);
2130 }
2131 }
2132
2133 #[tokio::test]
2134 async fn test_update_table_route() {
2135 let mem_kv = Arc::new(MemoryKvBackend::default());
2136 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2137 let region_route = new_test_region_route();
2138 let region_routes = vec![region_route.clone()];
2139 let table_info: RawTableInfo =
2140 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2141 let table_id = table_info.ident.table_id;
2142 let engine = table_info.meta.engine.as_str();
2143 let region_storage_path =
2144 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2145 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2146 TableRouteValue::physical(region_routes.clone()),
2147 );
2148
2149 create_physical_table_metadata(
2151 &table_metadata_manager,
2152 table_info.clone(),
2153 region_routes.clone(),
2154 HashMap::new(),
2155 )
2156 .await
2157 .unwrap();
2158
2159 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2160 let new_region_routes = vec![
2161 new_region_route(1, 1),
2162 new_region_route(2, 2),
2163 new_region_route(3, 3),
2164 ];
2165 table_metadata_manager
2167 .update_table_route(
2168 table_id,
2169 RegionInfo {
2170 engine: engine.to_string(),
2171 region_storage_path: region_storage_path.clone(),
2172 region_options: HashMap::new(),
2173 region_wal_options: HashMap::new(),
2174 },
2175 ¤t_table_route_value,
2176 new_region_routes.clone(),
2177 &HashMap::new(),
2178 &HashMap::new(),
2179 )
2180 .await
2181 .unwrap();
2182 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2183
2184 table_metadata_manager
2186 .update_table_route(
2187 table_id,
2188 RegionInfo {
2189 engine: engine.to_string(),
2190 region_storage_path: region_storage_path.clone(),
2191 region_options: HashMap::new(),
2192 region_wal_options: HashMap::new(),
2193 },
2194 ¤t_table_route_value,
2195 new_region_routes.clone(),
2196 &HashMap::new(),
2197 &HashMap::new(),
2198 )
2199 .await
2200 .unwrap();
2201
2202 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2203 current_table_route_value
2204 .inner
2205 .update(new_region_routes.clone())
2206 .unwrap(),
2207 );
2208 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2209 table_metadata_manager
2211 .update_table_route(
2212 table_id,
2213 RegionInfo {
2214 engine: engine.to_string(),
2215 region_storage_path: region_storage_path.clone(),
2216 region_options: HashMap::new(),
2217 region_wal_options: HashMap::new(),
2218 },
2219 ¤t_table_route_value,
2220 new_region_routes.clone(),
2221 &HashMap::new(),
2222 &HashMap::new(),
2223 )
2224 .await
2225 .unwrap();
2226 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2227
2228 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2231 current_table_route_value
2232 .update(vec![
2233 new_region_route(1, 1),
2234 new_region_route(2, 2),
2235 new_region_route(3, 3),
2236 new_region_route(4, 4),
2237 ])
2238 .unwrap(),
2239 );
2240 assert!(
2241 table_metadata_manager
2242 .update_table_route(
2243 table_id,
2244 RegionInfo {
2245 engine: engine.to_string(),
2246 region_storage_path: region_storage_path.clone(),
2247 region_options: HashMap::new(),
2248 region_wal_options: HashMap::new(),
2249 },
2250 &wrong_table_route_value,
2251 new_region_routes,
2252 &HashMap::new(),
2253 &HashMap::new(),
2254 )
2255 .await
2256 .is_err()
2257 );
2258 }
2259
2260 #[tokio::test]
2261 async fn test_destroy_table_metadata() {
2262 let mem_kv = Arc::new(MemoryKvBackend::default());
2263 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2264 let table_id = 1025;
2265 let table_name = "foo";
2266 let task = test_create_table_task(table_name, table_id);
2267 let options = create_mock_region_wal_options();
2268 let serialized_options = options
2269 .iter()
2270 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2271 .collect::<HashMap<_, _>>();
2272 table_metadata_manager
2273 .create_table_metadata(
2274 task.table_info,
2275 TableRouteValue::physical(vec![
2276 RegionRoute {
2277 region: Region::new_test(RegionId::new(table_id, 1)),
2278 leader_peer: Some(Peer::empty(1)),
2279 follower_peers: vec![Peer::empty(5)],
2280 leader_state: None,
2281 leader_down_since: None,
2282 },
2283 RegionRoute {
2284 region: Region::new_test(RegionId::new(table_id, 2)),
2285 leader_peer: Some(Peer::empty(2)),
2286 follower_peers: vec![Peer::empty(4)],
2287 leader_state: None,
2288 leader_down_since: None,
2289 },
2290 RegionRoute {
2291 region: Region::new_test(RegionId::new(table_id, 3)),
2292 leader_peer: Some(Peer::empty(3)),
2293 follower_peers: vec![],
2294 leader_state: None,
2295 leader_down_since: None,
2296 },
2297 ]),
2298 serialized_options,
2299 )
2300 .await
2301 .unwrap();
2302 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2303 let table_route_value = table_metadata_manager
2304 .table_route_manager
2305 .table_route_storage()
2306 .get_with_raw_bytes(table_id)
2307 .await
2308 .unwrap()
2309 .unwrap();
2310 table_metadata_manager
2311 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2312 .await
2313 .unwrap();
2314 assert!(mem_kv.is_empty());
2315 }
2316
2317 #[tokio::test]
2318 async fn test_restore_table_metadata() {
2319 let mem_kv = Arc::new(MemoryKvBackend::default());
2320 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2321 let table_id = 1025;
2322 let table_name = "foo";
2323 let task = test_create_table_task(table_name, table_id);
2324 let options = create_mock_region_wal_options();
2325 let serialized_options = options
2326 .iter()
2327 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2328 .collect::<HashMap<_, _>>();
2329 table_metadata_manager
2330 .create_table_metadata(
2331 task.table_info,
2332 TableRouteValue::physical(vec![
2333 RegionRoute {
2334 region: Region::new_test(RegionId::new(table_id, 1)),
2335 leader_peer: Some(Peer::empty(1)),
2336 follower_peers: vec![Peer::empty(5)],
2337 leader_state: None,
2338 leader_down_since: None,
2339 },
2340 RegionRoute {
2341 region: Region::new_test(RegionId::new(table_id, 2)),
2342 leader_peer: Some(Peer::empty(2)),
2343 follower_peers: vec![Peer::empty(4)],
2344 leader_state: None,
2345 leader_down_since: None,
2346 },
2347 RegionRoute {
2348 region: Region::new_test(RegionId::new(table_id, 3)),
2349 leader_peer: Some(Peer::empty(3)),
2350 follower_peers: vec![],
2351 leader_state: None,
2352 leader_down_since: None,
2353 },
2354 ]),
2355 serialized_options,
2356 )
2357 .await
2358 .unwrap();
2359 let expected_result = mem_kv.dump();
2360 let table_route_value = table_metadata_manager
2361 .table_route_manager
2362 .table_route_storage()
2363 .get_with_raw_bytes(table_id)
2364 .await
2365 .unwrap()
2366 .unwrap();
2367 let region_routes = table_route_value.region_routes().unwrap();
2368 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2369 let table_route_value = TableRouteValue::physical(region_routes.clone());
2370 table_metadata_manager
2371 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2372 .await
2373 .unwrap();
2374 table_metadata_manager
2375 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2376 .await
2377 .unwrap();
2378 let kvs = mem_kv.dump();
2379 assert_eq!(kvs, expected_result);
2380 table_metadata_manager
2382 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2383 .await
2384 .unwrap();
2385 let kvs = mem_kv.dump();
2386 assert_eq!(kvs, expected_result);
2387 }
2388
2389 #[tokio::test]
2390 async fn test_create_update_view_info() {
2391 let mem_kv = Arc::new(MemoryKvBackend::default());
2392 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2393
2394 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2395
2396 let view_id = view_info.ident.table_id;
2397
2398 let logical_plan: Vec<u8> = vec![1, 2, 3];
2399 let columns = vec!["a".to_string()];
2400 let plan_columns = vec!["number".to_string()];
2401 let table_names = new_test_table_names();
2402 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2403
2404 table_metadata_manager
2406 .create_view_metadata(
2407 view_info.clone(),
2408 logical_plan.clone(),
2409 table_names.clone(),
2410 columns.clone(),
2411 plan_columns.clone(),
2412 definition.to_string(),
2413 )
2414 .await
2415 .unwrap();
2416
2417 {
2418 let current_view_info = table_metadata_manager
2420 .view_info_manager()
2421 .get(view_id)
2422 .await
2423 .unwrap()
2424 .unwrap()
2425 .into_inner();
2426 assert_eq!(current_view_info.view_info, logical_plan);
2427 assert_eq!(current_view_info.table_names, table_names);
2428 assert_eq!(current_view_info.definition, definition);
2429 assert_eq!(current_view_info.columns, columns);
2430 assert_eq!(current_view_info.plan_columns, plan_columns);
2431 let current_table_info = table_metadata_manager
2433 .table_info_manager()
2434 .get(view_id)
2435 .await
2436 .unwrap()
2437 .unwrap()
2438 .into_inner();
2439 assert_eq!(current_table_info.table_info, view_info);
2440 }
2441
2442 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2443 let new_table_names = {
2444 let mut set = HashSet::new();
2445 set.insert(TableName {
2446 catalog_name: "greptime".to_string(),
2447 schema_name: "public".to_string(),
2448 table_name: "b_table".to_string(),
2449 });
2450 set.insert(TableName {
2451 catalog_name: "greptime".to_string(),
2452 schema_name: "public".to_string(),
2453 table_name: "c_table".to_string(),
2454 });
2455 set
2456 };
2457 let new_columns = vec!["b".to_string()];
2458 let new_plan_columns = vec!["number2".to_string()];
2459 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2460
2461 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2462 logical_plan.clone(),
2463 table_names,
2464 columns,
2465 plan_columns,
2466 definition.to_string(),
2467 ));
2468 table_metadata_manager
2470 .update_view_info(
2471 view_id,
2472 ¤t_view_info_value,
2473 new_logical_plan.clone(),
2474 new_table_names.clone(),
2475 new_columns.clone(),
2476 new_plan_columns.clone(),
2477 new_definition.to_string(),
2478 )
2479 .await
2480 .unwrap();
2481 table_metadata_manager
2483 .update_view_info(
2484 view_id,
2485 ¤t_view_info_value,
2486 new_logical_plan.clone(),
2487 new_table_names.clone(),
2488 new_columns.clone(),
2489 new_plan_columns.clone(),
2490 new_definition.to_string(),
2491 )
2492 .await
2493 .unwrap();
2494
2495 let updated_view_info = table_metadata_manager
2497 .view_info_manager()
2498 .get(view_id)
2499 .await
2500 .unwrap()
2501 .unwrap()
2502 .into_inner();
2503 assert_eq!(updated_view_info.view_info, new_logical_plan);
2504 assert_eq!(updated_view_info.table_names, new_table_names);
2505 assert_eq!(updated_view_info.definition, new_definition);
2506 assert_eq!(updated_view_info.columns, new_columns);
2507 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2508
2509 let wrong_view_info = logical_plan.clone();
2510 let wrong_definition = "wrong_definition";
2511 let wrong_view_info_value =
2512 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2513 wrong_view_info,
2514 new_table_names.clone(),
2515 new_columns.clone(),
2516 new_plan_columns.clone(),
2517 wrong_definition.to_string(),
2518 ));
2519 assert!(
2522 table_metadata_manager
2523 .update_view_info(
2524 view_id,
2525 &wrong_view_info_value,
2526 new_logical_plan.clone(),
2527 new_table_names.clone(),
2528 vec!["c".to_string()],
2529 vec!["number3".to_string()],
2530 wrong_definition.to_string(),
2531 )
2532 .await
2533 .is_err()
2534 );
2535
2536 let current_view_info = table_metadata_manager
2538 .view_info_manager()
2539 .get(view_id)
2540 .await
2541 .unwrap()
2542 .unwrap()
2543 .into_inner();
2544 assert_eq!(current_view_info.view_info, new_logical_plan);
2545 assert_eq!(current_view_info.table_names, new_table_names);
2546 assert_eq!(current_view_info.definition, new_definition);
2547 assert_eq!(current_view_info.columns, new_columns);
2548 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2549 }
2550
/// Checks that `RegionRoleSet` deserializes from both JSON shapes used here: an
/// object with explicit leader/follower lists and a bare array of regions.
#[test]
fn test_region_role_set_deserialize() {
    // Object form: both role lists are spelled out.
    let object_json = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
    let from_object: RegionRoleSet = serde_json::from_str(object_json).unwrap();
    assert_eq!(from_object.leader_regions, vec![1, 2, 3]);
    assert_eq!(from_object.follower_regions, vec![4, 5, 6]);

    // Bare array form: every listed region becomes a leader region and the
    // follower list stays empty.
    let array_json = r#"[1, 2, 3]"#;
    let from_array: RegionRoleSet = serde_json::from_str(array_json).unwrap();
    assert_eq!(from_array.leader_regions, vec![1, 2, 3]);
    assert!(from_array.follower_regions.is_empty());
}
2563
/// Checks that a `RegionDistribution` whose entries mix the bare array form and
/// the object form deserializes into the expected role sets.
#[test]
fn test_region_distribution_deserialize() {
    let json = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution: RegionDistribution = serde_json::from_str(json).unwrap();
    assert_eq!(distribution.len(), 2);

    // Entry `1` was given as a bare array: leaders only, no followers.
    let array_entry = &distribution[&1];
    assert_eq!(array_entry.leader_regions, vec![1, 2, 3]);
    assert!(array_entry.follower_regions.is_empty());

    // Entry `2` was given as an object with both lists.
    let object_entry = &distribution[&2];
    assert_eq!(object_entry.leader_regions, vec![7, 8, 9]);
    assert_eq!(object_entry.follower_regions, vec![10, 11, 12]);
}
2574}