1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod maintenance;
104pub mod node_address;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
/// Regex fragment for a single name component: a letter, `_`, `:` or `-`
/// followed by letters, digits or any of `_ : - . @ #`.
///
/// The fragment is unanchored; the key patterns in this module embed it between
/// `/` separators and `^`/`$` anchors.
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
/// Key string of the maintenance entry.
// NOTE(review): presumably consumed by the `maintenance` module — confirm.
pub const MAINTENANCE_KEY: &str = "__maintenance";

/// Prefix of datanode-table keys (`__dn_table/<digits>/<digits>`).
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
/// Prefix of table-info keys (`__table_info/<digits>`).
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
/// Prefix of view-info keys (`__view_info/<digits>`).
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
/// Prefix of table-name keys (`__table_name/<name>/<name>/<name>`).
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
/// Prefix of catalog-name keys (`__catalog_name/<name>`).
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
/// Prefix of schema-name keys (`__schema_name/<name>/<name>`).
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
/// Prefix of table-route keys (`__table_route/<digits>`).
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
/// Prefix of node-address keys (`__node_address/<digits>/<digits>`).
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix of Kafka topic-name keys (`__topic_name/kafka/<anything>`).
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// Alternative Kafka topic key prefix.
// NOTE(review): the name suggests this is the layout written by earlier
// versions and kept for compatibility — confirm against `topic_name`.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of topic-region keys (`__topic_region/<name>/<digits>`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
181
/// The key prefixes that take part in caching: table names, catalog names,
/// schema names, table routes and node addresses.
// NOTE(review): how the list is consumed is not visible in this file —
// presumably keys under these prefixes are loaded into / served from a cache;
// confirm at the use sites.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
    TABLE_NAME_KEY_PREFIX,
    CATALOG_NAME_KEY_PREFIX,
    SCHEMA_NAME_KEY_PREFIX,
    TABLE_ROUTE_PREFIX,
    NODE_ADDRESS_PREFIX,
];
190
/// The regions held by one node, split by role.
///
/// Serialized with the derived `Serialize` (both fields are emitted);
/// deserialization is hand-written and additionally accepts a bare list of
/// region numbers, which is read as leader regions only.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
    /// Region numbers for which the node is the leader.
    pub leader_regions: Vec<RegionNumber>,
    /// Region numbers for which the node is a follower.
    pub follower_regions: Vec<RegionNumber>,
}
199
200impl<'de> Deserialize<'de> for RegionRoleSet {
201 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
202 where
203 D: serde::Deserializer<'de>,
204 {
205 #[derive(Deserialize)]
206 #[serde(untagged)]
207 enum RegionRoleSetOrLeaderOnly {
208 Full {
209 leader_regions: Vec<RegionNumber>,
210 follower_regions: Vec<RegionNumber>,
211 },
212 LeaderOnly(Vec<RegionNumber>),
213 }
214 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
215 RegionRoleSetOrLeaderOnly::Full {
216 leader_regions,
217 follower_regions,
218 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
219 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
220 Ok(RegionRoleSet::new(leader_regions, vec![]))
221 }
222 }
223 }
224}
225
226impl RegionRoleSet {
227 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
229 Self {
230 leader_regions,
231 follower_regions,
232 }
233 }
234
235 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
237 self.leader_regions.push(region_number);
238 }
239
240 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
242 self.follower_regions.push(region_number);
243 }
244
245 pub fn sort(&mut self) {
247 self.follower_regions.sort();
248 self.leader_regions.sort();
249 }
250}
251
/// Maps a datanode id to the regions (leaders and followers) located on it.
///
/// Being a `BTreeMap`, it iterates in ascending datanode-id order.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;

/// Identifier of a flow.
pub type FlowId = u32;
/// Identifier of a flow partition.
pub type FlowPartitionId = u32;
261
262lazy_static! {
263 pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
264}
265
266lazy_static! {
267 static ref TABLE_INFO_KEY_PATTERN: Regex =
268 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
269}
270
271lazy_static! {
272 static ref VIEW_INFO_KEY_PATTERN: Regex =
273 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
274}
275
276lazy_static! {
277 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
278 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
279}
280
281lazy_static! {
282 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
283 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
284}
285
286lazy_static! {
287 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
288 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
289 ))
290 .unwrap();
291}
292
293lazy_static! {
294 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
296 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
297 ))
298 .unwrap();
299}
300
301lazy_static! {
302 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
304 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
305 ))
306 .unwrap();
307}
308
309lazy_static! {
310 static ref NODE_ADDRESS_PATTERN: Regex =
311 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
312}
313
314lazy_static! {
315 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
316 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
317}
318
319lazy_static! {
320 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
321 "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
322 ))
323 .unwrap();
324}
325
/// A metadata key that can be converted to and from its byte representation.
///
/// `T` is the type produced by [`MetadataKey::from_bytes`]; `'a` is the
/// lifetime of the input bytes, which allows `T` to borrow from them.
pub trait MetadataKey<'a, T> {
    /// Encodes the key as bytes.
    fn to_bytes(&self) -> Vec<u8>;

    /// Decodes a key from `bytes`, returning an error when decoding fails.
    fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}
332
333#[derive(Debug, Clone, PartialEq)]
334pub struct BytesAdapter(Vec<u8>);
335
336impl From<Vec<u8>> for BytesAdapter {
337 fn from(value: Vec<u8>) -> Self {
338 Self(value)
339 }
340}
341
342impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
343 fn to_bytes(&self) -> Vec<u8> {
344 self.0.clone()
345 }
346
347 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
348 Ok(BytesAdapter(bytes.to_vec()))
349 }
350}
351
/// A key that can be fetched as part of a transaction.
pub(crate) trait MetadataKeyGetTxnOp {
    /// Returns a transaction operation together with a closure that takes a
    /// [`TxnOpGetResponseSet`] and yields the raw value bytes, if any.
    // NOTE(review): presumably the op is a `get` of this key and the closure
    // picks this key's value out of the response set — confirm in the impls.
    fn build_get_op(
        &self,
    ) -> (
        TxnOp,
        impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
    );
}
360
/// A metadata value that can be converted to and from its raw stored bytes.
pub trait MetadataValue {
    /// Decodes a value from `raw_value`, returning an error when decoding fails.
    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes the value into raw bytes, returning an error when encoding fails.
    fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
368
/// Shared, reference-counted handle to a [`TableMetadataManager`].
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;

/// Entry point for table-related metadata.
///
/// Bundles one manager per key family, all created over the same
/// [`KvBackendRef`], and additionally keeps that backend to run the
/// multi-key transactions implemented on this type.
pub struct TableMetadataManager {
    table_name_manager: TableNameManager,
    table_info_manager: TableInfoManager,
    view_info_manager: ViewInfoManager,
    datanode_table_manager: DatanodeTableManager,
    catalog_manager: CatalogManager,
    schema_manager: SchemaManager,
    table_route_manager: TableRouteManager,
    tombstone_manager: TombstoneManager,
    topic_name_manager: TopicNameManager,
    topic_region_manager: TopicRegionManager,
    // Backend shared with every manager above.
    kv_backend: KvBackendRef,
}
384
/// Returns early with `error::UnexpectedSnafu` unless `$got == $expected_value`.
///
/// `$name` names the operation and is interpolated into the error message
/// together with the `Debug` output of both values.
///
/// The macro expands to an unqualified `ensure!` and `error::UnexpectedSnafu`,
/// so both must be in scope at the call site, and the enclosing function must
/// return a compatible `Result`. `$got` and `$expected_value` each appear twice
/// in the expansion (comparison and message), so pass plain values rather than
/// expressions with side effects.
#[macro_export]
macro_rules! ensure_values {
    ($got:expr, $expected_value:expr, $name:expr) => {
        ensure!(
            $got == $expected_value,
            error::UnexpectedSnafu {
                err_msg: format!(
                    "Reads the different value: {:?} during {}, expected: {:?}",
                    $got, $name, $expected_value
                )
            }
        );
    };
}
399
/// A decoded value (`inner`) kept together with the raw bytes it was decoded
/// from (`bytes`).
///
/// - Serializing writes only `bytes` (as a string); `inner` is ignored.
/// - Deserializing reads the string back into `bytes` and re-decodes `inner`
///   from it.
///
/// Mutating the value through `DerefMut` does not refresh `bytes`.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
    // The raw bytes `inner` was decoded from.
    bytes: Bytes,
    // The decoded value.
    inner: T,
}
415
416impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
417 type Target = T;
418
419 fn deref(&self) -> &Self::Target {
420 &self.inner
421 }
422}
423
424impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
425 fn deref_mut(&mut self) -> &mut Self::Target {
426 &mut self.inner
427 }
428}
429
430impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 write!(
433 f,
434 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
435 self.inner, self.bytes
436 )
437 }
438}
439
440impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
441 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
445 where
446 S: serde::Serializer,
447 {
448 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
451 }
452}
453
454impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
455 for DeserializedValueWithBytes<T>
456{
457 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
461 where
462 D: serde::Deserializer<'de>,
463 {
464 let buf = String::deserialize(deserializer)?;
465 let bytes = Bytes::from(buf);
466
467 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
468 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
469
470 Ok(value)
471 }
472}
473
474impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
475 fn clone(&self) -> Self {
476 Self {
477 bytes: self.bytes.clone(),
478 inner: self.inner.clone(),
479 }
480 }
481}
482
483impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
484 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
487 let inner = T::try_from_raw_value(&bytes)?;
488 Ok(Self { bytes, inner })
489 }
490
491 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
494 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
495 }
496
497 pub fn into_inner(self) -> T {
498 self.inner
499 }
500
501 pub fn get_inner_ref(&self) -> &T {
502 &self.inner
503 }
504
505 pub fn get_raw_bytes(&self) -> Vec<u8> {
507 self.bytes.to_vec()
508 }
509
510 #[cfg(any(test, feature = "testing"))]
511 pub fn from_inner(inner: T) -> Self {
512 let bytes = serde_json::to_vec(&inner).unwrap();
513
514 Self {
515 bytes: Bytes::from(bytes),
516 inner,
517 }
518 }
519}
520
521impl TableMetadataManager {
522 pub fn new(kv_backend: KvBackendRef) -> Self {
523 TableMetadataManager {
524 table_name_manager: TableNameManager::new(kv_backend.clone()),
525 table_info_manager: TableInfoManager::new(kv_backend.clone()),
526 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
527 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
528 catalog_manager: CatalogManager::new(kv_backend.clone()),
529 schema_manager: SchemaManager::new(kv_backend.clone()),
530 table_route_manager: TableRouteManager::new(kv_backend.clone()),
531 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
532 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
533 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
534 kv_backend,
535 }
536 }
537
538 pub async fn init(&self) -> Result<()> {
539 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
540
541 self.catalog_manager().create(catalog_name, true).await?;
542
543 let internal_schemas = [
544 DEFAULT_SCHEMA_NAME,
545 INFORMATION_SCHEMA_NAME,
546 DEFAULT_PRIVATE_SCHEMA_NAME,
547 ];
548
549 for schema_name in internal_schemas {
550 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
551
552 self.schema_manager().create(schema_key, None, true).await?;
553 }
554
555 Ok(())
556 }
557
    /// Returns the manager of table-name keys.
    pub fn table_name_manager(&self) -> &TableNameManager {
        &self.table_name_manager
    }

    /// Returns the manager of table-info keys.
    pub fn table_info_manager(&self) -> &TableInfoManager {
        &self.table_info_manager
    }

    /// Returns the manager of view-info keys.
    pub fn view_info_manager(&self) -> &ViewInfoManager {
        &self.view_info_manager
    }

    /// Returns the manager of datanode-table keys.
    pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
        &self.datanode_table_manager
    }

    /// Returns the manager of catalog-name keys.
    pub fn catalog_manager(&self) -> &CatalogManager {
        &self.catalog_manager
    }

    /// Returns the manager of schema-name keys.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Returns the manager of table-route keys.
    pub fn table_route_manager(&self) -> &TableRouteManager {
        &self.table_route_manager
    }

    /// Returns the manager of topic-name keys.
    pub fn topic_name_manager(&self) -> &TopicNameManager {
        &self.topic_name_manager
    }

    /// Returns the manager of topic-region keys.
    pub fn topic_region_manager(&self) -> &TopicRegionManager {
        &self.topic_region_manager
    }

    /// Returns the underlying key-value backend.
    ///
    /// Only compiled with the `testing` feature.
    #[cfg(feature = "testing")]
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
598
599 pub async fn get_full_table_info(
600 &self,
601 table_id: TableId,
602 ) -> Result<(
603 Option<DeserializedValueWithBytes<TableInfoValue>>,
604 Option<DeserializedValueWithBytes<TableRouteValue>>,
605 )> {
606 let table_info_key = TableInfoKey::new(table_id);
607 let table_route_key = TableRouteKey::new(table_id);
608 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
609 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
610
611 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
612 let mut res = self.kv_backend.txn(txn).await?;
613 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
614 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
615 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
616 Ok((table_info_value, table_route_value))
617 }
618
619 pub async fn create_view_metadata(
629 &self,
630 view_info: RawTableInfo,
631 raw_logical_plan: Vec<u8>,
632 table_names: HashSet<TableName>,
633 columns: Vec<String>,
634 plan_columns: Vec<String>,
635 definition: String,
636 ) -> Result<()> {
637 let view_id = view_info.ident.table_id;
638
639 let view_name = TableNameKey::new(
641 &view_info.catalog_name,
642 &view_info.schema_name,
643 &view_info.name,
644 );
645 let create_table_name_txn = self
646 .table_name_manager()
647 .build_create_txn(&view_name, view_id)?;
648
649 let table_info_value = TableInfoValue::new(view_info);
651
652 let (create_table_info_txn, on_create_table_info_failure) = self
653 .table_info_manager()
654 .build_create_txn(view_id, &table_info_value)?;
655
656 let view_info_value = ViewInfoValue::new(
658 raw_logical_plan,
659 table_names,
660 columns,
661 plan_columns,
662 definition,
663 );
664 let (create_view_info_txn, on_create_view_info_failure) = self
665 .view_info_manager()
666 .build_create_txn(view_id, &view_info_value)?;
667
668 let txn = Txn::merge_all(vec![
669 create_table_name_txn,
670 create_table_info_txn,
671 create_view_info_txn,
672 ]);
673
674 let mut r = self.kv_backend.txn(txn).await?;
675
676 if !r.succeeded {
678 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
679 let remote_table_info = on_create_table_info_failure(&mut set)?
680 .context(error::UnexpectedSnafu {
681 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
682 })?
683 .into_inner();
684
685 let remote_view_info = on_create_view_info_failure(&mut set)?
686 .context(error::UnexpectedSnafu {
687 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
688 })?
689 .into_inner();
690
691 let op_name = "the creating view metadata";
692 ensure_values!(remote_table_info, table_info_value, op_name);
693 ensure_values!(remote_view_info, view_info_value, op_name);
694 }
695
696 Ok(())
697 }
698
699 pub async fn create_table_metadata(
702 &self,
703 mut table_info: RawTableInfo,
704 table_route_value: TableRouteValue,
705 region_wal_options: HashMap<RegionNumber, String>,
706 ) -> Result<()> {
707 let region_numbers = table_route_value.region_numbers();
708 table_info.meta.region_numbers = region_numbers;
709 let table_id = table_info.ident.table_id;
710 let engine = table_info.meta.engine.clone();
711
712 let table_name = TableNameKey::new(
714 &table_info.catalog_name,
715 &table_info.schema_name,
716 &table_info.name,
717 );
718 let create_table_name_txn = self
719 .table_name_manager()
720 .build_create_txn(&table_name, table_id)?;
721
722 let region_options = table_info.to_region_options();
723 let table_info_value = TableInfoValue::new(table_info);
725 let (create_table_info_txn, on_create_table_info_failure) = self
726 .table_info_manager()
727 .build_create_txn(table_id, &table_info_value)?;
728
729 let (create_table_route_txn, on_create_table_route_failure) = self
730 .table_route_manager()
731 .table_route_storage()
732 .build_create_txn(table_id, &table_route_value)?;
733
734 let create_topic_region_txn = self
735 .topic_region_manager
736 .build_create_txn(table_id, ®ion_wal_options)?;
737
738 let mut txn = Txn::merge_all(vec![
739 create_table_name_txn,
740 create_table_info_txn,
741 create_table_route_txn,
742 create_topic_region_txn,
743 ]);
744
745 if let TableRouteValue::Physical(x) = &table_route_value {
746 let region_storage_path = table_info_value.region_storage_path();
747 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
748 table_id,
749 &engine,
750 ®ion_storage_path,
751 region_options,
752 region_wal_options,
753 region_distribution(&x.region_routes),
754 )?;
755 txn = txn.merge(create_datanode_table_txn);
756 }
757
758 let mut r = self.kv_backend.txn(txn).await?;
759
760 if !r.succeeded {
762 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
763 let remote_table_info = on_create_table_info_failure(&mut set)?
764 .context(error::UnexpectedSnafu {
765 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
766 })?
767 .into_inner();
768
769 let remote_table_route = on_create_table_route_failure(&mut set)?
770 .context(error::UnexpectedSnafu {
771 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
772 })?
773 .into_inner();
774
775 let op_name = "the creating table metadata";
776 ensure_values!(remote_table_info, table_info_value, op_name);
777 ensure_values!(remote_table_route, table_route_value, op_name);
778 }
779
780 Ok(())
781 }
782
783 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
784 self.kv_backend.max_txn_ops() / 3
787 }
788
789 pub async fn create_logical_tables_metadata(
791 &self,
792 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
793 ) -> Result<()> {
794 let len = tables_data.len();
795 let mut txns = Vec::with_capacity(3 * len);
796 struct OnFailure<F1, R1, F2, R2>
797 where
798 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
799 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
800 {
801 table_info_value: TableInfoValue,
802 on_create_table_info_failure: F1,
803 table_route_value: TableRouteValue,
804 on_create_table_route_failure: F2,
805 }
806 let mut on_failures = Vec::with_capacity(len);
807 for (mut table_info, table_route_value) in tables_data {
808 table_info.meta.region_numbers = table_route_value.region_numbers();
809 let table_id = table_info.ident.table_id;
810
811 let table_name = TableNameKey::new(
813 &table_info.catalog_name,
814 &table_info.schema_name,
815 &table_info.name,
816 );
817 let create_table_name_txn = self
818 .table_name_manager()
819 .build_create_txn(&table_name, table_id)?;
820 txns.push(create_table_name_txn);
821
822 let table_info_value = TableInfoValue::new(table_info);
824 let (create_table_info_txn, on_create_table_info_failure) =
825 self.table_info_manager()
826 .build_create_txn(table_id, &table_info_value)?;
827 txns.push(create_table_info_txn);
828
829 let (create_table_route_txn, on_create_table_route_failure) = self
830 .table_route_manager()
831 .table_route_storage()
832 .build_create_txn(table_id, &table_route_value)?;
833 txns.push(create_table_route_txn);
834
835 on_failures.push(OnFailure {
836 table_info_value,
837 on_create_table_info_failure,
838 table_route_value,
839 on_create_table_route_failure,
840 });
841 }
842
843 let txn = Txn::merge_all(txns);
844 let mut r = self.kv_backend.txn(txn).await?;
845
846 if !r.succeeded {
848 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
849 for on_failure in on_failures {
850 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
851 .context(error::UnexpectedSnafu {
852 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
853 })?
854 .into_inner();
855
856 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
857 .context(error::UnexpectedSnafu {
858 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
859 })?
860 .into_inner();
861
862 let op_name = "the creating logical tables metadata";
863 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
864 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
865 }
866 }
867
868 Ok(())
869 }
870
871 fn table_metadata_keys(
872 &self,
873 table_id: TableId,
874 table_name: &TableName,
875 table_route_value: &TableRouteValue,
876 region_wal_options: &HashMap<RegionNumber, WalOptions>,
877 ) -> Result<Vec<Vec<u8>>> {
878 let datanode_ids = if table_route_value.is_physical() {
880 region_distribution(table_route_value.region_routes()?)
881 .into_keys()
882 .collect()
883 } else {
884 vec![]
885 };
886 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
887 let table_name = TableNameKey::new(
888 &table_name.catalog_name,
889 &table_name.schema_name,
890 &table_name.table_name,
891 );
892 let table_info_key = TableInfoKey::new(table_id);
893 let table_route_key = TableRouteKey::new(table_id);
894 let datanode_table_keys = datanode_ids
895 .into_iter()
896 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
897 .collect::<HashSet<_>>();
898 let topic_region_map = self
899 .topic_region_manager
900 .get_topic_region_mapping(table_id, region_wal_options);
901 let topic_region_keys = topic_region_map
902 .iter()
903 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
904 .collect::<Vec<_>>();
905 keys.push(table_name.to_bytes());
906 keys.push(table_info_key.to_bytes());
907 keys.push(table_route_key.to_bytes());
908 for key in &datanode_table_keys {
909 keys.push(key.to_bytes());
910 }
911 for key in topic_region_keys {
912 keys.push(key.to_bytes());
913 }
914 Ok(keys)
915 }
916
917 pub async fn delete_table_metadata(
920 &self,
921 table_id: TableId,
922 table_name: &TableName,
923 table_route_value: &TableRouteValue,
924 region_wal_options: &HashMap<RegionNumber, WalOptions>,
925 ) -> Result<()> {
926 let keys =
927 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
928 self.tombstone_manager.create(keys).await
929 }
930
931 pub async fn delete_table_metadata_tombstone(
934 &self,
935 table_id: TableId,
936 table_name: &TableName,
937 table_route_value: &TableRouteValue,
938 region_wal_options: &HashMap<RegionNumber, WalOptions>,
939 ) -> Result<()> {
940 let table_metadata_keys =
941 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
942 self.tombstone_manager.delete(table_metadata_keys).await
943 }
944
945 pub async fn restore_table_metadata(
948 &self,
949 table_id: TableId,
950 table_name: &TableName,
951 table_route_value: &TableRouteValue,
952 region_wal_options: &HashMap<RegionNumber, WalOptions>,
953 ) -> Result<()> {
954 let keys =
955 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
956 self.tombstone_manager.restore(keys).await
957 }
958
959 pub async fn destroy_table_metadata(
962 &self,
963 table_id: TableId,
964 table_name: &TableName,
965 table_route_value: &TableRouteValue,
966 region_wal_options: &HashMap<RegionNumber, WalOptions>,
967 ) -> Result<()> {
968 let keys =
969 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
970 let _ = self
971 .kv_backend
972 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
973 .await?;
974 Ok(())
975 }
976
977 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
978 let mut keys = Vec::with_capacity(3);
979 let view_name = TableNameKey::new(
980 &view_name.catalog_name,
981 &view_name.schema_name,
982 &view_name.table_name,
983 );
984 let table_info_key = TableInfoKey::new(view_id);
985 let view_info_key = ViewInfoKey::new(view_id);
986 keys.push(view_name.to_bytes());
987 keys.push(table_info_key.to_bytes());
988 keys.push(view_info_key.to_bytes());
989
990 Ok(keys)
991 }
992
993 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
996 let keys = self.view_info_keys(view_id, view_name)?;
997 let _ = self
998 .kv_backend
999 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1000 .await?;
1001 Ok(())
1002 }
1003
1004 pub async fn rename_table(
1008 &self,
1009 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1010 new_table_name: String,
1011 ) -> Result<()> {
1012 let current_table_info = ¤t_table_info_value.table_info;
1013 let table_id = current_table_info.ident.table_id;
1014
1015 let table_name_key = TableNameKey::new(
1016 ¤t_table_info.catalog_name,
1017 ¤t_table_info.schema_name,
1018 ¤t_table_info.name,
1019 );
1020
1021 let new_table_name_key = TableNameKey::new(
1022 ¤t_table_info.catalog_name,
1023 ¤t_table_info.schema_name,
1024 &new_table_name,
1025 );
1026
1027 let update_table_name_txn = self.table_name_manager().build_update_txn(
1029 &table_name_key,
1030 &new_table_name_key,
1031 table_id,
1032 )?;
1033
1034 let new_table_info_value = current_table_info_value
1035 .inner
1036 .with_update(move |table_info| {
1037 table_info.name = new_table_name;
1038 });
1039
1040 let (update_table_info_txn, on_update_table_info_failure) = self
1042 .table_info_manager()
1043 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1044
1045 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1046
1047 let mut r = self.kv_backend.txn(txn).await?;
1048
1049 if !r.succeeded {
1051 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1052 let remote_table_info = on_update_table_info_failure(&mut set)?
1053 .context(error::UnexpectedSnafu {
1054 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1055 })?
1056 .into_inner();
1057
1058 let op_name = "the renaming table metadata";
1059 ensure_values!(remote_table_info, new_table_info_value, op_name);
1060 }
1061
1062 Ok(())
1063 }
1064
1065 pub async fn update_table_info(
1069 &self,
1070 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1071 region_distribution: Option<RegionDistribution>,
1072 new_table_info: RawTableInfo,
1073 ) -> Result<()> {
1074 let table_id = current_table_info_value.table_info.ident.table_id;
1075 let new_table_info_value = current_table_info_value.update(new_table_info);
1076
1077 let (update_table_info_txn, on_update_table_info_failure) = self
1079 .table_info_manager()
1080 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1081
1082 let txn = if let Some(region_distribution) = region_distribution {
1083 let new_region_options = new_table_info_value.table_info.to_region_options();
1085 let update_datanode_table_options_txn = self
1086 .datanode_table_manager
1087 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1088 .await?;
1089 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1090 } else {
1091 update_table_info_txn
1092 };
1093
1094 let mut r = self.kv_backend.txn(txn).await?;
1095 if !r.succeeded {
1097 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1098 let remote_table_info = on_update_table_info_failure(&mut set)?
1099 .context(error::UnexpectedSnafu {
1100 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1101 })?
1102 .into_inner();
1103
1104 let op_name = "the updating table info";
1105 ensure_values!(remote_table_info, new_table_info_value, op_name);
1106 }
1107 Ok(())
1108 }
1109
1110 #[allow(clippy::too_many_arguments)]
1121 pub async fn update_view_info(
1122 &self,
1123 view_id: TableId,
1124 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1125 new_view_info: Vec<u8>,
1126 table_names: HashSet<TableName>,
1127 columns: Vec<String>,
1128 plan_columns: Vec<String>,
1129 definition: String,
1130 ) -> Result<()> {
1131 let new_view_info_value = current_view_info_value.update(
1132 new_view_info,
1133 table_names,
1134 columns,
1135 plan_columns,
1136 definition,
1137 );
1138
1139 let (update_view_info_txn, on_update_view_info_failure) = self
1141 .view_info_manager()
1142 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1143
1144 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1145
1146 if !r.succeeded {
1148 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1149 let remote_view_info = on_update_view_info_failure(&mut set)?
1150 .context(error::UnexpectedSnafu {
1151 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1152 })?
1153 .into_inner();
1154
1155 let op_name = "the updating view info";
1156 ensure_values!(remote_view_info, new_view_info_value, op_name);
1157 }
1158 Ok(())
1159 }
1160
    /// Returns how many table infos fit into one call of
    /// `batch_update_table_info_values`: the backend's transaction-op limit,
    /// since that method builds one transaction per table info.
    pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
1164
1165 pub async fn batch_update_table_info_values(
1166 &self,
1167 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1168 ) -> Result<()> {
1169 let len = table_info_value_pairs.len();
1170 let mut txns = Vec::with_capacity(len);
1171 struct OnFailure<F, R>
1172 where
1173 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1174 {
1175 table_info_value: TableInfoValue,
1176 on_update_table_info_failure: F,
1177 }
1178 let mut on_failures = Vec::with_capacity(len);
1179
1180 for (table_info_value, new_table_info) in table_info_value_pairs {
1181 let table_id = table_info_value.table_info.ident.table_id;
1182
1183 let new_table_info_value = table_info_value.update(new_table_info);
1184
1185 let (update_table_info_txn, on_update_table_info_failure) =
1186 self.table_info_manager().build_update_txn(
1187 table_id,
1188 &table_info_value,
1189 &new_table_info_value,
1190 )?;
1191
1192 txns.push(update_table_info_txn);
1193
1194 on_failures.push(OnFailure {
1195 table_info_value: new_table_info_value,
1196 on_update_table_info_failure,
1197 });
1198 }
1199
1200 let txn = Txn::merge_all(txns);
1201 let mut r = self.kv_backend.txn(txn).await?;
1202
1203 if !r.succeeded {
1204 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1205 for on_failure in on_failures {
1206 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1207 .context(error::UnexpectedSnafu {
1208 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1209 })?
1210 .into_inner();
1211
1212 let op_name = "the batch updating table info";
1213 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1214 }
1215 }
1216
1217 Ok(())
1218 }
1219
1220 pub async fn update_table_route(
1221 &self,
1222 table_id: TableId,
1223 region_info: RegionInfo,
1224 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1225 new_region_routes: Vec<RegionRoute>,
1226 new_region_options: &HashMap<String, String>,
1227 new_region_wal_options: &HashMap<RegionNumber, String>,
1228 ) -> Result<()> {
1229 let current_region_distribution =
1231 region_distribution(current_table_route_value.region_routes()?);
1232 let new_region_distribution = region_distribution(&new_region_routes);
1233
1234 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1235 table_id,
1236 region_info,
1237 current_region_distribution,
1238 new_region_distribution,
1239 new_region_options,
1240 new_region_wal_options,
1241 )?;
1242
1243 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1245
1246 let (update_table_route_txn, on_update_table_route_failure) = self
1247 .table_route_manager()
1248 .table_route_storage()
1249 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1250
1251 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1252
1253 let mut r = self.kv_backend.txn(txn).await?;
1254
1255 if !r.succeeded {
1257 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1258 let remote_table_route = on_update_table_route_failure(&mut set)?
1259 .context(error::UnexpectedSnafu {
1260 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1261 })?
1262 .into_inner();
1263
1264 let op_name = "the updating table route";
1265 ensure_values!(remote_table_route, new_table_route_value, op_name);
1266 }
1267
1268 Ok(())
1269 }
1270
1271 pub async fn update_leader_region_status<F>(
1273 &self,
1274 table_id: TableId,
1275 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1276 next_region_route_status: F,
1277 ) -> Result<()>
1278 where
1279 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1280 {
1281 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1282
1283 let mut updated = 0;
1284 for route in &mut new_region_routes {
1285 if let Some(state) = next_region_route_status(route) {
1286 if route.set_leader_state(state) {
1287 updated += 1;
1288 }
1289 }
1290 }
1291
1292 if updated == 0 {
1293 warn!("No leader status updated");
1294 return Ok(());
1295 }
1296
1297 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1299
1300 let (update_table_route_txn, on_update_table_route_failure) = self
1301 .table_route_manager()
1302 .table_route_storage()
1303 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1304
1305 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1306
1307 if !r.succeeded {
1309 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1310 let remote_table_route = on_update_table_route_failure(&mut set)?
1311 .context(error::UnexpectedSnafu {
1312 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1313 })?
1314 .into_inner();
1315
1316 let op_name = "the updating leader region status";
1317 ensure_values!(remote_table_route, new_table_route_value, op_name);
1318 }
1319
1320 Ok(())
1321 }
1322}
1323
/// Implements `$crate::key::MetadataValue` for every listed type using JSON
/// (`serde_json`) as the raw representation.
///
/// The type list is comma separated; a trailing comma is accepted.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context` extension method by
/// their bare names, so those must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $crate::key::MetadataValue for $val_ty {
                fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1340
1341macro_rules! impl_metadata_key_get_txn_op {
1342 ($($key: ty), *) => {
1343 $(
1344 impl $crate::key::MetadataKeyGetTxnOp for $key {
1345 fn build_get_op(
1348 &self,
1349 ) -> (
1350 TxnOp,
1351 impl for<'a> FnMut(
1352 &'a mut TxnOpGetResponseSet,
1353 ) -> Option<Vec<u8>>,
1354 ) {
1355 let raw_key = self.to_bytes();
1356 (
1357 TxnOp::Get(raw_key.clone()),
1358 TxnOpGetResponseSet::filter(raw_key),
1359 )
1360 }
1361 }
1362 )*
1363 }
1364}
1365
1366impl_metadata_key_get_txn_op! {
1367 TableNameKey<'_>,
1368 TableInfoKey,
1369 ViewInfoKey,
1370 TableRouteKey,
1371 DatanodeTableKey
1372}
1373
/// Adds inherent JSON (`serde_json`) conversion methods to every listed type.
///
/// Unlike `impl_metadata_value!`, the generated `try_from_raw_value` deserializes into
/// `Option<Self>` rather than `Self`.
///
/// The type list is comma separated; a trailing comma is accepted.
///
/// The expansion refers to `Result`, `SerdeJsonSnafu` and the `context` extension method by
/// their bare names, so those must be in scope at the invocation site.
#[macro_export]
macro_rules! impl_optional_metadata_value {
    ($($val_ty:ty),* $(,)?) => {
        $(
            impl $val_ty {
                pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
                    serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
                }

                pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
                    serde_json::to_vec(self).context(SerdeJsonSnafu)
                }
            }
        )*
    };
}
1390
1391impl_metadata_value! {
1392 TableNameValue,
1393 TableInfoValue,
1394 ViewInfoValue,
1395 DatanodeTableValue,
1396 FlowInfoValue,
1397 FlowNameValue,
1398 FlowRouteValue,
1399 TableFlowValue,
1400 NodeAddressValue,
1401 SchemaNameValue,
1402 FlowStateValue,
1403 PoisonValue
1404}
1405
1406impl_optional_metadata_value! {
1407 CatalogNameValue,
1408 SchemaNameValue
1409}
1410
1411#[cfg(test)]
1412mod tests {
1413 use std::collections::{BTreeMap, HashMap, HashSet};
1414 use std::sync::Arc;
1415
1416 use bytes::Bytes;
1417 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1418 use common_time::util::current_time_millis;
1419 use common_wal::options::{KafkaWalOptions, WalOptions};
1420 use futures::TryStreamExt;
1421 use store_api::storage::{RegionId, RegionNumber};
1422 use table::metadata::{RawTableInfo, TableInfo};
1423 use table::table_name::TableName;
1424
1425 use super::datanode_table::DatanodeTableKey;
1426 use super::test_utils;
1427 use crate::ddl::test_util::create_table::test_create_table_task;
1428 use crate::ddl::utils::region_storage_path;
1429 use crate::error::Result;
1430 use crate::key::datanode_table::RegionInfo;
1431 use crate::key::table_info::TableInfoValue;
1432 use crate::key::table_name::TableNameKey;
1433 use crate::key::table_route::TableRouteValue;
1434 use crate::key::{
1435 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1436 ViewInfoValue, TOPIC_REGION_PREFIX,
1437 };
1438 use crate::kv_backend::memory::MemoryKvBackend;
1439 use crate::kv_backend::KvBackend;
1440 use crate::peer::Peer;
1441 use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1442 use crate::rpc::store::RangeRequest;
1443 use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1444
/// Round-trips a `DeserializedValueWithBytes` whose `inner` deliberately disagrees with its
/// `bytes`, and expects the decoded value to reflect `bytes`.
#[test]
fn test_deserialized_value_with_bytes() {
    let single_route = new_test_region_route();

    // The payload carried in `bytes`: a route value holding two routes.
    let expected_region_routes =
        TableRouteValue::physical(vec![single_route.clone(), single_route.clone()]);
    let expected = serde_json::to_vec(&expected_region_routes).unwrap();

    // `inner` holds only one route, so it differs from what `bytes` encodes.
    let value = DeserializedValueWithBytes {
        inner: TableRouteValue::physical(vec![single_route]),
        bytes: Bytes::from(expected.clone()),
    };

    let encoded = serde_json::to_vec(&value).unwrap();
    let decoded: DeserializedValueWithBytes<TableRouteValue> =
        serde_json::from_slice(&encoded).unwrap();

    // Both fields of the decoded value must match the `bytes` payload.
    assert_eq!(decoded.inner, expected_region_routes);
    assert_eq!(decoded.bytes, expected);
}
1472
1473 fn new_test_region_route() -> RegionRoute {
1474 new_region_route(1, 2)
1475 }
1476
1477 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1478 RegionRoute {
1479 region: Region {
1480 id: region_id.into(),
1481 name: "r1".to_string(),
1482 partition: None,
1483 attrs: BTreeMap::new(),
1484 },
1485 leader_peer: Some(Peer::new(datanode, "a2")),
1486 follower_peers: vec![],
1487 leader_state: None,
1488 leader_down_since: None,
1489 }
1490 }
1491
1492 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1493 test_utils::new_test_table_info(10, region_numbers)
1494 }
1495
1496 fn new_test_table_names() -> HashSet<TableName> {
1497 let mut set = HashSet::new();
1498 set.insert(TableName {
1499 catalog_name: "greptime".to_string(),
1500 schema_name: "public".to_string(),
1501 table_name: "a_table".to_string(),
1502 });
1503 set.insert(TableName {
1504 catalog_name: "greptime".to_string(),
1505 schema_name: "public".to_string(),
1506 table_name: "b_table".to_string(),
1507 });
1508 set
1509 }
1510
1511 async fn create_physical_table_metadata(
1512 table_metadata_manager: &TableMetadataManager,
1513 table_info: RawTableInfo,
1514 region_routes: Vec<RegionRoute>,
1515 region_wal_options: HashMap<RegionNumber, String>,
1516 ) -> Result<()> {
1517 table_metadata_manager
1518 .create_table_metadata(
1519 table_info,
1520 TableRouteValue::physical(region_routes),
1521 region_wal_options,
1522 )
1523 .await
1524 }
1525
1526 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1527 let topics = (0..2)
1528 .map(|i| format!("greptimedb_topic{}", i))
1529 .collect::<Vec<_>>();
1530 let wal_options = topics
1531 .iter()
1532 .map(|topic| {
1533 WalOptions::Kafka(KafkaWalOptions {
1534 topic: topic.clone(),
1535 })
1536 })
1537 .collect::<Vec<_>>();
1538
1539 (0..16)
1540 .enumerate()
1541 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1542 .collect()
1543 }
1544
1545 #[tokio::test]
1546 async fn test_raft_engine_topic_region_map() {
1547 let mem_kv = Arc::new(MemoryKvBackend::default());
1548 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1549 let region_route = new_test_region_route();
1550 let region_routes = &vec![region_route.clone()];
1551 let table_info: RawTableInfo =
1552 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1553 let wal_allocator = WalOptionsAllocator::RaftEngine;
1554 let regions = (0..16).collect();
1555 let region_wal_options =
1556 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1557 create_physical_table_metadata(
1558 &table_metadata_manager,
1559 table_info.clone(),
1560 region_routes.clone(),
1561 region_wal_options.clone(),
1562 )
1563 .await
1564 .unwrap();
1565
1566 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1567 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1568 let resp = mem_kv.range(range_req).await.unwrap();
1569 assert!(resp.kvs.is_empty());
1571 }
1572
1573 #[tokio::test]
1574 async fn test_create_table_metadata() {
1575 let mem_kv = Arc::new(MemoryKvBackend::default());
1576 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1577 let region_route = new_test_region_route();
1578 let region_routes = &vec![region_route.clone()];
1579 let table_info: RawTableInfo =
1580 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1581 let region_wal_options = create_mock_region_wal_options()
1582 .into_iter()
1583 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1584 .collect::<HashMap<_, _>>();
1585
1586 create_physical_table_metadata(
1588 &table_metadata_manager,
1589 table_info.clone(),
1590 region_routes.clone(),
1591 region_wal_options.clone(),
1592 )
1593 .await
1594 .unwrap();
1595
1596 assert!(create_physical_table_metadata(
1598 &table_metadata_manager,
1599 table_info.clone(),
1600 region_routes.clone(),
1601 region_wal_options.clone(),
1602 )
1603 .await
1604 .is_ok());
1605
1606 let mut modified_region_routes = region_routes.clone();
1607 modified_region_routes.push(region_route.clone());
1608 assert!(create_physical_table_metadata(
1610 &table_metadata_manager,
1611 table_info.clone(),
1612 modified_region_routes,
1613 region_wal_options.clone(),
1614 )
1615 .await
1616 .is_err());
1617
1618 let (remote_table_info, remote_table_route) = table_metadata_manager
1619 .get_full_table_info(10)
1620 .await
1621 .unwrap();
1622
1623 assert_eq!(
1624 remote_table_info.unwrap().into_inner().table_info,
1625 table_info
1626 );
1627 assert_eq!(
1628 remote_table_route
1629 .unwrap()
1630 .into_inner()
1631 .region_routes()
1632 .unwrap(),
1633 region_routes
1634 );
1635
1636 for i in 0..2 {
1637 let region_number = i as u32;
1638 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1639 let topic = format!("greptimedb_topic{}", i);
1640 let regions = table_metadata_manager
1641 .topic_region_manager
1642 .regions(&topic)
1643 .await
1644 .unwrap();
1645 assert_eq!(regions.len(), 8);
1646 assert_eq!(regions[0], region_id);
1647 }
1648 }
1649
1650 #[tokio::test]
1651 async fn test_create_logic_tables_metadata() {
1652 let mem_kv = Arc::new(MemoryKvBackend::default());
1653 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1654 let region_route = new_test_region_route();
1655 let region_routes = vec![region_route.clone()];
1656 let table_info: RawTableInfo =
1657 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1658 let table_id = table_info.ident.table_id;
1659 let table_route_value = TableRouteValue::physical(region_routes.clone());
1660
1661 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1662 table_metadata_manager
1664 .create_logical_tables_metadata(tables_data.clone())
1665 .await
1666 .unwrap();
1667
1668 assert!(table_metadata_manager
1670 .create_logical_tables_metadata(tables_data)
1671 .await
1672 .is_ok());
1673
1674 let mut modified_region_routes = region_routes.clone();
1675 modified_region_routes.push(new_region_route(2, 3));
1676 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1677 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1678 assert!(table_metadata_manager
1680 .create_logical_tables_metadata(modified_tables_data)
1681 .await
1682 .is_err());
1683
1684 let (remote_table_info, remote_table_route) = table_metadata_manager
1685 .get_full_table_info(table_id)
1686 .await
1687 .unwrap();
1688
1689 assert_eq!(
1690 remote_table_info.unwrap().into_inner().table_info,
1691 table_info
1692 );
1693 assert_eq!(
1694 remote_table_route
1695 .unwrap()
1696 .into_inner()
1697 .region_routes()
1698 .unwrap(),
1699 ®ion_routes
1700 );
1701 }
1702
1703 #[tokio::test]
1704 async fn test_create_many_logical_tables_metadata() {
1705 let kv_backend = Arc::new(MemoryKvBackend::default());
1706 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1707
1708 let mut tables_data = vec![];
1709 for i in 0..128 {
1710 let table_id = i + 1;
1711 let regin_number = table_id * 3;
1712 let region_id = RegionId::new(table_id, regin_number);
1713 let region_route = new_region_route(region_id.as_u64(), 2);
1714 let region_routes = vec![region_route.clone()];
1715 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1716 table_id,
1717 &format!("my_table_{}", table_id),
1718 region_routes.iter().map(|r| r.region.id.region_number()),
1719 )
1720 .into();
1721 let table_route_value = TableRouteValue::physical(region_routes.clone());
1722
1723 tables_data.push((table_info, table_route_value));
1724 }
1725
1726 table_metadata_manager
1728 .create_logical_tables_metadata(tables_data)
1729 .await
1730 .unwrap();
1731 }
1732
1733 #[tokio::test]
1734 async fn test_delete_table_metadata() {
1735 let mem_kv = Arc::new(MemoryKvBackend::default());
1736 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1737 let region_route = new_test_region_route();
1738 let region_routes = &vec![region_route.clone()];
1739 let table_info: RawTableInfo =
1740 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1741 let table_id = table_info.ident.table_id;
1742 let datanode_id = 2;
1743 let region_wal_options = create_mock_region_wal_options();
1744 let serialized_region_wal_options = region_wal_options
1745 .iter()
1746 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1747 .collect::<HashMap<_, _>>();
1748
1749 create_physical_table_metadata(
1751 &table_metadata_manager,
1752 table_info.clone(),
1753 region_routes.clone(),
1754 serialized_region_wal_options,
1755 )
1756 .await
1757 .unwrap();
1758
1759 let table_name = TableName::new(
1760 table_info.catalog_name,
1761 table_info.schema_name,
1762 table_info.name,
1763 );
1764 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1765 table_metadata_manager
1767 .delete_table_metadata(
1768 table_id,
1769 &table_name,
1770 table_route_value,
1771 ®ion_wal_options,
1772 )
1773 .await
1774 .unwrap();
1775 table_metadata_manager
1777 .delete_table_metadata(
1778 table_id,
1779 &table_name,
1780 table_route_value,
1781 ®ion_wal_options,
1782 )
1783 .await
1784 .unwrap();
1785 assert!(table_metadata_manager
1786 .table_info_manager()
1787 .get(table_id)
1788 .await
1789 .unwrap()
1790 .is_none());
1791 assert!(table_metadata_manager
1792 .table_route_manager()
1793 .table_route_storage()
1794 .get(table_id)
1795 .await
1796 .unwrap()
1797 .is_none());
1798 assert!(table_metadata_manager
1799 .datanode_table_manager()
1800 .tables(datanode_id)
1801 .try_collect::<Vec<_>>()
1802 .await
1803 .unwrap()
1804 .is_empty());
1805 let table_info = table_metadata_manager
1807 .table_info_manager()
1808 .get(table_id)
1809 .await
1810 .unwrap();
1811 assert!(table_info.is_none());
1812 let table_route = table_metadata_manager
1813 .table_route_manager()
1814 .table_route_storage()
1815 .get(table_id)
1816 .await
1817 .unwrap();
1818 assert!(table_route.is_none());
1819 let regions = table_metadata_manager
1821 .topic_region_manager
1822 .regions("greptimedb_topic0")
1823 .await
1824 .unwrap();
1825 assert_eq!(regions.len(), 0);
1826 let regions = table_metadata_manager
1827 .topic_region_manager
1828 .regions("greptimedb_topic1")
1829 .await
1830 .unwrap();
1831 assert_eq!(regions.len(), 0);
1832 }
1833
1834 #[tokio::test]
1835 async fn test_rename_table() {
1836 let mem_kv = Arc::new(MemoryKvBackend::default());
1837 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1838 let region_route = new_test_region_route();
1839 let region_routes = vec![region_route.clone()];
1840 let table_info: RawTableInfo =
1841 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1842 let table_id = table_info.ident.table_id;
1843 create_physical_table_metadata(
1845 &table_metadata_manager,
1846 table_info.clone(),
1847 region_routes.clone(),
1848 HashMap::new(),
1849 )
1850 .await
1851 .unwrap();
1852
1853 let new_table_name = "another_name".to_string();
1854 let table_info_value =
1855 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1856
1857 table_metadata_manager
1858 .rename_table(&table_info_value, new_table_name.clone())
1859 .await
1860 .unwrap();
1861 table_metadata_manager
1863 .rename_table(&table_info_value, new_table_name.clone())
1864 .await
1865 .unwrap();
1866 let mut modified_table_info = table_info.clone();
1867 modified_table_info.name = "hi".to_string();
1868 let modified_table_info_value =
1869 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1870 assert!(table_metadata_manager
1873 .rename_table(&modified_table_info_value, new_table_name.clone())
1874 .await
1875 .is_err());
1876
1877 let old_table_name = TableNameKey::new(
1878 &table_info.catalog_name,
1879 &table_info.schema_name,
1880 &table_info.name,
1881 );
1882 let new_table_name = TableNameKey::new(
1883 &table_info.catalog_name,
1884 &table_info.schema_name,
1885 &new_table_name,
1886 );
1887
1888 assert!(table_metadata_manager
1889 .table_name_manager()
1890 .get(old_table_name)
1891 .await
1892 .unwrap()
1893 .is_none());
1894
1895 assert_eq!(
1896 table_metadata_manager
1897 .table_name_manager()
1898 .get(new_table_name)
1899 .await
1900 .unwrap()
1901 .unwrap()
1902 .table_id(),
1903 table_id
1904 );
1905 }
1906
1907 #[tokio::test]
1908 async fn test_update_table_info() {
1909 let mem_kv = Arc::new(MemoryKvBackend::default());
1910 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1911 let region_route = new_test_region_route();
1912 let region_routes = vec![region_route.clone()];
1913 let table_info: RawTableInfo =
1914 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1915 let table_id = table_info.ident.table_id;
1916 create_physical_table_metadata(
1918 &table_metadata_manager,
1919 table_info.clone(),
1920 region_routes.clone(),
1921 HashMap::new(),
1922 )
1923 .await
1924 .unwrap();
1925
1926 let mut new_table_info = table_info.clone();
1927 new_table_info.name = "hi".to_string();
1928 let current_table_info_value =
1929 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1930 table_metadata_manager
1932 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1933 .await
1934 .unwrap();
1935 table_metadata_manager
1937 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1938 .await
1939 .unwrap();
1940
1941 let updated_table_info = table_metadata_manager
1943 .table_info_manager()
1944 .get(table_id)
1945 .await
1946 .unwrap()
1947 .unwrap()
1948 .into_inner();
1949 assert_eq!(updated_table_info.table_info, new_table_info);
1950
1951 let mut wrong_table_info = table_info.clone();
1952 wrong_table_info.name = "wrong".to_string();
1953 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1954 current_table_info_value.update(wrong_table_info),
1955 );
1956 assert!(table_metadata_manager
1959 .update_table_info(&wrong_table_info_value, None, new_table_info)
1960 .await
1961 .is_err())
1962 }
1963
1964 #[tokio::test]
1965 async fn test_update_table_leader_region_status() {
1966 let mem_kv = Arc::new(MemoryKvBackend::default());
1967 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1968 let datanode = 1;
1969 let region_routes = vec![
1970 RegionRoute {
1971 region: Region {
1972 id: 1.into(),
1973 name: "r1".to_string(),
1974 partition: None,
1975 attrs: BTreeMap::new(),
1976 },
1977 leader_peer: Some(Peer::new(datanode, "a2")),
1978 leader_state: Some(LeaderState::Downgrading),
1979 follower_peers: vec![],
1980 leader_down_since: Some(current_time_millis()),
1981 },
1982 RegionRoute {
1983 region: Region {
1984 id: 2.into(),
1985 name: "r2".to_string(),
1986 partition: None,
1987 attrs: BTreeMap::new(),
1988 },
1989 leader_peer: Some(Peer::new(datanode, "a1")),
1990 leader_state: None,
1991 follower_peers: vec![],
1992 leader_down_since: None,
1993 },
1994 ];
1995 let table_info: RawTableInfo =
1996 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1997 let table_id = table_info.ident.table_id;
1998 let current_table_route_value = DeserializedValueWithBytes::from_inner(
1999 TableRouteValue::physical(region_routes.clone()),
2000 );
2001
2002 create_physical_table_metadata(
2004 &table_metadata_manager,
2005 table_info.clone(),
2006 region_routes.clone(),
2007 HashMap::new(),
2008 )
2009 .await
2010 .unwrap();
2011
2012 table_metadata_manager
2013 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2014 if region_route.leader_state.is_some() {
2015 None
2016 } else {
2017 Some(Some(LeaderState::Downgrading))
2018 }
2019 })
2020 .await
2021 .unwrap();
2022
2023 let updated_route_value = table_metadata_manager
2024 .table_route_manager()
2025 .table_route_storage()
2026 .get(table_id)
2027 .await
2028 .unwrap()
2029 .unwrap();
2030
2031 assert_eq!(
2032 updated_route_value.region_routes().unwrap()[0].leader_state,
2033 Some(LeaderState::Downgrading)
2034 );
2035
2036 assert!(updated_route_value.region_routes().unwrap()[0]
2037 .leader_down_since
2038 .is_some());
2039
2040 assert_eq!(
2041 updated_route_value.region_routes().unwrap()[1].leader_state,
2042 Some(LeaderState::Downgrading)
2043 );
2044 assert!(updated_route_value.region_routes().unwrap()[1]
2045 .leader_down_since
2046 .is_some());
2047 }
2048
2049 async fn assert_datanode_table(
2050 table_metadata_manager: &TableMetadataManager,
2051 table_id: u32,
2052 region_routes: &[RegionRoute],
2053 ) {
2054 let region_distribution = region_distribution(region_routes);
2055 for (datanode, regions) in region_distribution {
2056 let got = table_metadata_manager
2057 .datanode_table_manager()
2058 .get(&DatanodeTableKey::new(datanode, table_id))
2059 .await
2060 .unwrap()
2061 .unwrap();
2062
2063 assert_eq!(got.regions, regions.leader_regions);
2064 assert_eq!(got.follower_regions, regions.follower_regions);
2065 }
2066 }
2067
2068 #[tokio::test]
2069 async fn test_update_table_route() {
2070 let mem_kv = Arc::new(MemoryKvBackend::default());
2071 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2072 let region_route = new_test_region_route();
2073 let region_routes = vec![region_route.clone()];
2074 let table_info: RawTableInfo =
2075 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2076 let table_id = table_info.ident.table_id;
2077 let engine = table_info.meta.engine.as_str();
2078 let region_storage_path =
2079 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2080 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2081 TableRouteValue::physical(region_routes.clone()),
2082 );
2083
2084 create_physical_table_metadata(
2086 &table_metadata_manager,
2087 table_info.clone(),
2088 region_routes.clone(),
2089 HashMap::new(),
2090 )
2091 .await
2092 .unwrap();
2093
2094 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2095 let new_region_routes = vec![
2096 new_region_route(1, 1),
2097 new_region_route(2, 2),
2098 new_region_route(3, 3),
2099 ];
2100 table_metadata_manager
2102 .update_table_route(
2103 table_id,
2104 RegionInfo {
2105 engine: engine.to_string(),
2106 region_storage_path: region_storage_path.to_string(),
2107 region_options: HashMap::new(),
2108 region_wal_options: HashMap::new(),
2109 },
2110 ¤t_table_route_value,
2111 new_region_routes.clone(),
2112 &HashMap::new(),
2113 &HashMap::new(),
2114 )
2115 .await
2116 .unwrap();
2117 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2118
2119 table_metadata_manager
2121 .update_table_route(
2122 table_id,
2123 RegionInfo {
2124 engine: engine.to_string(),
2125 region_storage_path: region_storage_path.to_string(),
2126 region_options: HashMap::new(),
2127 region_wal_options: HashMap::new(),
2128 },
2129 ¤t_table_route_value,
2130 new_region_routes.clone(),
2131 &HashMap::new(),
2132 &HashMap::new(),
2133 )
2134 .await
2135 .unwrap();
2136
2137 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2138 current_table_route_value
2139 .inner
2140 .update(new_region_routes.clone())
2141 .unwrap(),
2142 );
2143 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2144 table_metadata_manager
2146 .update_table_route(
2147 table_id,
2148 RegionInfo {
2149 engine: engine.to_string(),
2150 region_storage_path: region_storage_path.to_string(),
2151 region_options: HashMap::new(),
2152 region_wal_options: HashMap::new(),
2153 },
2154 ¤t_table_route_value,
2155 new_region_routes.clone(),
2156 &HashMap::new(),
2157 &HashMap::new(),
2158 )
2159 .await
2160 .unwrap();
2161 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2162
2163 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2166 current_table_route_value
2167 .update(vec![
2168 new_region_route(1, 1),
2169 new_region_route(2, 2),
2170 new_region_route(3, 3),
2171 new_region_route(4, 4),
2172 ])
2173 .unwrap(),
2174 );
2175 assert!(table_metadata_manager
2176 .update_table_route(
2177 table_id,
2178 RegionInfo {
2179 engine: engine.to_string(),
2180 region_storage_path: region_storage_path.to_string(),
2181 region_options: HashMap::new(),
2182 region_wal_options: HashMap::new(),
2183 },
2184 &wrong_table_route_value,
2185 new_region_routes,
2186 &HashMap::new(),
2187 &HashMap::new(),
2188 )
2189 .await
2190 .is_err());
2191 }
2192
2193 #[tokio::test]
2194 async fn test_destroy_table_metadata() {
2195 let mem_kv = Arc::new(MemoryKvBackend::default());
2196 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2197 let table_id = 1025;
2198 let table_name = "foo";
2199 let task = test_create_table_task(table_name, table_id);
2200 let options = create_mock_region_wal_options();
2201 let serialized_options = options
2202 .iter()
2203 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2204 .collect::<HashMap<_, _>>();
2205 table_metadata_manager
2206 .create_table_metadata(
2207 task.table_info,
2208 TableRouteValue::physical(vec![
2209 RegionRoute {
2210 region: Region::new_test(RegionId::new(table_id, 1)),
2211 leader_peer: Some(Peer::empty(1)),
2212 follower_peers: vec![Peer::empty(5)],
2213 leader_state: None,
2214 leader_down_since: None,
2215 },
2216 RegionRoute {
2217 region: Region::new_test(RegionId::new(table_id, 2)),
2218 leader_peer: Some(Peer::empty(2)),
2219 follower_peers: vec![Peer::empty(4)],
2220 leader_state: None,
2221 leader_down_since: None,
2222 },
2223 RegionRoute {
2224 region: Region::new_test(RegionId::new(table_id, 3)),
2225 leader_peer: Some(Peer::empty(3)),
2226 follower_peers: vec![],
2227 leader_state: None,
2228 leader_down_since: None,
2229 },
2230 ]),
2231 serialized_options,
2232 )
2233 .await
2234 .unwrap();
2235 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2236 let table_route_value = table_metadata_manager
2237 .table_route_manager
2238 .table_route_storage()
2239 .get_with_raw_bytes(table_id)
2240 .await
2241 .unwrap()
2242 .unwrap();
2243 table_metadata_manager
2244 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2245 .await
2246 .unwrap();
2247 assert!(mem_kv.is_empty());
2248 }
2249
2250 #[tokio::test]
2251 async fn test_restore_table_metadata() {
2252 let mem_kv = Arc::new(MemoryKvBackend::default());
2253 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2254 let table_id = 1025;
2255 let table_name = "foo";
2256 let task = test_create_table_task(table_name, table_id);
2257 let options = create_mock_region_wal_options();
2258 let serialized_options = options
2259 .iter()
2260 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2261 .collect::<HashMap<_, _>>();
2262 table_metadata_manager
2263 .create_table_metadata(
2264 task.table_info,
2265 TableRouteValue::physical(vec![
2266 RegionRoute {
2267 region: Region::new_test(RegionId::new(table_id, 1)),
2268 leader_peer: Some(Peer::empty(1)),
2269 follower_peers: vec![Peer::empty(5)],
2270 leader_state: None,
2271 leader_down_since: None,
2272 },
2273 RegionRoute {
2274 region: Region::new_test(RegionId::new(table_id, 2)),
2275 leader_peer: Some(Peer::empty(2)),
2276 follower_peers: vec![Peer::empty(4)],
2277 leader_state: None,
2278 leader_down_since: None,
2279 },
2280 RegionRoute {
2281 region: Region::new_test(RegionId::new(table_id, 3)),
2282 leader_peer: Some(Peer::empty(3)),
2283 follower_peers: vec![],
2284 leader_state: None,
2285 leader_down_since: None,
2286 },
2287 ]),
2288 serialized_options,
2289 )
2290 .await
2291 .unwrap();
2292 let expected_result = mem_kv.dump();
2293 let table_route_value = table_metadata_manager
2294 .table_route_manager
2295 .table_route_storage()
2296 .get_with_raw_bytes(table_id)
2297 .await
2298 .unwrap()
2299 .unwrap();
2300 let region_routes = table_route_value.region_routes().unwrap();
2301 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2302 let table_route_value = TableRouteValue::physical(region_routes.clone());
2303 table_metadata_manager
2304 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2305 .await
2306 .unwrap();
2307 table_metadata_manager
2308 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2309 .await
2310 .unwrap();
2311 let kvs = mem_kv.dump();
2312 assert_eq!(kvs, expected_result);
2313 table_metadata_manager
2315 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2316 .await
2317 .unwrap();
2318 let kvs = mem_kv.dump();
2319 assert_eq!(kvs, expected_result);
2320 }
2321
2322 #[tokio::test]
2323 async fn test_create_update_view_info() {
2324 let mem_kv = Arc::new(MemoryKvBackend::default());
2325 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2326
2327 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2328
2329 let view_id = view_info.ident.table_id;
2330
2331 let logical_plan: Vec<u8> = vec![1, 2, 3];
2332 let columns = vec!["a".to_string()];
2333 let plan_columns = vec!["number".to_string()];
2334 let table_names = new_test_table_names();
2335 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2336
2337 table_metadata_manager
2339 .create_view_metadata(
2340 view_info.clone(),
2341 logical_plan.clone(),
2342 table_names.clone(),
2343 columns.clone(),
2344 plan_columns.clone(),
2345 definition.to_string(),
2346 )
2347 .await
2348 .unwrap();
2349
2350 {
2351 let current_view_info = table_metadata_manager
2353 .view_info_manager()
2354 .get(view_id)
2355 .await
2356 .unwrap()
2357 .unwrap()
2358 .into_inner();
2359 assert_eq!(current_view_info.view_info, logical_plan);
2360 assert_eq!(current_view_info.table_names, table_names);
2361 assert_eq!(current_view_info.definition, definition);
2362 assert_eq!(current_view_info.columns, columns);
2363 assert_eq!(current_view_info.plan_columns, plan_columns);
2364 let current_table_info = table_metadata_manager
2366 .table_info_manager()
2367 .get(view_id)
2368 .await
2369 .unwrap()
2370 .unwrap()
2371 .into_inner();
2372 assert_eq!(current_table_info.table_info, view_info);
2373 }
2374
2375 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2376 let new_table_names = {
2377 let mut set = HashSet::new();
2378 set.insert(TableName {
2379 catalog_name: "greptime".to_string(),
2380 schema_name: "public".to_string(),
2381 table_name: "b_table".to_string(),
2382 });
2383 set.insert(TableName {
2384 catalog_name: "greptime".to_string(),
2385 schema_name: "public".to_string(),
2386 table_name: "c_table".to_string(),
2387 });
2388 set
2389 };
2390 let new_columns = vec!["b".to_string()];
2391 let new_plan_columns = vec!["number2".to_string()];
2392 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2393
2394 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2395 logical_plan.clone(),
2396 table_names,
2397 columns,
2398 plan_columns,
2399 definition.to_string(),
2400 ));
2401 table_metadata_manager
2403 .update_view_info(
2404 view_id,
2405 ¤t_view_info_value,
2406 new_logical_plan.clone(),
2407 new_table_names.clone(),
2408 new_columns.clone(),
2409 new_plan_columns.clone(),
2410 new_definition.to_string(),
2411 )
2412 .await
2413 .unwrap();
2414 table_metadata_manager
2416 .update_view_info(
2417 view_id,
2418 ¤t_view_info_value,
2419 new_logical_plan.clone(),
2420 new_table_names.clone(),
2421 new_columns.clone(),
2422 new_plan_columns.clone(),
2423 new_definition.to_string(),
2424 )
2425 .await
2426 .unwrap();
2427
2428 let updated_view_info = table_metadata_manager
2430 .view_info_manager()
2431 .get(view_id)
2432 .await
2433 .unwrap()
2434 .unwrap()
2435 .into_inner();
2436 assert_eq!(updated_view_info.view_info, new_logical_plan);
2437 assert_eq!(updated_view_info.table_names, new_table_names);
2438 assert_eq!(updated_view_info.definition, new_definition);
2439 assert_eq!(updated_view_info.columns, new_columns);
2440 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2441
2442 let wrong_view_info = logical_plan.clone();
2443 let wrong_definition = "wrong_definition";
2444 let wrong_view_info_value =
2445 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2446 wrong_view_info,
2447 new_table_names.clone(),
2448 new_columns.clone(),
2449 new_plan_columns.clone(),
2450 wrong_definition.to_string(),
2451 ));
2452 assert!(table_metadata_manager
2455 .update_view_info(
2456 view_id,
2457 &wrong_view_info_value,
2458 new_logical_plan.clone(),
2459 new_table_names.clone(),
2460 vec!["c".to_string()],
2461 vec!["number3".to_string()],
2462 wrong_definition.to_string(),
2463 )
2464 .await
2465 .is_err());
2466
2467 let current_view_info = table_metadata_manager
2469 .view_info_manager()
2470 .get(view_id)
2471 .await
2472 .unwrap()
2473 .unwrap()
2474 .into_inner();
2475 assert_eq!(current_view_info.view_info, new_logical_plan);
2476 assert_eq!(current_view_info.table_names, new_table_names);
2477 assert_eq!(current_view_info.definition, new_definition);
2478 assert_eq!(current_view_info.columns, new_columns);
2479 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2480 }
2481
/// A `RegionRoleSet` must deserialize from the object form with explicit
/// leader/follower lists and from a bare array, which yields leaders only.
#[test]
fn test_region_role_set_deserialize() {
    // Object form: both role lists are spelled out.
    let object_form = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
    let from_object = serde_json::from_str::<RegionRoleSet>(object_form).unwrap();
    assert_eq!(from_object.leader_regions, vec![1, 2, 3]);
    assert_eq!(from_object.follower_regions, vec![4, 5, 6]);

    // Bare array form: the entries become leaders and there are no followers.
    let array_form = r#"[1, 2, 3]"#;
    let from_array = serde_json::from_str::<RegionRoleSet>(array_form).unwrap();
    assert_eq!(from_array.leader_regions, vec![1, 2, 3]);
    assert!(from_array.follower_regions.is_empty());
}
2494
/// A `RegionDistribution` may mix both per-node encodings in one document:
/// a bare array for one node and the object form for another.
#[test]
fn test_region_distribution_deserialize() {
    let mixed_json = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
    let distribution = serde_json::from_str::<RegionDistribution>(mixed_json).unwrap();
    assert_eq!(distribution.len(), 2);

    // Node 1 used the bare array form: leaders only.
    let node_one = &distribution[&1];
    assert_eq!(node_one.leader_regions, vec![1, 2, 3]);
    assert!(node_one.follower_regions.is_empty());

    // Node 2 used the object form: both lists present.
    let node_two = &distribution[&2];
    assert_eq!(node_two.leader_regions, vec![7, 8, 9]);
    assert_eq!(node_two.follower_regions, vec![10, 11, 12]);
}
2505}