1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
166pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
167pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
168pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
169pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
170
171pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
172pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
173pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
174pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
175pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
176pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
177pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
178pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
179pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
180pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
182pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
183
184pub const CACHE_KEY_PREFIXES: [&str; 5] = [
186 TABLE_NAME_KEY_PREFIX,
187 CATALOG_NAME_KEY_PREFIX,
188 SCHEMA_NAME_KEY_PREFIX,
189 TABLE_ROUTE_PREFIX,
190 NODE_ADDRESS_PREFIX,
191];
192
193#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
195pub struct RegionRoleSet {
196 pub leader_regions: Vec<RegionNumber>,
198 pub follower_regions: Vec<RegionNumber>,
200}
201
202impl<'de> Deserialize<'de> for RegionRoleSet {
203 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
204 where
205 D: serde::Deserializer<'de>,
206 {
207 #[derive(Deserialize)]
208 #[serde(untagged)]
209 enum RegionRoleSetOrLeaderOnly {
210 Full {
211 leader_regions: Vec<RegionNumber>,
212 follower_regions: Vec<RegionNumber>,
213 },
214 LeaderOnly(Vec<RegionNumber>),
215 }
216 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
217 RegionRoleSetOrLeaderOnly::Full {
218 leader_regions,
219 follower_regions,
220 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
221 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
222 Ok(RegionRoleSet::new(leader_regions, vec![]))
223 }
224 }
225 }
226}
227
228impl RegionRoleSet {
229 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
231 Self {
232 leader_regions,
233 follower_regions,
234 }
235 }
236
237 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
239 self.leader_regions.push(region_number);
240 }
241
242 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
244 self.follower_regions.push(region_number);
245 }
246
247 pub fn sort(&mut self) {
249 self.follower_regions.sort();
250 self.leader_regions.sort();
251 }
252}
253
254pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
258
259pub type FlowId = u32;
261pub type FlowPartitionId = u32;
263
264lazy_static! {
265 pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
266}
267
268lazy_static! {
269 static ref TABLE_INFO_KEY_PATTERN: Regex =
270 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
271}
272
273lazy_static! {
274 static ref VIEW_INFO_KEY_PATTERN: Regex =
275 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
276}
277
278lazy_static! {
279 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
280 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
281}
282
283lazy_static! {
284 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
285 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
286}
287
288lazy_static! {
289 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
290 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
291 ))
292 .unwrap();
293}
294
295lazy_static! {
296 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
298 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
299 ))
300 .unwrap();
301}
302
303lazy_static! {
304 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
306 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
307 ))
308 .unwrap();
309}
310
311lazy_static! {
312 static ref NODE_ADDRESS_PATTERN: Regex =
313 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
314}
315
316lazy_static! {
317 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
318 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
319}
320
321lazy_static! {
322 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
323 "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
324 ))
325 .unwrap();
326}
327
328pub trait MetadataKey<'a, T> {
330 fn to_bytes(&self) -> Vec<u8>;
331
332 fn from_bytes(bytes: &'a [u8]) -> Result<T>;
333}
334
335#[derive(Debug, Clone, PartialEq)]
336pub struct BytesAdapter(Vec<u8>);
337
338impl From<Vec<u8>> for BytesAdapter {
339 fn from(value: Vec<u8>) -> Self {
340 Self(value)
341 }
342}
343
344impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
345 fn to_bytes(&self) -> Vec<u8> {
346 self.0.clone()
347 }
348
349 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
350 Ok(BytesAdapter(bytes.to_vec()))
351 }
352}
353
354pub(crate) trait MetadataKeyGetTxnOp {
355 fn build_get_op(
356 &self,
357 ) -> (
358 TxnOp,
359 impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
360 );
361}
362
363pub trait MetadataValue {
364 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
365 where
366 Self: Sized;
367
368 fn try_as_raw_value(&self) -> Result<Vec<u8>>;
369}
370
371pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
372
373pub struct TableMetadataManager {
374 table_name_manager: TableNameManager,
375 table_info_manager: TableInfoManager,
376 view_info_manager: ViewInfoManager,
377 datanode_table_manager: DatanodeTableManager,
378 catalog_manager: CatalogManager,
379 schema_manager: SchemaManager,
380 table_route_manager: TableRouteManager,
381 tombstone_manager: TombstoneManager,
382 topic_name_manager: TopicNameManager,
383 topic_region_manager: TopicRegionManager,
384 kv_backend: KvBackendRef,
385}
386
387#[macro_export]
388macro_rules! ensure_values {
389 ($got:expr, $expected_value:expr, $name:expr) => {
390 ensure!(
391 $got == $expected_value,
392 error::UnexpectedSnafu {
393 err_msg: format!(
394 "Reads the different value: {:?} during {}, expected: {:?}",
395 $got, $name, $expected_value
396 )
397 }
398 );
399 };
400}
401
402pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
412 bytes: Bytes,
414 inner: T,
416}
417
418impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
419 type Target = T;
420
421 fn deref(&self) -> &Self::Target {
422 &self.inner
423 }
424}
425
426impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
427 fn deref_mut(&mut self) -> &mut Self::Target {
428 &mut self.inner
429 }
430}
431
432impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 write!(
435 f,
436 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
437 self.inner, self.bytes
438 )
439 }
440}
441
442impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
443 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
447 where
448 S: serde::Serializer,
449 {
450 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
453 }
454}
455
456impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
457 for DeserializedValueWithBytes<T>
458{
459 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
463 where
464 D: serde::Deserializer<'de>,
465 {
466 let buf = String::deserialize(deserializer)?;
467 let bytes = Bytes::from(buf);
468
469 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
470 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
471
472 Ok(value)
473 }
474}
475
476impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
477 fn clone(&self) -> Self {
478 Self {
479 bytes: self.bytes.clone(),
480 inner: self.inner.clone(),
481 }
482 }
483}
484
485impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
486 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
489 let inner = T::try_from_raw_value(&bytes)?;
490 Ok(Self { bytes, inner })
491 }
492
493 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
496 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
497 }
498
499 pub fn into_inner(self) -> T {
500 self.inner
501 }
502
503 pub fn get_inner_ref(&self) -> &T {
504 &self.inner
505 }
506
507 pub fn get_raw_bytes(&self) -> Vec<u8> {
509 self.bytes.to_vec()
510 }
511
512 #[cfg(any(test, feature = "testing"))]
513 pub fn from_inner(inner: T) -> Self {
514 let bytes = serde_json::to_vec(&inner).unwrap();
515
516 Self {
517 bytes: Bytes::from(bytes),
518 inner,
519 }
520 }
521}
522
523impl TableMetadataManager {
524 pub fn new(kv_backend: KvBackendRef) -> Self {
525 TableMetadataManager {
526 table_name_manager: TableNameManager::new(kv_backend.clone()),
527 table_info_manager: TableInfoManager::new(kv_backend.clone()),
528 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
529 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
530 catalog_manager: CatalogManager::new(kv_backend.clone()),
531 schema_manager: SchemaManager::new(kv_backend.clone()),
532 table_route_manager: TableRouteManager::new(kv_backend.clone()),
533 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
534 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
535 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
536 kv_backend,
537 }
538 }
539
540 pub fn new_with_custom_tombstone_prefix(
542 kv_backend: KvBackendRef,
543 tombstone_prefix: &str,
544 ) -> Self {
545 Self {
546 table_name_manager: TableNameManager::new(kv_backend.clone()),
547 table_info_manager: TableInfoManager::new(kv_backend.clone()),
548 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
549 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
550 catalog_manager: CatalogManager::new(kv_backend.clone()),
551 schema_manager: SchemaManager::new(kv_backend.clone()),
552 table_route_manager: TableRouteManager::new(kv_backend.clone()),
553 tombstone_manager: TombstoneManager::new_with_prefix(
554 kv_backend.clone(),
555 tombstone_prefix,
556 ),
557 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
558 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
559 kv_backend,
560 }
561 }
562
563 pub async fn init(&self) -> Result<()> {
564 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
565
566 self.catalog_manager().create(catalog_name, true).await?;
567
568 let internal_schemas = [
569 DEFAULT_SCHEMA_NAME,
570 INFORMATION_SCHEMA_NAME,
571 DEFAULT_PRIVATE_SCHEMA_NAME,
572 ];
573
574 for schema_name in internal_schemas {
575 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
576
577 self.schema_manager().create(schema_key, None, true).await?;
578 }
579
580 Ok(())
581 }
582
583 pub fn table_name_manager(&self) -> &TableNameManager {
584 &self.table_name_manager
585 }
586
587 pub fn table_info_manager(&self) -> &TableInfoManager {
588 &self.table_info_manager
589 }
590
591 pub fn view_info_manager(&self) -> &ViewInfoManager {
592 &self.view_info_manager
593 }
594
595 pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
596 &self.datanode_table_manager
597 }
598
599 pub fn catalog_manager(&self) -> &CatalogManager {
600 &self.catalog_manager
601 }
602
603 pub fn schema_manager(&self) -> &SchemaManager {
604 &self.schema_manager
605 }
606
607 pub fn table_route_manager(&self) -> &TableRouteManager {
608 &self.table_route_manager
609 }
610
611 pub fn topic_name_manager(&self) -> &TopicNameManager {
612 &self.topic_name_manager
613 }
614
615 pub fn topic_region_manager(&self) -> &TopicRegionManager {
616 &self.topic_region_manager
617 }
618
619 #[cfg(feature = "testing")]
620 pub fn kv_backend(&self) -> &KvBackendRef {
621 &self.kv_backend
622 }
623
624 pub async fn get_full_table_info(
625 &self,
626 table_id: TableId,
627 ) -> Result<(
628 Option<DeserializedValueWithBytes<TableInfoValue>>,
629 Option<DeserializedValueWithBytes<TableRouteValue>>,
630 )> {
631 let table_info_key = TableInfoKey::new(table_id);
632 let table_route_key = TableRouteKey::new(table_id);
633 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
634 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
635
636 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
637 let mut res = self.kv_backend.txn(txn).await?;
638 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
639 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
640 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
641 Ok((table_info_value, table_route_value))
642 }
643
644 pub async fn create_view_metadata(
654 &self,
655 view_info: RawTableInfo,
656 raw_logical_plan: Vec<u8>,
657 table_names: HashSet<TableName>,
658 columns: Vec<String>,
659 plan_columns: Vec<String>,
660 definition: String,
661 ) -> Result<()> {
662 let view_id = view_info.ident.table_id;
663
664 let view_name = TableNameKey::new(
666 &view_info.catalog_name,
667 &view_info.schema_name,
668 &view_info.name,
669 );
670 let create_table_name_txn = self
671 .table_name_manager()
672 .build_create_txn(&view_name, view_id)?;
673
674 let table_info_value = TableInfoValue::new(view_info);
676
677 let (create_table_info_txn, on_create_table_info_failure) = self
678 .table_info_manager()
679 .build_create_txn(view_id, &table_info_value)?;
680
681 let view_info_value = ViewInfoValue::new(
683 raw_logical_plan,
684 table_names,
685 columns,
686 plan_columns,
687 definition,
688 );
689 let (create_view_info_txn, on_create_view_info_failure) = self
690 .view_info_manager()
691 .build_create_txn(view_id, &view_info_value)?;
692
693 let txn = Txn::merge_all(vec![
694 create_table_name_txn,
695 create_table_info_txn,
696 create_view_info_txn,
697 ]);
698
699 let mut r = self.kv_backend.txn(txn).await?;
700
701 if !r.succeeded {
703 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
704 let remote_table_info = on_create_table_info_failure(&mut set)?
705 .context(error::UnexpectedSnafu {
706 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
707 })?
708 .into_inner();
709
710 let remote_view_info = on_create_view_info_failure(&mut set)?
711 .context(error::UnexpectedSnafu {
712 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
713 })?
714 .into_inner();
715
716 let op_name = "the creating view metadata";
717 ensure_values!(remote_table_info, table_info_value, op_name);
718 ensure_values!(remote_view_info, view_info_value, op_name);
719 }
720
721 Ok(())
722 }
723
724 pub async fn create_table_metadata(
727 &self,
728 mut table_info: RawTableInfo,
729 table_route_value: TableRouteValue,
730 region_wal_options: HashMap<RegionNumber, String>,
731 ) -> Result<()> {
732 let region_numbers = table_route_value.region_numbers();
733 table_info.meta.region_numbers = region_numbers;
734 let table_id = table_info.ident.table_id;
735 let engine = table_info.meta.engine.clone();
736
737 let table_name = TableNameKey::new(
739 &table_info.catalog_name,
740 &table_info.schema_name,
741 &table_info.name,
742 );
743 let create_table_name_txn = self
744 .table_name_manager()
745 .build_create_txn(&table_name, table_id)?;
746
747 let region_options = table_info.to_region_options();
748 let table_info_value = TableInfoValue::new(table_info);
750 let (create_table_info_txn, on_create_table_info_failure) = self
751 .table_info_manager()
752 .build_create_txn(table_id, &table_info_value)?;
753
754 let (create_table_route_txn, on_create_table_route_failure) = self
755 .table_route_manager()
756 .table_route_storage()
757 .build_create_txn(table_id, &table_route_value)?;
758
759 let create_topic_region_txn = self
760 .topic_region_manager
761 .build_create_txn(table_id, ®ion_wal_options)?;
762
763 let mut txn = Txn::merge_all(vec![
764 create_table_name_txn,
765 create_table_info_txn,
766 create_table_route_txn,
767 create_topic_region_txn,
768 ]);
769
770 if let TableRouteValue::Physical(x) = &table_route_value {
771 let region_storage_path = table_info_value.region_storage_path();
772 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
773 table_id,
774 &engine,
775 ®ion_storage_path,
776 region_options,
777 region_wal_options,
778 region_distribution(&x.region_routes),
779 )?;
780 txn = txn.merge(create_datanode_table_txn);
781 }
782
783 let mut r = self.kv_backend.txn(txn).await?;
784
785 if !r.succeeded {
787 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
788 let remote_table_info = on_create_table_info_failure(&mut set)?
789 .context(error::UnexpectedSnafu {
790 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
791 })?
792 .into_inner();
793
794 let remote_table_route = on_create_table_route_failure(&mut set)?
795 .context(error::UnexpectedSnafu {
796 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
797 })?
798 .into_inner();
799
800 let op_name = "the creating table metadata";
801 ensure_values!(remote_table_info, table_info_value, op_name);
802 ensure_values!(remote_table_route, table_route_value, op_name);
803 }
804
805 Ok(())
806 }
807
808 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
809 self.kv_backend.max_txn_ops() / 3
812 }
813
814 pub async fn create_logical_tables_metadata(
816 &self,
817 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
818 ) -> Result<()> {
819 let len = tables_data.len();
820 let mut txns = Vec::with_capacity(3 * len);
821 struct OnFailure<F1, R1, F2, R2>
822 where
823 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
824 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
825 {
826 table_info_value: TableInfoValue,
827 on_create_table_info_failure: F1,
828 table_route_value: TableRouteValue,
829 on_create_table_route_failure: F2,
830 }
831 let mut on_failures = Vec::with_capacity(len);
832 for (mut table_info, table_route_value) in tables_data {
833 table_info.meta.region_numbers = table_route_value.region_numbers();
834 let table_id = table_info.ident.table_id;
835
836 let table_name = TableNameKey::new(
838 &table_info.catalog_name,
839 &table_info.schema_name,
840 &table_info.name,
841 );
842 let create_table_name_txn = self
843 .table_name_manager()
844 .build_create_txn(&table_name, table_id)?;
845 txns.push(create_table_name_txn);
846
847 let table_info_value = TableInfoValue::new(table_info);
849 let (create_table_info_txn, on_create_table_info_failure) =
850 self.table_info_manager()
851 .build_create_txn(table_id, &table_info_value)?;
852 txns.push(create_table_info_txn);
853
854 let (create_table_route_txn, on_create_table_route_failure) = self
855 .table_route_manager()
856 .table_route_storage()
857 .build_create_txn(table_id, &table_route_value)?;
858 txns.push(create_table_route_txn);
859
860 on_failures.push(OnFailure {
861 table_info_value,
862 on_create_table_info_failure,
863 table_route_value,
864 on_create_table_route_failure,
865 });
866 }
867
868 let txn = Txn::merge_all(txns);
869 let mut r = self.kv_backend.txn(txn).await?;
870
871 if !r.succeeded {
873 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
874 for on_failure in on_failures {
875 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
876 .context(error::UnexpectedSnafu {
877 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
878 })?
879 .into_inner();
880
881 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
882 .context(error::UnexpectedSnafu {
883 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
884 })?
885 .into_inner();
886
887 let op_name = "the creating logical tables metadata";
888 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
889 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
890 }
891 }
892
893 Ok(())
894 }
895
896 fn table_metadata_keys(
897 &self,
898 table_id: TableId,
899 table_name: &TableName,
900 table_route_value: &TableRouteValue,
901 region_wal_options: &HashMap<RegionNumber, WalOptions>,
902 ) -> Result<Vec<Vec<u8>>> {
903 let datanode_ids = if table_route_value.is_physical() {
905 region_distribution(table_route_value.region_routes()?)
906 .into_keys()
907 .collect()
908 } else {
909 vec![]
910 };
911 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
912 let table_name = TableNameKey::new(
913 &table_name.catalog_name,
914 &table_name.schema_name,
915 &table_name.table_name,
916 );
917 let table_info_key = TableInfoKey::new(table_id);
918 let table_route_key = TableRouteKey::new(table_id);
919 let datanode_table_keys = datanode_ids
920 .into_iter()
921 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
922 .collect::<HashSet<_>>();
923 let topic_region_map = self
924 .topic_region_manager
925 .get_topic_region_mapping(table_id, region_wal_options);
926 let topic_region_keys = topic_region_map
927 .iter()
928 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
929 .collect::<Vec<_>>();
930 keys.push(table_name.to_bytes());
931 keys.push(table_info_key.to_bytes());
932 keys.push(table_route_key.to_bytes());
933 for key in &datanode_table_keys {
934 keys.push(key.to_bytes());
935 }
936 for key in topic_region_keys {
937 keys.push(key.to_bytes());
938 }
939 Ok(keys)
940 }
941
942 pub async fn delete_table_metadata(
945 &self,
946 table_id: TableId,
947 table_name: &TableName,
948 table_route_value: &TableRouteValue,
949 region_wal_options: &HashMap<RegionNumber, WalOptions>,
950 ) -> Result<()> {
951 let keys =
952 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
953 self.tombstone_manager.create(keys).await.map(|_| ())
954 }
955
956 pub async fn delete_table_metadata_tombstone(
959 &self,
960 table_id: TableId,
961 table_name: &TableName,
962 table_route_value: &TableRouteValue,
963 region_wal_options: &HashMap<RegionNumber, WalOptions>,
964 ) -> Result<()> {
965 let table_metadata_keys =
966 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
967 self.tombstone_manager
968 .delete(table_metadata_keys)
969 .await
970 .map(|_| ())
971 }
972
973 pub async fn restore_table_metadata(
976 &self,
977 table_id: TableId,
978 table_name: &TableName,
979 table_route_value: &TableRouteValue,
980 region_wal_options: &HashMap<RegionNumber, WalOptions>,
981 ) -> Result<()> {
982 let keys =
983 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
984 self.tombstone_manager.restore(keys).await.map(|_| ())
985 }
986
987 pub async fn destroy_table_metadata(
990 &self,
991 table_id: TableId,
992 table_name: &TableName,
993 table_route_value: &TableRouteValue,
994 region_wal_options: &HashMap<RegionNumber, WalOptions>,
995 ) -> Result<()> {
996 let keys =
997 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
998 let _ = self
999 .kv_backend
1000 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1001 .await?;
1002 Ok(())
1003 }
1004
1005 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1006 let mut keys = Vec::with_capacity(3);
1007 let view_name = TableNameKey::new(
1008 &view_name.catalog_name,
1009 &view_name.schema_name,
1010 &view_name.table_name,
1011 );
1012 let table_info_key = TableInfoKey::new(view_id);
1013 let view_info_key = ViewInfoKey::new(view_id);
1014 keys.push(view_name.to_bytes());
1015 keys.push(table_info_key.to_bytes());
1016 keys.push(view_info_key.to_bytes());
1017
1018 Ok(keys)
1019 }
1020
1021 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1024 let keys = self.view_info_keys(view_id, view_name)?;
1025 let _ = self
1026 .kv_backend
1027 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1028 .await?;
1029 Ok(())
1030 }
1031
1032 pub async fn rename_table(
1036 &self,
1037 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1038 new_table_name: String,
1039 ) -> Result<()> {
1040 let current_table_info = ¤t_table_info_value.table_info;
1041 let table_id = current_table_info.ident.table_id;
1042
1043 let table_name_key = TableNameKey::new(
1044 ¤t_table_info.catalog_name,
1045 ¤t_table_info.schema_name,
1046 ¤t_table_info.name,
1047 );
1048
1049 let new_table_name_key = TableNameKey::new(
1050 ¤t_table_info.catalog_name,
1051 ¤t_table_info.schema_name,
1052 &new_table_name,
1053 );
1054
1055 let update_table_name_txn = self.table_name_manager().build_update_txn(
1057 &table_name_key,
1058 &new_table_name_key,
1059 table_id,
1060 )?;
1061
1062 let new_table_info_value = current_table_info_value
1063 .inner
1064 .with_update(move |table_info| {
1065 table_info.name = new_table_name;
1066 });
1067
1068 let (update_table_info_txn, on_update_table_info_failure) = self
1070 .table_info_manager()
1071 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1072
1073 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1074
1075 let mut r = self.kv_backend.txn(txn).await?;
1076
1077 if !r.succeeded {
1079 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1080 let remote_table_info = on_update_table_info_failure(&mut set)?
1081 .context(error::UnexpectedSnafu {
1082 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1083 })?
1084 .into_inner();
1085
1086 let op_name = "the renaming table metadata";
1087 ensure_values!(remote_table_info, new_table_info_value, op_name);
1088 }
1089
1090 Ok(())
1091 }
1092
1093 pub async fn update_table_info(
1097 &self,
1098 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1099 region_distribution: Option<RegionDistribution>,
1100 new_table_info: RawTableInfo,
1101 ) -> Result<()> {
1102 let table_id = current_table_info_value.table_info.ident.table_id;
1103 let new_table_info_value = current_table_info_value.update(new_table_info);
1104
1105 let (update_table_info_txn, on_update_table_info_failure) = self
1107 .table_info_manager()
1108 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1109
1110 let txn = if let Some(region_distribution) = region_distribution {
1111 let new_region_options = new_table_info_value.table_info.to_region_options();
1113 let update_datanode_table_options_txn = self
1114 .datanode_table_manager
1115 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1116 .await?;
1117 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1118 } else {
1119 update_table_info_txn
1120 };
1121
1122 let mut r = self.kv_backend.txn(txn).await?;
1123 if !r.succeeded {
1125 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1126 let remote_table_info = on_update_table_info_failure(&mut set)?
1127 .context(error::UnexpectedSnafu {
1128 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1129 })?
1130 .into_inner();
1131
1132 let op_name = "the updating table info";
1133 ensure_values!(remote_table_info, new_table_info_value, op_name);
1134 }
1135 Ok(())
1136 }
1137
1138 #[allow(clippy::too_many_arguments)]
1149 pub async fn update_view_info(
1150 &self,
1151 view_id: TableId,
1152 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1153 new_view_info: Vec<u8>,
1154 table_names: HashSet<TableName>,
1155 columns: Vec<String>,
1156 plan_columns: Vec<String>,
1157 definition: String,
1158 ) -> Result<()> {
1159 let new_view_info_value = current_view_info_value.update(
1160 new_view_info,
1161 table_names,
1162 columns,
1163 plan_columns,
1164 definition,
1165 );
1166
1167 let (update_view_info_txn, on_update_view_info_failure) = self
1169 .view_info_manager()
1170 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1171
1172 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1173
1174 if !r.succeeded {
1176 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1177 let remote_view_info = on_update_view_info_failure(&mut set)?
1178 .context(error::UnexpectedSnafu {
1179 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1180 })?
1181 .into_inner();
1182
1183 let op_name = "the updating view info";
1184 ensure_values!(remote_view_info, new_view_info_value, op_name);
1185 }
1186 Ok(())
1187 }
1188
1189 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1190 self.kv_backend.max_txn_ops()
1191 }
1192
1193 pub async fn batch_update_table_info_values(
1194 &self,
1195 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1196 ) -> Result<()> {
1197 let len = table_info_value_pairs.len();
1198 let mut txns = Vec::with_capacity(len);
1199 struct OnFailure<F, R>
1200 where
1201 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1202 {
1203 table_info_value: TableInfoValue,
1204 on_update_table_info_failure: F,
1205 }
1206 let mut on_failures = Vec::with_capacity(len);
1207
1208 for (table_info_value, new_table_info) in table_info_value_pairs {
1209 let table_id = table_info_value.table_info.ident.table_id;
1210
1211 let new_table_info_value = table_info_value.update(new_table_info);
1212
1213 let (update_table_info_txn, on_update_table_info_failure) =
1214 self.table_info_manager().build_update_txn(
1215 table_id,
1216 &table_info_value,
1217 &new_table_info_value,
1218 )?;
1219
1220 txns.push(update_table_info_txn);
1221
1222 on_failures.push(OnFailure {
1223 table_info_value: new_table_info_value,
1224 on_update_table_info_failure,
1225 });
1226 }
1227
1228 let txn = Txn::merge_all(txns);
1229 let mut r = self.kv_backend.txn(txn).await?;
1230
1231 if !r.succeeded {
1232 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1233 for on_failure in on_failures {
1234 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1235 .context(error::UnexpectedSnafu {
1236 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1237 })?
1238 .into_inner();
1239
1240 let op_name = "the batch updating table info";
1241 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1242 }
1243 }
1244
1245 Ok(())
1246 }
1247
1248 pub async fn update_table_route(
1249 &self,
1250 table_id: TableId,
1251 region_info: RegionInfo,
1252 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1253 new_region_routes: Vec<RegionRoute>,
1254 new_region_options: &HashMap<String, String>,
1255 new_region_wal_options: &HashMap<RegionNumber, String>,
1256 ) -> Result<()> {
1257 let current_region_distribution =
1259 region_distribution(current_table_route_value.region_routes()?);
1260 let new_region_distribution = region_distribution(&new_region_routes);
1261
1262 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1263 table_id,
1264 region_info,
1265 current_region_distribution,
1266 new_region_distribution,
1267 new_region_options,
1268 new_region_wal_options,
1269 )?;
1270
1271 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1273
1274 let (update_table_route_txn, on_update_table_route_failure) = self
1275 .table_route_manager()
1276 .table_route_storage()
1277 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1278
1279 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1280
1281 let mut r = self.kv_backend.txn(txn).await?;
1282
1283 if !r.succeeded {
1285 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1286 let remote_table_route = on_update_table_route_failure(&mut set)?
1287 .context(error::UnexpectedSnafu {
1288 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1289 })?
1290 .into_inner();
1291
1292 let op_name = "the updating table route";
1293 ensure_values!(remote_table_route, new_table_route_value, op_name);
1294 }
1295
1296 Ok(())
1297 }
1298
1299 pub async fn update_leader_region_status<F>(
1301 &self,
1302 table_id: TableId,
1303 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1304 next_region_route_status: F,
1305 ) -> Result<()>
1306 where
1307 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1308 {
1309 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1310
1311 let mut updated = 0;
1312 for route in &mut new_region_routes {
1313 if let Some(state) = next_region_route_status(route) {
1314 if route.set_leader_state(state) {
1315 updated += 1;
1316 }
1317 }
1318 }
1319
1320 if updated == 0 {
1321 warn!("No leader status updated");
1322 return Ok(());
1323 }
1324
1325 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1327
1328 let (update_table_route_txn, on_update_table_route_failure) = self
1329 .table_route_manager()
1330 .table_route_storage()
1331 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1332
1333 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1334
1335 if !r.succeeded {
1337 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1338 let remote_table_route = on_update_table_route_failure(&mut set)?
1339 .context(error::UnexpectedSnafu {
1340 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1341 })?
1342 .into_inner();
1343
1344 let op_name = "the updating leader region status";
1345 ensure_values!(remote_table_route, new_table_route_value, op_name);
1346 }
1347
1348 Ok(())
1349 }
1350}
1351
1352#[macro_export]
1353macro_rules! impl_metadata_value {
1354 ($($val_ty: ty), *) => {
1355 $(
1356 impl $crate::key::MetadataValue for $val_ty {
1357 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
1358 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1359 }
1360
1361 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1362 serde_json::to_vec(self).context(SerdeJsonSnafu)
1363 }
1364 }
1365 )*
1366 }
1367}
1368
1369macro_rules! impl_metadata_key_get_txn_op {
1370 ($($key: ty), *) => {
1371 $(
1372 impl $crate::key::MetadataKeyGetTxnOp for $key {
1373 fn build_get_op(
1376 &self,
1377 ) -> (
1378 TxnOp,
1379 impl for<'a> FnMut(
1380 &'a mut TxnOpGetResponseSet,
1381 ) -> Option<Vec<u8>>,
1382 ) {
1383 let raw_key = self.to_bytes();
1384 (
1385 TxnOp::Get(raw_key.clone()),
1386 TxnOpGetResponseSet::filter(raw_key),
1387 )
1388 }
1389 }
1390 )*
1391 }
1392}
1393
1394impl_metadata_key_get_txn_op! {
1395 TableNameKey<'_>,
1396 TableInfoKey,
1397 ViewInfoKey,
1398 TableRouteKey,
1399 DatanodeTableKey
1400}
1401
1402#[macro_export]
1403macro_rules! impl_optional_metadata_value {
1404 ($($val_ty: ty), *) => {
1405 $(
1406 impl $val_ty {
1407 pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
1408 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1409 }
1410
1411 pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1412 serde_json::to_vec(self).context(SerdeJsonSnafu)
1413 }
1414 }
1415 )*
1416 }
1417}
1418
1419impl_metadata_value! {
1420 TableNameValue,
1421 TableInfoValue,
1422 ViewInfoValue,
1423 DatanodeTableValue,
1424 FlowInfoValue,
1425 FlowNameValue,
1426 FlowRouteValue,
1427 TableFlowValue,
1428 NodeAddressValue,
1429 SchemaNameValue,
1430 FlowStateValue,
1431 PoisonValue
1432}
1433
1434impl_optional_metadata_value! {
1435 CatalogNameValue,
1436 SchemaNameValue
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441 use std::collections::{BTreeMap, HashMap, HashSet};
1442 use std::sync::Arc;
1443
1444 use bytes::Bytes;
1445 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1446 use common_time::util::current_time_millis;
1447 use common_wal::options::{KafkaWalOptions, WalOptions};
1448 use futures::TryStreamExt;
1449 use store_api::storage::{RegionId, RegionNumber};
1450 use table::metadata::{RawTableInfo, TableInfo};
1451 use table::table_name::TableName;
1452
1453 use super::datanode_table::DatanodeTableKey;
1454 use super::test_utils;
1455 use crate::ddl::test_util::create_table::test_create_table_task;
1456 use crate::ddl::utils::region_storage_path;
1457 use crate::error::Result;
1458 use crate::key::datanode_table::RegionInfo;
1459 use crate::key::table_info::TableInfoValue;
1460 use crate::key::table_name::TableNameKey;
1461 use crate::key::table_route::TableRouteValue;
1462 use crate::key::{
1463 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1464 ViewInfoValue, TOPIC_REGION_PREFIX,
1465 };
1466 use crate::kv_backend::memory::MemoryKvBackend;
1467 use crate::kv_backend::KvBackend;
1468 use crate::peer::Peer;
1469 use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1470 use crate::rpc::store::RangeRequest;
1471 use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1472
1473 #[test]
1474 fn test_deserialized_value_with_bytes() {
1475 let region_route = new_test_region_route();
1476 let region_routes = vec![region_route.clone()];
1477
1478 let expected_region_routes =
1479 TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1480 let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1481
1482 let value = DeserializedValueWithBytes {
1485 inner: TableRouteValue::physical(region_routes.clone()),
1487 bytes: Bytes::from(expected.clone()),
1488 };
1489
1490 let encoded = serde_json::to_vec(&value).unwrap();
1491
1492 let decoded: DeserializedValueWithBytes<TableRouteValue> =
1495 serde_json::from_slice(&encoded).unwrap();
1496
1497 assert_eq!(decoded.inner, expected_region_routes);
1498 assert_eq!(decoded.bytes, expected);
1499 }
1500
1501 fn new_test_region_route() -> RegionRoute {
1502 new_region_route(1, 2)
1503 }
1504
1505 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1506 RegionRoute {
1507 region: Region {
1508 id: region_id.into(),
1509 name: "r1".to_string(),
1510 partition: None,
1511 attrs: BTreeMap::new(),
1512 },
1513 leader_peer: Some(Peer::new(datanode, "a2")),
1514 follower_peers: vec![],
1515 leader_state: None,
1516 leader_down_since: None,
1517 }
1518 }
1519
1520 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1521 test_utils::new_test_table_info(10, region_numbers)
1522 }
1523
1524 fn new_test_table_names() -> HashSet<TableName> {
1525 let mut set = HashSet::new();
1526 set.insert(TableName {
1527 catalog_name: "greptime".to_string(),
1528 schema_name: "public".to_string(),
1529 table_name: "a_table".to_string(),
1530 });
1531 set.insert(TableName {
1532 catalog_name: "greptime".to_string(),
1533 schema_name: "public".to_string(),
1534 table_name: "b_table".to_string(),
1535 });
1536 set
1537 }
1538
1539 async fn create_physical_table_metadata(
1540 table_metadata_manager: &TableMetadataManager,
1541 table_info: RawTableInfo,
1542 region_routes: Vec<RegionRoute>,
1543 region_wal_options: HashMap<RegionNumber, String>,
1544 ) -> Result<()> {
1545 table_metadata_manager
1546 .create_table_metadata(
1547 table_info,
1548 TableRouteValue::physical(region_routes),
1549 region_wal_options,
1550 )
1551 .await
1552 }
1553
1554 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1555 let topics = (0..2)
1556 .map(|i| format!("greptimedb_topic{}", i))
1557 .collect::<Vec<_>>();
1558 let wal_options = topics
1559 .iter()
1560 .map(|topic| {
1561 WalOptions::Kafka(KafkaWalOptions {
1562 topic: topic.clone(),
1563 })
1564 })
1565 .collect::<Vec<_>>();
1566
1567 (0..16)
1568 .enumerate()
1569 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1570 .collect()
1571 }
1572
1573 #[tokio::test]
1574 async fn test_raft_engine_topic_region_map() {
1575 let mem_kv = Arc::new(MemoryKvBackend::default());
1576 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1577 let region_route = new_test_region_route();
1578 let region_routes = &vec![region_route.clone()];
1579 let table_info: RawTableInfo =
1580 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1581 let wal_allocator = WalOptionsAllocator::RaftEngine;
1582 let regions = (0..16).collect();
1583 let region_wal_options =
1584 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1585 create_physical_table_metadata(
1586 &table_metadata_manager,
1587 table_info.clone(),
1588 region_routes.clone(),
1589 region_wal_options.clone(),
1590 )
1591 .await
1592 .unwrap();
1593
1594 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1595 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1596 let resp = mem_kv.range(range_req).await.unwrap();
1597 assert!(resp.kvs.is_empty());
1599 }
1600
1601 #[tokio::test]
1602 async fn test_create_table_metadata() {
1603 let mem_kv = Arc::new(MemoryKvBackend::default());
1604 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1605 let region_route = new_test_region_route();
1606 let region_routes = &vec![region_route.clone()];
1607 let table_info: RawTableInfo =
1608 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1609 let region_wal_options = create_mock_region_wal_options()
1610 .into_iter()
1611 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1612 .collect::<HashMap<_, _>>();
1613
1614 create_physical_table_metadata(
1616 &table_metadata_manager,
1617 table_info.clone(),
1618 region_routes.clone(),
1619 region_wal_options.clone(),
1620 )
1621 .await
1622 .unwrap();
1623
1624 assert!(create_physical_table_metadata(
1626 &table_metadata_manager,
1627 table_info.clone(),
1628 region_routes.clone(),
1629 region_wal_options.clone(),
1630 )
1631 .await
1632 .is_ok());
1633
1634 let mut modified_region_routes = region_routes.clone();
1635 modified_region_routes.push(region_route.clone());
1636 assert!(create_physical_table_metadata(
1638 &table_metadata_manager,
1639 table_info.clone(),
1640 modified_region_routes,
1641 region_wal_options.clone(),
1642 )
1643 .await
1644 .is_err());
1645
1646 let (remote_table_info, remote_table_route) = table_metadata_manager
1647 .get_full_table_info(10)
1648 .await
1649 .unwrap();
1650
1651 assert_eq!(
1652 remote_table_info.unwrap().into_inner().table_info,
1653 table_info
1654 );
1655 assert_eq!(
1656 remote_table_route
1657 .unwrap()
1658 .into_inner()
1659 .region_routes()
1660 .unwrap(),
1661 region_routes
1662 );
1663
1664 for i in 0..2 {
1665 let region_number = i as u32;
1666 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1667 let topic = format!("greptimedb_topic{}", i);
1668 let regions = table_metadata_manager
1669 .topic_region_manager
1670 .regions(&topic)
1671 .await
1672 .unwrap();
1673 assert_eq!(regions.len(), 8);
1674 assert_eq!(regions[0], region_id);
1675 }
1676 }
1677
1678 #[tokio::test]
1679 async fn test_create_logic_tables_metadata() {
1680 let mem_kv = Arc::new(MemoryKvBackend::default());
1681 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1682 let region_route = new_test_region_route();
1683 let region_routes = vec![region_route.clone()];
1684 let table_info: RawTableInfo =
1685 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1686 let table_id = table_info.ident.table_id;
1687 let table_route_value = TableRouteValue::physical(region_routes.clone());
1688
1689 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1690 table_metadata_manager
1692 .create_logical_tables_metadata(tables_data.clone())
1693 .await
1694 .unwrap();
1695
1696 assert!(table_metadata_manager
1698 .create_logical_tables_metadata(tables_data)
1699 .await
1700 .is_ok());
1701
1702 let mut modified_region_routes = region_routes.clone();
1703 modified_region_routes.push(new_region_route(2, 3));
1704 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1705 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1706 assert!(table_metadata_manager
1708 .create_logical_tables_metadata(modified_tables_data)
1709 .await
1710 .is_err());
1711
1712 let (remote_table_info, remote_table_route) = table_metadata_manager
1713 .get_full_table_info(table_id)
1714 .await
1715 .unwrap();
1716
1717 assert_eq!(
1718 remote_table_info.unwrap().into_inner().table_info,
1719 table_info
1720 );
1721 assert_eq!(
1722 remote_table_route
1723 .unwrap()
1724 .into_inner()
1725 .region_routes()
1726 .unwrap(),
1727 ®ion_routes
1728 );
1729 }
1730
1731 #[tokio::test]
1732 async fn test_create_many_logical_tables_metadata() {
1733 let kv_backend = Arc::new(MemoryKvBackend::default());
1734 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1735
1736 let mut tables_data = vec![];
1737 for i in 0..128 {
1738 let table_id = i + 1;
1739 let regin_number = table_id * 3;
1740 let region_id = RegionId::new(table_id, regin_number);
1741 let region_route = new_region_route(region_id.as_u64(), 2);
1742 let region_routes = vec![region_route.clone()];
1743 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1744 table_id,
1745 &format!("my_table_{}", table_id),
1746 region_routes.iter().map(|r| r.region.id.region_number()),
1747 )
1748 .into();
1749 let table_route_value = TableRouteValue::physical(region_routes.clone());
1750
1751 tables_data.push((table_info, table_route_value));
1752 }
1753
1754 table_metadata_manager
1756 .create_logical_tables_metadata(tables_data)
1757 .await
1758 .unwrap();
1759 }
1760
1761 #[tokio::test]
1762 async fn test_delete_table_metadata() {
1763 let mem_kv = Arc::new(MemoryKvBackend::default());
1764 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1765 let region_route = new_test_region_route();
1766 let region_routes = &vec![region_route.clone()];
1767 let table_info: RawTableInfo =
1768 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1769 let table_id = table_info.ident.table_id;
1770 let datanode_id = 2;
1771 let region_wal_options = create_mock_region_wal_options();
1772 let serialized_region_wal_options = region_wal_options
1773 .iter()
1774 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1775 .collect::<HashMap<_, _>>();
1776
1777 create_physical_table_metadata(
1779 &table_metadata_manager,
1780 table_info.clone(),
1781 region_routes.clone(),
1782 serialized_region_wal_options,
1783 )
1784 .await
1785 .unwrap();
1786
1787 let table_name = TableName::new(
1788 table_info.catalog_name,
1789 table_info.schema_name,
1790 table_info.name,
1791 );
1792 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1793 table_metadata_manager
1795 .delete_table_metadata(
1796 table_id,
1797 &table_name,
1798 table_route_value,
1799 ®ion_wal_options,
1800 )
1801 .await
1802 .unwrap();
1803 table_metadata_manager
1805 .delete_table_metadata(
1806 table_id,
1807 &table_name,
1808 table_route_value,
1809 ®ion_wal_options,
1810 )
1811 .await
1812 .unwrap();
1813 assert!(table_metadata_manager
1814 .table_info_manager()
1815 .get(table_id)
1816 .await
1817 .unwrap()
1818 .is_none());
1819 assert!(table_metadata_manager
1820 .table_route_manager()
1821 .table_route_storage()
1822 .get(table_id)
1823 .await
1824 .unwrap()
1825 .is_none());
1826 assert!(table_metadata_manager
1827 .datanode_table_manager()
1828 .tables(datanode_id)
1829 .try_collect::<Vec<_>>()
1830 .await
1831 .unwrap()
1832 .is_empty());
1833 let table_info = table_metadata_manager
1835 .table_info_manager()
1836 .get(table_id)
1837 .await
1838 .unwrap();
1839 assert!(table_info.is_none());
1840 let table_route = table_metadata_manager
1841 .table_route_manager()
1842 .table_route_storage()
1843 .get(table_id)
1844 .await
1845 .unwrap();
1846 assert!(table_route.is_none());
1847 let regions = table_metadata_manager
1849 .topic_region_manager
1850 .regions("greptimedb_topic0")
1851 .await
1852 .unwrap();
1853 assert_eq!(regions.len(), 0);
1854 let regions = table_metadata_manager
1855 .topic_region_manager
1856 .regions("greptimedb_topic1")
1857 .await
1858 .unwrap();
1859 assert_eq!(regions.len(), 0);
1860 }
1861
1862 #[tokio::test]
1863 async fn test_rename_table() {
1864 let mem_kv = Arc::new(MemoryKvBackend::default());
1865 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1866 let region_route = new_test_region_route();
1867 let region_routes = vec![region_route.clone()];
1868 let table_info: RawTableInfo =
1869 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1870 let table_id = table_info.ident.table_id;
1871 create_physical_table_metadata(
1873 &table_metadata_manager,
1874 table_info.clone(),
1875 region_routes.clone(),
1876 HashMap::new(),
1877 )
1878 .await
1879 .unwrap();
1880
1881 let new_table_name = "another_name".to_string();
1882 let table_info_value =
1883 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1884
1885 table_metadata_manager
1886 .rename_table(&table_info_value, new_table_name.clone())
1887 .await
1888 .unwrap();
1889 table_metadata_manager
1891 .rename_table(&table_info_value, new_table_name.clone())
1892 .await
1893 .unwrap();
1894 let mut modified_table_info = table_info.clone();
1895 modified_table_info.name = "hi".to_string();
1896 let modified_table_info_value =
1897 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1898 assert!(table_metadata_manager
1901 .rename_table(&modified_table_info_value, new_table_name.clone())
1902 .await
1903 .is_err());
1904
1905 let old_table_name = TableNameKey::new(
1906 &table_info.catalog_name,
1907 &table_info.schema_name,
1908 &table_info.name,
1909 );
1910 let new_table_name = TableNameKey::new(
1911 &table_info.catalog_name,
1912 &table_info.schema_name,
1913 &new_table_name,
1914 );
1915
1916 assert!(table_metadata_manager
1917 .table_name_manager()
1918 .get(old_table_name)
1919 .await
1920 .unwrap()
1921 .is_none());
1922
1923 assert_eq!(
1924 table_metadata_manager
1925 .table_name_manager()
1926 .get(new_table_name)
1927 .await
1928 .unwrap()
1929 .unwrap()
1930 .table_id(),
1931 table_id
1932 );
1933 }
1934
1935 #[tokio::test]
1936 async fn test_update_table_info() {
1937 let mem_kv = Arc::new(MemoryKvBackend::default());
1938 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1939 let region_route = new_test_region_route();
1940 let region_routes = vec![region_route.clone()];
1941 let table_info: RawTableInfo =
1942 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1943 let table_id = table_info.ident.table_id;
1944 create_physical_table_metadata(
1946 &table_metadata_manager,
1947 table_info.clone(),
1948 region_routes.clone(),
1949 HashMap::new(),
1950 )
1951 .await
1952 .unwrap();
1953
1954 let mut new_table_info = table_info.clone();
1955 new_table_info.name = "hi".to_string();
1956 let current_table_info_value =
1957 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1958 table_metadata_manager
1960 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1961 .await
1962 .unwrap();
1963 table_metadata_manager
1965 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1966 .await
1967 .unwrap();
1968
1969 let updated_table_info = table_metadata_manager
1971 .table_info_manager()
1972 .get(table_id)
1973 .await
1974 .unwrap()
1975 .unwrap()
1976 .into_inner();
1977 assert_eq!(updated_table_info.table_info, new_table_info);
1978
1979 let mut wrong_table_info = table_info.clone();
1980 wrong_table_info.name = "wrong".to_string();
1981 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1982 current_table_info_value.update(wrong_table_info),
1983 );
1984 assert!(table_metadata_manager
1987 .update_table_info(&wrong_table_info_value, None, new_table_info)
1988 .await
1989 .is_err())
1990 }
1991
1992 #[tokio::test]
1993 async fn test_update_table_leader_region_status() {
1994 let mem_kv = Arc::new(MemoryKvBackend::default());
1995 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1996 let datanode = 1;
1997 let region_routes = vec![
1998 RegionRoute {
1999 region: Region {
2000 id: 1.into(),
2001 name: "r1".to_string(),
2002 partition: None,
2003 attrs: BTreeMap::new(),
2004 },
2005 leader_peer: Some(Peer::new(datanode, "a2")),
2006 leader_state: Some(LeaderState::Downgrading),
2007 follower_peers: vec![],
2008 leader_down_since: Some(current_time_millis()),
2009 },
2010 RegionRoute {
2011 region: Region {
2012 id: 2.into(),
2013 name: "r2".to_string(),
2014 partition: None,
2015 attrs: BTreeMap::new(),
2016 },
2017 leader_peer: Some(Peer::new(datanode, "a1")),
2018 leader_state: None,
2019 follower_peers: vec![],
2020 leader_down_since: None,
2021 },
2022 ];
2023 let table_info: RawTableInfo =
2024 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2025 let table_id = table_info.ident.table_id;
2026 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2027 TableRouteValue::physical(region_routes.clone()),
2028 );
2029
2030 create_physical_table_metadata(
2032 &table_metadata_manager,
2033 table_info.clone(),
2034 region_routes.clone(),
2035 HashMap::new(),
2036 )
2037 .await
2038 .unwrap();
2039
2040 table_metadata_manager
2041 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2042 if region_route.leader_state.is_some() {
2043 None
2044 } else {
2045 Some(Some(LeaderState::Downgrading))
2046 }
2047 })
2048 .await
2049 .unwrap();
2050
2051 let updated_route_value = table_metadata_manager
2052 .table_route_manager()
2053 .table_route_storage()
2054 .get(table_id)
2055 .await
2056 .unwrap()
2057 .unwrap();
2058
2059 assert_eq!(
2060 updated_route_value.region_routes().unwrap()[0].leader_state,
2061 Some(LeaderState::Downgrading)
2062 );
2063
2064 assert!(updated_route_value.region_routes().unwrap()[0]
2065 .leader_down_since
2066 .is_some());
2067
2068 assert_eq!(
2069 updated_route_value.region_routes().unwrap()[1].leader_state,
2070 Some(LeaderState::Downgrading)
2071 );
2072 assert!(updated_route_value.region_routes().unwrap()[1]
2073 .leader_down_since
2074 .is_some());
2075 }
2076
2077 async fn assert_datanode_table(
2078 table_metadata_manager: &TableMetadataManager,
2079 table_id: u32,
2080 region_routes: &[RegionRoute],
2081 ) {
2082 let region_distribution = region_distribution(region_routes);
2083 for (datanode, regions) in region_distribution {
2084 let got = table_metadata_manager
2085 .datanode_table_manager()
2086 .get(&DatanodeTableKey::new(datanode, table_id))
2087 .await
2088 .unwrap()
2089 .unwrap();
2090
2091 assert_eq!(got.regions, regions.leader_regions);
2092 assert_eq!(got.follower_regions, regions.follower_regions);
2093 }
2094 }
2095
2096 #[tokio::test]
2097 async fn test_update_table_route() {
2098 let mem_kv = Arc::new(MemoryKvBackend::default());
2099 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2100 let region_route = new_test_region_route();
2101 let region_routes = vec![region_route.clone()];
2102 let table_info: RawTableInfo =
2103 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2104 let table_id = table_info.ident.table_id;
2105 let engine = table_info.meta.engine.as_str();
2106 let region_storage_path =
2107 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2108 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2109 TableRouteValue::physical(region_routes.clone()),
2110 );
2111
2112 create_physical_table_metadata(
2114 &table_metadata_manager,
2115 table_info.clone(),
2116 region_routes.clone(),
2117 HashMap::new(),
2118 )
2119 .await
2120 .unwrap();
2121
2122 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2123 let new_region_routes = vec![
2124 new_region_route(1, 1),
2125 new_region_route(2, 2),
2126 new_region_route(3, 3),
2127 ];
2128 table_metadata_manager
2130 .update_table_route(
2131 table_id,
2132 RegionInfo {
2133 engine: engine.to_string(),
2134 region_storage_path: region_storage_path.to_string(),
2135 region_options: HashMap::new(),
2136 region_wal_options: HashMap::new(),
2137 },
2138 ¤t_table_route_value,
2139 new_region_routes.clone(),
2140 &HashMap::new(),
2141 &HashMap::new(),
2142 )
2143 .await
2144 .unwrap();
2145 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2146
2147 table_metadata_manager
2149 .update_table_route(
2150 table_id,
2151 RegionInfo {
2152 engine: engine.to_string(),
2153 region_storage_path: region_storage_path.to_string(),
2154 region_options: HashMap::new(),
2155 region_wal_options: HashMap::new(),
2156 },
2157 ¤t_table_route_value,
2158 new_region_routes.clone(),
2159 &HashMap::new(),
2160 &HashMap::new(),
2161 )
2162 .await
2163 .unwrap();
2164
2165 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2166 current_table_route_value
2167 .inner
2168 .update(new_region_routes.clone())
2169 .unwrap(),
2170 );
2171 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2172 table_metadata_manager
2174 .update_table_route(
2175 table_id,
2176 RegionInfo {
2177 engine: engine.to_string(),
2178 region_storage_path: region_storage_path.to_string(),
2179 region_options: HashMap::new(),
2180 region_wal_options: HashMap::new(),
2181 },
2182 ¤t_table_route_value,
2183 new_region_routes.clone(),
2184 &HashMap::new(),
2185 &HashMap::new(),
2186 )
2187 .await
2188 .unwrap();
2189 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2190
2191 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2194 current_table_route_value
2195 .update(vec![
2196 new_region_route(1, 1),
2197 new_region_route(2, 2),
2198 new_region_route(3, 3),
2199 new_region_route(4, 4),
2200 ])
2201 .unwrap(),
2202 );
2203 assert!(table_metadata_manager
2204 .update_table_route(
2205 table_id,
2206 RegionInfo {
2207 engine: engine.to_string(),
2208 region_storage_path: region_storage_path.to_string(),
2209 region_options: HashMap::new(),
2210 region_wal_options: HashMap::new(),
2211 },
2212 &wrong_table_route_value,
2213 new_region_routes,
2214 &HashMap::new(),
2215 &HashMap::new(),
2216 )
2217 .await
2218 .is_err());
2219 }
2220
2221 #[tokio::test]
2222 async fn test_destroy_table_metadata() {
2223 let mem_kv = Arc::new(MemoryKvBackend::default());
2224 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2225 let table_id = 1025;
2226 let table_name = "foo";
2227 let task = test_create_table_task(table_name, table_id);
2228 let options = create_mock_region_wal_options();
2229 let serialized_options = options
2230 .iter()
2231 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2232 .collect::<HashMap<_, _>>();
2233 table_metadata_manager
2234 .create_table_metadata(
2235 task.table_info,
2236 TableRouteValue::physical(vec![
2237 RegionRoute {
2238 region: Region::new_test(RegionId::new(table_id, 1)),
2239 leader_peer: Some(Peer::empty(1)),
2240 follower_peers: vec![Peer::empty(5)],
2241 leader_state: None,
2242 leader_down_since: None,
2243 },
2244 RegionRoute {
2245 region: Region::new_test(RegionId::new(table_id, 2)),
2246 leader_peer: Some(Peer::empty(2)),
2247 follower_peers: vec![Peer::empty(4)],
2248 leader_state: None,
2249 leader_down_since: None,
2250 },
2251 RegionRoute {
2252 region: Region::new_test(RegionId::new(table_id, 3)),
2253 leader_peer: Some(Peer::empty(3)),
2254 follower_peers: vec![],
2255 leader_state: None,
2256 leader_down_since: None,
2257 },
2258 ]),
2259 serialized_options,
2260 )
2261 .await
2262 .unwrap();
2263 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2264 let table_route_value = table_metadata_manager
2265 .table_route_manager
2266 .table_route_storage()
2267 .get_with_raw_bytes(table_id)
2268 .await
2269 .unwrap()
2270 .unwrap();
2271 table_metadata_manager
2272 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2273 .await
2274 .unwrap();
2275 assert!(mem_kv.is_empty());
2276 }
2277
2278 #[tokio::test]
2279 async fn test_restore_table_metadata() {
2280 let mem_kv = Arc::new(MemoryKvBackend::default());
2281 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2282 let table_id = 1025;
2283 let table_name = "foo";
2284 let task = test_create_table_task(table_name, table_id);
2285 let options = create_mock_region_wal_options();
2286 let serialized_options = options
2287 .iter()
2288 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2289 .collect::<HashMap<_, _>>();
2290 table_metadata_manager
2291 .create_table_metadata(
2292 task.table_info,
2293 TableRouteValue::physical(vec![
2294 RegionRoute {
2295 region: Region::new_test(RegionId::new(table_id, 1)),
2296 leader_peer: Some(Peer::empty(1)),
2297 follower_peers: vec![Peer::empty(5)],
2298 leader_state: None,
2299 leader_down_since: None,
2300 },
2301 RegionRoute {
2302 region: Region::new_test(RegionId::new(table_id, 2)),
2303 leader_peer: Some(Peer::empty(2)),
2304 follower_peers: vec![Peer::empty(4)],
2305 leader_state: None,
2306 leader_down_since: None,
2307 },
2308 RegionRoute {
2309 region: Region::new_test(RegionId::new(table_id, 3)),
2310 leader_peer: Some(Peer::empty(3)),
2311 follower_peers: vec![],
2312 leader_state: None,
2313 leader_down_since: None,
2314 },
2315 ]),
2316 serialized_options,
2317 )
2318 .await
2319 .unwrap();
2320 let expected_result = mem_kv.dump();
2321 let table_route_value = table_metadata_manager
2322 .table_route_manager
2323 .table_route_storage()
2324 .get_with_raw_bytes(table_id)
2325 .await
2326 .unwrap()
2327 .unwrap();
2328 let region_routes = table_route_value.region_routes().unwrap();
2329 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2330 let table_route_value = TableRouteValue::physical(region_routes.clone());
2331 table_metadata_manager
2332 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2333 .await
2334 .unwrap();
2335 table_metadata_manager
2336 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2337 .await
2338 .unwrap();
2339 let kvs = mem_kv.dump();
2340 assert_eq!(kvs, expected_result);
2341 table_metadata_manager
2343 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2344 .await
2345 .unwrap();
2346 let kvs = mem_kv.dump();
2347 assert_eq!(kvs, expected_result);
2348 }
2349
2350 #[tokio::test]
2351 async fn test_create_update_view_info() {
2352 let mem_kv = Arc::new(MemoryKvBackend::default());
2353 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2354
2355 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2356
2357 let view_id = view_info.ident.table_id;
2358
2359 let logical_plan: Vec<u8> = vec![1, 2, 3];
2360 let columns = vec!["a".to_string()];
2361 let plan_columns = vec!["number".to_string()];
2362 let table_names = new_test_table_names();
2363 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2364
2365 table_metadata_manager
2367 .create_view_metadata(
2368 view_info.clone(),
2369 logical_plan.clone(),
2370 table_names.clone(),
2371 columns.clone(),
2372 plan_columns.clone(),
2373 definition.to_string(),
2374 )
2375 .await
2376 .unwrap();
2377
2378 {
2379 let current_view_info = table_metadata_manager
2381 .view_info_manager()
2382 .get(view_id)
2383 .await
2384 .unwrap()
2385 .unwrap()
2386 .into_inner();
2387 assert_eq!(current_view_info.view_info, logical_plan);
2388 assert_eq!(current_view_info.table_names, table_names);
2389 assert_eq!(current_view_info.definition, definition);
2390 assert_eq!(current_view_info.columns, columns);
2391 assert_eq!(current_view_info.plan_columns, plan_columns);
2392 let current_table_info = table_metadata_manager
2394 .table_info_manager()
2395 .get(view_id)
2396 .await
2397 .unwrap()
2398 .unwrap()
2399 .into_inner();
2400 assert_eq!(current_table_info.table_info, view_info);
2401 }
2402
2403 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2404 let new_table_names = {
2405 let mut set = HashSet::new();
2406 set.insert(TableName {
2407 catalog_name: "greptime".to_string(),
2408 schema_name: "public".to_string(),
2409 table_name: "b_table".to_string(),
2410 });
2411 set.insert(TableName {
2412 catalog_name: "greptime".to_string(),
2413 schema_name: "public".to_string(),
2414 table_name: "c_table".to_string(),
2415 });
2416 set
2417 };
2418 let new_columns = vec!["b".to_string()];
2419 let new_plan_columns = vec!["number2".to_string()];
2420 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2421
2422 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2423 logical_plan.clone(),
2424 table_names,
2425 columns,
2426 plan_columns,
2427 definition.to_string(),
2428 ));
2429 table_metadata_manager
2431 .update_view_info(
2432 view_id,
2433 ¤t_view_info_value,
2434 new_logical_plan.clone(),
2435 new_table_names.clone(),
2436 new_columns.clone(),
2437 new_plan_columns.clone(),
2438 new_definition.to_string(),
2439 )
2440 .await
2441 .unwrap();
2442 table_metadata_manager
2444 .update_view_info(
2445 view_id,
2446 ¤t_view_info_value,
2447 new_logical_plan.clone(),
2448 new_table_names.clone(),
2449 new_columns.clone(),
2450 new_plan_columns.clone(),
2451 new_definition.to_string(),
2452 )
2453 .await
2454 .unwrap();
2455
2456 let updated_view_info = table_metadata_manager
2458 .view_info_manager()
2459 .get(view_id)
2460 .await
2461 .unwrap()
2462 .unwrap()
2463 .into_inner();
2464 assert_eq!(updated_view_info.view_info, new_logical_plan);
2465 assert_eq!(updated_view_info.table_names, new_table_names);
2466 assert_eq!(updated_view_info.definition, new_definition);
2467 assert_eq!(updated_view_info.columns, new_columns);
2468 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2469
2470 let wrong_view_info = logical_plan.clone();
2471 let wrong_definition = "wrong_definition";
2472 let wrong_view_info_value =
2473 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2474 wrong_view_info,
2475 new_table_names.clone(),
2476 new_columns.clone(),
2477 new_plan_columns.clone(),
2478 wrong_definition.to_string(),
2479 ));
2480 assert!(table_metadata_manager
2483 .update_view_info(
2484 view_id,
2485 &wrong_view_info_value,
2486 new_logical_plan.clone(),
2487 new_table_names.clone(),
2488 vec!["c".to_string()],
2489 vec!["number3".to_string()],
2490 wrong_definition.to_string(),
2491 )
2492 .await
2493 .is_err());
2494
2495 let current_view_info = table_metadata_manager
2497 .view_info_manager()
2498 .get(view_id)
2499 .await
2500 .unwrap()
2501 .unwrap()
2502 .into_inner();
2503 assert_eq!(current_view_info.view_info, new_logical_plan);
2504 assert_eq!(current_view_info.table_names, new_table_names);
2505 assert_eq!(current_view_info.definition, new_definition);
2506 assert_eq!(current_view_info.columns, new_columns);
2507 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2508 }
2509
2510 #[test]
2511 fn test_region_role_set_deserialize() {
2512 let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2513 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2514 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2515 assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2516
2517 let s = r#"[1, 2, 3]"#;
2518 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2519 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2520 assert!(region_role_set.follower_regions.is_empty());
2521 }
2522
2523 #[test]
2524 fn test_region_distribution_deserialize() {
2525 let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2526 let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2527 assert_eq!(region_distribution.len(), 2);
2528 assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2529 assert!(region_distribution[&1].follower_regions.is_empty());
2530 assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2531 assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2532 }
2533}