1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod node_address;
104pub mod runtime_switch;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112pub mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
166pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
167pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
168pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
169pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
170pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";
171
172pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
173pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
174pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
175pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
176pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
177pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
178pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
179pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
180pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
181pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
183pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
184
185pub const ELECTION_KEY: &str = "__metasrv_election";
187pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
189
190pub const CACHE_KEY_PREFIXES: [&str; 5] = [
192 TABLE_NAME_KEY_PREFIX,
193 CATALOG_NAME_KEY_PREFIX,
194 SCHEMA_NAME_KEY_PREFIX,
195 TABLE_ROUTE_PREFIX,
196 NODE_ADDRESS_PREFIX,
197];
198
199#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
201pub struct RegionRoleSet {
202 pub leader_regions: Vec<RegionNumber>,
204 pub follower_regions: Vec<RegionNumber>,
206}
207
208impl<'de> Deserialize<'de> for RegionRoleSet {
209 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
210 where
211 D: serde::Deserializer<'de>,
212 {
213 #[derive(Deserialize)]
214 #[serde(untagged)]
215 enum RegionRoleSetOrLeaderOnly {
216 Full {
217 leader_regions: Vec<RegionNumber>,
218 follower_regions: Vec<RegionNumber>,
219 },
220 LeaderOnly(Vec<RegionNumber>),
221 }
222 match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
223 RegionRoleSetOrLeaderOnly::Full {
224 leader_regions,
225 follower_regions,
226 } => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
227 RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
228 Ok(RegionRoleSet::new(leader_regions, vec![]))
229 }
230 }
231 }
232}
233
234impl RegionRoleSet {
235 pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
237 Self {
238 leader_regions,
239 follower_regions,
240 }
241 }
242
243 pub fn add_leader_region(&mut self, region_number: RegionNumber) {
245 self.leader_regions.push(region_number);
246 }
247
248 pub fn add_follower_region(&mut self, region_number: RegionNumber) {
250 self.follower_regions.push(region_number);
251 }
252
253 pub fn sort(&mut self) {
255 self.follower_regions.sort();
256 self.leader_regions.sort();
257 }
258}
259
260pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
264
265pub type FlowId = u32;
267pub type FlowPartitionId = u32;
269
270lazy_static! {
271 pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
272}
273
274lazy_static! {
275 static ref TABLE_INFO_KEY_PATTERN: Regex =
276 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
277}
278
279lazy_static! {
280 static ref VIEW_INFO_KEY_PATTERN: Regex =
281 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
282}
283
284lazy_static! {
285 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
286 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
287}
288
289lazy_static! {
290 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
291 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
292}
293
294lazy_static! {
295 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
296 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
297 ))
298 .unwrap();
299}
300
301lazy_static! {
302 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
304 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
305 ))
306 .unwrap();
307}
308
309lazy_static! {
310 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
312 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
313 ))
314 .unwrap();
315}
316
317lazy_static! {
318 static ref NODE_ADDRESS_PATTERN: Regex =
319 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
320}
321
322lazy_static! {
323 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
324 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
325}
326
327lazy_static! {
328 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
329 "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
330 ))
331 .unwrap();
332}
333
334pub trait MetadataKey<'a, T> {
336 fn to_bytes(&self) -> Vec<u8>;
337
338 fn from_bytes(bytes: &'a [u8]) -> Result<T>;
339}
340
341#[derive(Debug, Clone, PartialEq)]
342pub struct BytesAdapter(Vec<u8>);
343
344impl From<Vec<u8>> for BytesAdapter {
345 fn from(value: Vec<u8>) -> Self {
346 Self(value)
347 }
348}
349
350impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
351 fn to_bytes(&self) -> Vec<u8> {
352 self.0.clone()
353 }
354
355 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
356 Ok(BytesAdapter(bytes.to_vec()))
357 }
358}
359
360pub(crate) trait MetadataKeyGetTxnOp {
361 fn build_get_op(
362 &self,
363 ) -> (
364 TxnOp,
365 impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
366 );
367}
368
369pub trait MetadataValue {
370 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
371 where
372 Self: Sized;
373
374 fn try_as_raw_value(&self) -> Result<Vec<u8>>;
375}
376
377pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
378
379pub struct TableMetadataManager {
380 table_name_manager: TableNameManager,
381 table_info_manager: TableInfoManager,
382 view_info_manager: ViewInfoManager,
383 datanode_table_manager: DatanodeTableManager,
384 catalog_manager: CatalogManager,
385 schema_manager: SchemaManager,
386 table_route_manager: TableRouteManager,
387 tombstone_manager: TombstoneManager,
388 topic_name_manager: TopicNameManager,
389 topic_region_manager: TopicRegionManager,
390 kv_backend: KvBackendRef,
391}
392
393#[macro_export]
394macro_rules! ensure_values {
395 ($got:expr, $expected_value:expr, $name:expr) => {
396 ensure!(
397 $got == $expected_value,
398 error::UnexpectedSnafu {
399 err_msg: format!(
400 "Reads the different value: {:?} during {}, expected: {:?}",
401 $got, $name, $expected_value
402 )
403 }
404 );
405 };
406}
407
408pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
418 bytes: Bytes,
420 inner: T,
422}
423
424impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
425 type Target = T;
426
427 fn deref(&self) -> &Self::Target {
428 &self.inner
429 }
430}
431
432impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
433 fn deref_mut(&mut self) -> &mut Self::Target {
434 &mut self.inner
435 }
436}
437
438impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
439 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
440 write!(
441 f,
442 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
443 self.inner, self.bytes
444 )
445 }
446}
447
448impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
449 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
453 where
454 S: serde::Serializer,
455 {
456 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
459 }
460}
461
462impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
463 for DeserializedValueWithBytes<T>
464{
465 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
469 where
470 D: serde::Deserializer<'de>,
471 {
472 let buf = String::deserialize(deserializer)?;
473 let bytes = Bytes::from(buf);
474
475 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
476 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477
478 Ok(value)
479 }
480}
481
482impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
483 fn clone(&self) -> Self {
484 Self {
485 bytes: self.bytes.clone(),
486 inner: self.inner.clone(),
487 }
488 }
489}
490
491impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
492 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
495 let inner = T::try_from_raw_value(&bytes)?;
496 Ok(Self { bytes, inner })
497 }
498
499 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
502 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
503 }
504
505 pub fn into_inner(self) -> T {
506 self.inner
507 }
508
509 pub fn get_inner_ref(&self) -> &T {
510 &self.inner
511 }
512
513 pub fn get_raw_bytes(&self) -> Vec<u8> {
515 self.bytes.to_vec()
516 }
517
518 #[cfg(any(test, feature = "testing"))]
519 pub fn from_inner(inner: T) -> Self {
520 let bytes = serde_json::to_vec(&inner).unwrap();
521
522 Self {
523 bytes: Bytes::from(bytes),
524 inner,
525 }
526 }
527}
528
529impl TableMetadataManager {
530 pub fn new(kv_backend: KvBackendRef) -> Self {
531 TableMetadataManager {
532 table_name_manager: TableNameManager::new(kv_backend.clone()),
533 table_info_manager: TableInfoManager::new(kv_backend.clone()),
534 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
535 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
536 catalog_manager: CatalogManager::new(kv_backend.clone()),
537 schema_manager: SchemaManager::new(kv_backend.clone()),
538 table_route_manager: TableRouteManager::new(kv_backend.clone()),
539 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
540 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
541 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
542 kv_backend,
543 }
544 }
545
546 pub fn new_with_custom_tombstone_prefix(
548 kv_backend: KvBackendRef,
549 tombstone_prefix: &str,
550 ) -> Self {
551 Self {
552 table_name_manager: TableNameManager::new(kv_backend.clone()),
553 table_info_manager: TableInfoManager::new(kv_backend.clone()),
554 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
555 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
556 catalog_manager: CatalogManager::new(kv_backend.clone()),
557 schema_manager: SchemaManager::new(kv_backend.clone()),
558 table_route_manager: TableRouteManager::new(kv_backend.clone()),
559 tombstone_manager: TombstoneManager::new_with_prefix(
560 kv_backend.clone(),
561 tombstone_prefix,
562 ),
563 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
564 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
565 kv_backend,
566 }
567 }
568
569 pub async fn init(&self) -> Result<()> {
570 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
571
572 self.catalog_manager().create(catalog_name, true).await?;
573
574 let internal_schemas = [
575 DEFAULT_SCHEMA_NAME,
576 INFORMATION_SCHEMA_NAME,
577 DEFAULT_PRIVATE_SCHEMA_NAME,
578 ];
579
580 for schema_name in internal_schemas {
581 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
582
583 self.schema_manager().create(schema_key, None, true).await?;
584 }
585
586 Ok(())
587 }
588
589 pub fn table_name_manager(&self) -> &TableNameManager {
590 &self.table_name_manager
591 }
592
593 pub fn table_info_manager(&self) -> &TableInfoManager {
594 &self.table_info_manager
595 }
596
597 pub fn view_info_manager(&self) -> &ViewInfoManager {
598 &self.view_info_manager
599 }
600
601 pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
602 &self.datanode_table_manager
603 }
604
605 pub fn catalog_manager(&self) -> &CatalogManager {
606 &self.catalog_manager
607 }
608
609 pub fn schema_manager(&self) -> &SchemaManager {
610 &self.schema_manager
611 }
612
613 pub fn table_route_manager(&self) -> &TableRouteManager {
614 &self.table_route_manager
615 }
616
617 pub fn topic_name_manager(&self) -> &TopicNameManager {
618 &self.topic_name_manager
619 }
620
621 pub fn topic_region_manager(&self) -> &TopicRegionManager {
622 &self.topic_region_manager
623 }
624
625 #[cfg(feature = "testing")]
626 pub fn kv_backend(&self) -> &KvBackendRef {
627 &self.kv_backend
628 }
629
630 pub async fn get_full_table_info(
631 &self,
632 table_id: TableId,
633 ) -> Result<(
634 Option<DeserializedValueWithBytes<TableInfoValue>>,
635 Option<DeserializedValueWithBytes<TableRouteValue>>,
636 )> {
637 let table_info_key = TableInfoKey::new(table_id);
638 let table_route_key = TableRouteKey::new(table_id);
639 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
640 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
641
642 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
643 let mut res = self.kv_backend.txn(txn).await?;
644 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
645 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
646 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
647 Ok((table_info_value, table_route_value))
648 }
649
650 pub async fn create_view_metadata(
660 &self,
661 view_info: RawTableInfo,
662 raw_logical_plan: Vec<u8>,
663 table_names: HashSet<TableName>,
664 columns: Vec<String>,
665 plan_columns: Vec<String>,
666 definition: String,
667 ) -> Result<()> {
668 let view_id = view_info.ident.table_id;
669
670 let view_name = TableNameKey::new(
672 &view_info.catalog_name,
673 &view_info.schema_name,
674 &view_info.name,
675 );
676 let create_table_name_txn = self
677 .table_name_manager()
678 .build_create_txn(&view_name, view_id)?;
679
680 let table_info_value = TableInfoValue::new(view_info);
682
683 let (create_table_info_txn, on_create_table_info_failure) = self
684 .table_info_manager()
685 .build_create_txn(view_id, &table_info_value)?;
686
687 let view_info_value = ViewInfoValue::new(
689 raw_logical_plan,
690 table_names,
691 columns,
692 plan_columns,
693 definition,
694 );
695 let (create_view_info_txn, on_create_view_info_failure) = self
696 .view_info_manager()
697 .build_create_txn(view_id, &view_info_value)?;
698
699 let txn = Txn::merge_all(vec![
700 create_table_name_txn,
701 create_table_info_txn,
702 create_view_info_txn,
703 ]);
704
705 let mut r = self.kv_backend.txn(txn).await?;
706
707 if !r.succeeded {
709 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
710 let remote_table_info = on_create_table_info_failure(&mut set)?
711 .context(error::UnexpectedSnafu {
712 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
713 })?
714 .into_inner();
715
716 let remote_view_info = on_create_view_info_failure(&mut set)?
717 .context(error::UnexpectedSnafu {
718 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
719 })?
720 .into_inner();
721
722 let op_name = "the creating view metadata";
723 ensure_values!(remote_table_info, table_info_value, op_name);
724 ensure_values!(remote_view_info, view_info_value, op_name);
725 }
726
727 Ok(())
728 }
729
730 pub async fn create_table_metadata(
733 &self,
734 mut table_info: RawTableInfo,
735 table_route_value: TableRouteValue,
736 region_wal_options: HashMap<RegionNumber, String>,
737 ) -> Result<()> {
738 let region_numbers = table_route_value.region_numbers();
739 table_info.meta.region_numbers = region_numbers;
740 let table_id = table_info.ident.table_id;
741 let engine = table_info.meta.engine.clone();
742
743 let table_name = TableNameKey::new(
745 &table_info.catalog_name,
746 &table_info.schema_name,
747 &table_info.name,
748 );
749 let create_table_name_txn = self
750 .table_name_manager()
751 .build_create_txn(&table_name, table_id)?;
752
753 let region_options = table_info.to_region_options();
754 let table_info_value = TableInfoValue::new(table_info);
756 let (create_table_info_txn, on_create_table_info_failure) = self
757 .table_info_manager()
758 .build_create_txn(table_id, &table_info_value)?;
759
760 let (create_table_route_txn, on_create_table_route_failure) = self
761 .table_route_manager()
762 .table_route_storage()
763 .build_create_txn(table_id, &table_route_value)?;
764
765 let create_topic_region_txn = self
766 .topic_region_manager
767 .build_create_txn(table_id, ®ion_wal_options)?;
768
769 let mut txn = Txn::merge_all(vec![
770 create_table_name_txn,
771 create_table_info_txn,
772 create_table_route_txn,
773 create_topic_region_txn,
774 ]);
775
776 if let TableRouteValue::Physical(x) = &table_route_value {
777 let region_storage_path = table_info_value.region_storage_path();
778 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
779 table_id,
780 &engine,
781 ®ion_storage_path,
782 region_options,
783 region_wal_options,
784 region_distribution(&x.region_routes),
785 )?;
786 txn = txn.merge(create_datanode_table_txn);
787 }
788
789 let mut r = self.kv_backend.txn(txn).await?;
790
791 if !r.succeeded {
793 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
794 let remote_table_info = on_create_table_info_failure(&mut set)?
795 .context(error::UnexpectedSnafu {
796 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
797 })?
798 .into_inner();
799
800 let remote_table_route = on_create_table_route_failure(&mut set)?
801 .context(error::UnexpectedSnafu {
802 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
803 })?
804 .into_inner();
805
806 let op_name = "the creating table metadata";
807 ensure_values!(remote_table_info, table_info_value, op_name);
808 ensure_values!(remote_table_route, table_route_value, op_name);
809 }
810
811 Ok(())
812 }
813
814 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
815 self.kv_backend.max_txn_ops() / 3
818 }
819
820 pub async fn create_logical_tables_metadata(
822 &self,
823 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
824 ) -> Result<()> {
825 let len = tables_data.len();
826 let mut txns = Vec::with_capacity(3 * len);
827 struct OnFailure<F1, R1, F2, R2>
828 where
829 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
830 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
831 {
832 table_info_value: TableInfoValue,
833 on_create_table_info_failure: F1,
834 table_route_value: TableRouteValue,
835 on_create_table_route_failure: F2,
836 }
837 let mut on_failures = Vec::with_capacity(len);
838 for (mut table_info, table_route_value) in tables_data {
839 table_info.meta.region_numbers = table_route_value.region_numbers();
840 let table_id = table_info.ident.table_id;
841
842 let table_name = TableNameKey::new(
844 &table_info.catalog_name,
845 &table_info.schema_name,
846 &table_info.name,
847 );
848 let create_table_name_txn = self
849 .table_name_manager()
850 .build_create_txn(&table_name, table_id)?;
851 txns.push(create_table_name_txn);
852
853 let table_info_value = TableInfoValue::new(table_info);
855 let (create_table_info_txn, on_create_table_info_failure) =
856 self.table_info_manager()
857 .build_create_txn(table_id, &table_info_value)?;
858 txns.push(create_table_info_txn);
859
860 let (create_table_route_txn, on_create_table_route_failure) = self
861 .table_route_manager()
862 .table_route_storage()
863 .build_create_txn(table_id, &table_route_value)?;
864 txns.push(create_table_route_txn);
865
866 on_failures.push(OnFailure {
867 table_info_value,
868 on_create_table_info_failure,
869 table_route_value,
870 on_create_table_route_failure,
871 });
872 }
873
874 let txn = Txn::merge_all(txns);
875 let mut r = self.kv_backend.txn(txn).await?;
876
877 if !r.succeeded {
879 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
880 for on_failure in on_failures {
881 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
882 .context(error::UnexpectedSnafu {
883 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
884 })?
885 .into_inner();
886
887 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
888 .context(error::UnexpectedSnafu {
889 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
890 })?
891 .into_inner();
892
893 let op_name = "the creating logical tables metadata";
894 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
895 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
896 }
897 }
898
899 Ok(())
900 }
901
902 fn table_metadata_keys(
903 &self,
904 table_id: TableId,
905 table_name: &TableName,
906 table_route_value: &TableRouteValue,
907 region_wal_options: &HashMap<RegionNumber, WalOptions>,
908 ) -> Result<Vec<Vec<u8>>> {
909 let datanode_ids = if table_route_value.is_physical() {
911 region_distribution(table_route_value.region_routes()?)
912 .into_keys()
913 .collect()
914 } else {
915 vec![]
916 };
917 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
918 let table_name = TableNameKey::new(
919 &table_name.catalog_name,
920 &table_name.schema_name,
921 &table_name.table_name,
922 );
923 let table_info_key = TableInfoKey::new(table_id);
924 let table_route_key = TableRouteKey::new(table_id);
925 let datanode_table_keys = datanode_ids
926 .into_iter()
927 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
928 .collect::<HashSet<_>>();
929 let topic_region_map = self
930 .topic_region_manager
931 .get_topic_region_mapping(table_id, region_wal_options);
932 let topic_region_keys = topic_region_map
933 .iter()
934 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
935 .collect::<Vec<_>>();
936 keys.push(table_name.to_bytes());
937 keys.push(table_info_key.to_bytes());
938 keys.push(table_route_key.to_bytes());
939 for key in &datanode_table_keys {
940 keys.push(key.to_bytes());
941 }
942 for key in topic_region_keys {
943 keys.push(key.to_bytes());
944 }
945 Ok(keys)
946 }
947
948 pub async fn delete_table_metadata(
951 &self,
952 table_id: TableId,
953 table_name: &TableName,
954 table_route_value: &TableRouteValue,
955 region_wal_options: &HashMap<RegionNumber, WalOptions>,
956 ) -> Result<()> {
957 let keys =
958 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
959 self.tombstone_manager.create(keys).await.map(|_| ())
960 }
961
962 pub async fn delete_table_metadata_tombstone(
965 &self,
966 table_id: TableId,
967 table_name: &TableName,
968 table_route_value: &TableRouteValue,
969 region_wal_options: &HashMap<RegionNumber, WalOptions>,
970 ) -> Result<()> {
971 let table_metadata_keys =
972 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
973 self.tombstone_manager
974 .delete(table_metadata_keys)
975 .await
976 .map(|_| ())
977 }
978
979 pub async fn restore_table_metadata(
982 &self,
983 table_id: TableId,
984 table_name: &TableName,
985 table_route_value: &TableRouteValue,
986 region_wal_options: &HashMap<RegionNumber, WalOptions>,
987 ) -> Result<()> {
988 let keys =
989 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
990 self.tombstone_manager.restore(keys).await.map(|_| ())
991 }
992
993 pub async fn destroy_table_metadata(
996 &self,
997 table_id: TableId,
998 table_name: &TableName,
999 table_route_value: &TableRouteValue,
1000 region_wal_options: &HashMap<RegionNumber, WalOptions>,
1001 ) -> Result<()> {
1002 let keys =
1003 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
1004 let _ = self
1005 .kv_backend
1006 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1007 .await?;
1008 Ok(())
1009 }
1010
1011 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
1012 let mut keys = Vec::with_capacity(3);
1013 let view_name = TableNameKey::new(
1014 &view_name.catalog_name,
1015 &view_name.schema_name,
1016 &view_name.table_name,
1017 );
1018 let table_info_key = TableInfoKey::new(view_id);
1019 let view_info_key = ViewInfoKey::new(view_id);
1020 keys.push(view_name.to_bytes());
1021 keys.push(table_info_key.to_bytes());
1022 keys.push(view_info_key.to_bytes());
1023
1024 Ok(keys)
1025 }
1026
1027 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
1030 let keys = self.view_info_keys(view_id, view_name)?;
1031 let _ = self
1032 .kv_backend
1033 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
1034 .await?;
1035 Ok(())
1036 }
1037
1038 pub async fn rename_table(
1042 &self,
1043 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1044 new_table_name: String,
1045 ) -> Result<()> {
1046 let current_table_info = ¤t_table_info_value.table_info;
1047 let table_id = current_table_info.ident.table_id;
1048
1049 let table_name_key = TableNameKey::new(
1050 ¤t_table_info.catalog_name,
1051 ¤t_table_info.schema_name,
1052 ¤t_table_info.name,
1053 );
1054
1055 let new_table_name_key = TableNameKey::new(
1056 ¤t_table_info.catalog_name,
1057 ¤t_table_info.schema_name,
1058 &new_table_name,
1059 );
1060
1061 let update_table_name_txn = self.table_name_manager().build_update_txn(
1063 &table_name_key,
1064 &new_table_name_key,
1065 table_id,
1066 )?;
1067
1068 let new_table_info_value = current_table_info_value
1069 .inner
1070 .with_update(move |table_info| {
1071 table_info.name = new_table_name;
1072 });
1073
1074 let (update_table_info_txn, on_update_table_info_failure) = self
1076 .table_info_manager()
1077 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1078
1079 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
1080
1081 let mut r = self.kv_backend.txn(txn).await?;
1082
1083 if !r.succeeded {
1085 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1086 let remote_table_info = on_update_table_info_failure(&mut set)?
1087 .context(error::UnexpectedSnafu {
1088 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
1089 })?
1090 .into_inner();
1091
1092 let op_name = "the renaming table metadata";
1093 ensure_values!(remote_table_info, new_table_info_value, op_name);
1094 }
1095
1096 Ok(())
1097 }
1098
1099 pub async fn update_table_info(
1103 &self,
1104 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1105 region_distribution: Option<RegionDistribution>,
1106 new_table_info: RawTableInfo,
1107 ) -> Result<()> {
1108 let table_id = current_table_info_value.table_info.ident.table_id;
1109 let new_table_info_value = current_table_info_value.update(new_table_info);
1110
1111 let (update_table_info_txn, on_update_table_info_failure) = self
1113 .table_info_manager()
1114 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1115
1116 let txn = if let Some(region_distribution) = region_distribution {
1117 let new_region_options = new_table_info_value.table_info.to_region_options();
1119 let update_datanode_table_options_txn = self
1120 .datanode_table_manager
1121 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1122 .await?;
1123 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1124 } else {
1125 update_table_info_txn
1126 };
1127
1128 let mut r = self.kv_backend.txn(txn).await?;
1129 if !r.succeeded {
1131 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1132 let remote_table_info = on_update_table_info_failure(&mut set)?
1133 .context(error::UnexpectedSnafu {
1134 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1135 })?
1136 .into_inner();
1137
1138 let op_name = "the updating table info";
1139 ensure_values!(remote_table_info, new_table_info_value, op_name);
1140 }
1141 Ok(())
1142 }
1143
1144 #[allow(clippy::too_many_arguments)]
1155 pub async fn update_view_info(
1156 &self,
1157 view_id: TableId,
1158 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1159 new_view_info: Vec<u8>,
1160 table_names: HashSet<TableName>,
1161 columns: Vec<String>,
1162 plan_columns: Vec<String>,
1163 definition: String,
1164 ) -> Result<()> {
1165 let new_view_info_value = current_view_info_value.update(
1166 new_view_info,
1167 table_names,
1168 columns,
1169 plan_columns,
1170 definition,
1171 );
1172
1173 let (update_view_info_txn, on_update_view_info_failure) = self
1175 .view_info_manager()
1176 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1177
1178 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1179
1180 if !r.succeeded {
1182 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1183 let remote_view_info = on_update_view_info_failure(&mut set)?
1184 .context(error::UnexpectedSnafu {
1185 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1186 })?
1187 .into_inner();
1188
1189 let op_name = "the updating view info";
1190 ensure_values!(remote_view_info, new_view_info_value, op_name);
1191 }
1192 Ok(())
1193 }
1194
1195 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1196 self.kv_backend.max_txn_ops()
1197 }
1198
1199 pub async fn batch_update_table_info_values(
1200 &self,
1201 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1202 ) -> Result<()> {
1203 let len = table_info_value_pairs.len();
1204 let mut txns = Vec::with_capacity(len);
1205 struct OnFailure<F, R>
1206 where
1207 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1208 {
1209 table_info_value: TableInfoValue,
1210 on_update_table_info_failure: F,
1211 }
1212 let mut on_failures = Vec::with_capacity(len);
1213
1214 for (table_info_value, new_table_info) in table_info_value_pairs {
1215 let table_id = table_info_value.table_info.ident.table_id;
1216
1217 let new_table_info_value = table_info_value.update(new_table_info);
1218
1219 let (update_table_info_txn, on_update_table_info_failure) =
1220 self.table_info_manager().build_update_txn(
1221 table_id,
1222 &table_info_value,
1223 &new_table_info_value,
1224 )?;
1225
1226 txns.push(update_table_info_txn);
1227
1228 on_failures.push(OnFailure {
1229 table_info_value: new_table_info_value,
1230 on_update_table_info_failure,
1231 });
1232 }
1233
1234 let txn = Txn::merge_all(txns);
1235 let mut r = self.kv_backend.txn(txn).await?;
1236
1237 if !r.succeeded {
1238 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1239 for on_failure in on_failures {
1240 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1241 .context(error::UnexpectedSnafu {
1242 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1243 })?
1244 .into_inner();
1245
1246 let op_name = "the batch updating table info";
1247 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1248 }
1249 }
1250
1251 Ok(())
1252 }
1253
1254 pub async fn update_table_route(
1255 &self,
1256 table_id: TableId,
1257 region_info: RegionInfo,
1258 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1259 new_region_routes: Vec<RegionRoute>,
1260 new_region_options: &HashMap<String, String>,
1261 new_region_wal_options: &HashMap<RegionNumber, String>,
1262 ) -> Result<()> {
1263 let current_region_distribution =
1265 region_distribution(current_table_route_value.region_routes()?);
1266 let new_region_distribution = region_distribution(&new_region_routes);
1267
1268 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1269 table_id,
1270 region_info,
1271 current_region_distribution,
1272 new_region_distribution,
1273 new_region_options,
1274 new_region_wal_options,
1275 )?;
1276
1277 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1279
1280 let (update_table_route_txn, on_update_table_route_failure) = self
1281 .table_route_manager()
1282 .table_route_storage()
1283 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1284
1285 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1286
1287 let mut r = self.kv_backend.txn(txn).await?;
1288
1289 if !r.succeeded {
1291 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1292 let remote_table_route = on_update_table_route_failure(&mut set)?
1293 .context(error::UnexpectedSnafu {
1294 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1295 })?
1296 .into_inner();
1297
1298 let op_name = "the updating table route";
1299 ensure_values!(remote_table_route, new_table_route_value, op_name);
1300 }
1301
1302 Ok(())
1303 }
1304
1305 pub async fn update_leader_region_status<F>(
1307 &self,
1308 table_id: TableId,
1309 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1310 next_region_route_status: F,
1311 ) -> Result<()>
1312 where
1313 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1314 {
1315 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1316
1317 let mut updated = 0;
1318 for route in &mut new_region_routes {
1319 if let Some(state) = next_region_route_status(route) {
1320 if route.set_leader_state(state) {
1321 updated += 1;
1322 }
1323 }
1324 }
1325
1326 if updated == 0 {
1327 warn!("No leader status updated");
1328 return Ok(());
1329 }
1330
1331 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1333
1334 let (update_table_route_txn, on_update_table_route_failure) = self
1335 .table_route_manager()
1336 .table_route_storage()
1337 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1338
1339 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1340
1341 if !r.succeeded {
1343 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1344 let remote_table_route = on_update_table_route_failure(&mut set)?
1345 .context(error::UnexpectedSnafu {
1346 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1347 })?
1348 .into_inner();
1349
1350 let op_name = "the updating leader region status";
1351 ensure_values!(remote_table_route, new_table_route_value, op_name);
1352 }
1353
1354 Ok(())
1355 }
1356}
1357
1358#[macro_export]
1359macro_rules! impl_metadata_value {
1360 ($($val_ty: ty), *) => {
1361 $(
1362 impl $crate::key::MetadataValue for $val_ty {
1363 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
1364 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1365 }
1366
1367 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1368 serde_json::to_vec(self).context(SerdeJsonSnafu)
1369 }
1370 }
1371 )*
1372 }
1373}
1374
1375macro_rules! impl_metadata_key_get_txn_op {
1376 ($($key: ty), *) => {
1377 $(
1378 impl $crate::key::MetadataKeyGetTxnOp for $key {
1379 fn build_get_op(
1382 &self,
1383 ) -> (
1384 TxnOp,
1385 impl for<'a> FnMut(
1386 &'a mut TxnOpGetResponseSet,
1387 ) -> Option<Vec<u8>>,
1388 ) {
1389 let raw_key = self.to_bytes();
1390 (
1391 TxnOp::Get(raw_key.clone()),
1392 TxnOpGetResponseSet::filter(raw_key),
1393 )
1394 }
1395 }
1396 )*
1397 }
1398}
1399
1400impl_metadata_key_get_txn_op! {
1401 TableNameKey<'_>,
1402 TableInfoKey,
1403 ViewInfoKey,
1404 TableRouteKey,
1405 DatanodeTableKey
1406}
1407
1408#[macro_export]
1409macro_rules! impl_optional_metadata_value {
1410 ($($val_ty: ty), *) => {
1411 $(
1412 impl $val_ty {
1413 pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
1414 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1415 }
1416
1417 pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1418 serde_json::to_vec(self).context(SerdeJsonSnafu)
1419 }
1420 }
1421 )*
1422 }
1423}
1424
1425impl_metadata_value! {
1426 TableNameValue,
1427 TableInfoValue,
1428 ViewInfoValue,
1429 DatanodeTableValue,
1430 FlowInfoValue,
1431 FlowNameValue,
1432 FlowRouteValue,
1433 TableFlowValue,
1434 NodeAddressValue,
1435 SchemaNameValue,
1436 FlowStateValue,
1437 PoisonValue
1438}
1439
1440impl_optional_metadata_value! {
1441 CatalogNameValue,
1442 SchemaNameValue
1443}
1444
1445#[cfg(test)]
1446mod tests {
1447 use std::collections::{BTreeMap, HashMap, HashSet};
1448 use std::sync::Arc;
1449
1450 use bytes::Bytes;
1451 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1452 use common_time::util::current_time_millis;
1453 use common_wal::options::{KafkaWalOptions, WalOptions};
1454 use futures::TryStreamExt;
1455 use store_api::storage::{RegionId, RegionNumber};
1456 use table::metadata::{RawTableInfo, TableInfo};
1457 use table::table_name::TableName;
1458
1459 use super::datanode_table::DatanodeTableKey;
1460 use super::test_utils;
1461 use crate::ddl::test_util::create_table::test_create_table_task;
1462 use crate::ddl::utils::region_storage_path;
1463 use crate::error::Result;
1464 use crate::key::datanode_table::RegionInfo;
1465 use crate::key::table_info::TableInfoValue;
1466 use crate::key::table_name::TableNameKey;
1467 use crate::key::table_route::TableRouteValue;
1468 use crate::key::{
1469 DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
1470 ViewInfoValue, TOPIC_REGION_PREFIX,
1471 };
1472 use crate::kv_backend::memory::MemoryKvBackend;
1473 use crate::kv_backend::KvBackend;
1474 use crate::peer::Peer;
1475 use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1476 use crate::rpc::store::RangeRequest;
1477 use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1478
1479 #[test]
1480 fn test_deserialized_value_with_bytes() {
1481 let region_route = new_test_region_route();
1482 let region_routes = vec![region_route.clone()];
1483
1484 let expected_region_routes =
1485 TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1486 let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1487
1488 let value = DeserializedValueWithBytes {
1491 inner: TableRouteValue::physical(region_routes.clone()),
1493 bytes: Bytes::from(expected.clone()),
1494 };
1495
1496 let encoded = serde_json::to_vec(&value).unwrap();
1497
1498 let decoded: DeserializedValueWithBytes<TableRouteValue> =
1501 serde_json::from_slice(&encoded).unwrap();
1502
1503 assert_eq!(decoded.inner, expected_region_routes);
1504 assert_eq!(decoded.bytes, expected);
1505 }
1506
1507 fn new_test_region_route() -> RegionRoute {
1508 new_region_route(1, 2)
1509 }
1510
1511 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1512 RegionRoute {
1513 region: Region {
1514 id: region_id.into(),
1515 name: "r1".to_string(),
1516 partition: None,
1517 attrs: BTreeMap::new(),
1518 partition_expr: Default::default(),
1519 },
1520 leader_peer: Some(Peer::new(datanode, "a2")),
1521 follower_peers: vec![],
1522 leader_state: None,
1523 leader_down_since: None,
1524 }
1525 }
1526
1527 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1528 test_utils::new_test_table_info(10, region_numbers)
1529 }
1530
1531 fn new_test_table_names() -> HashSet<TableName> {
1532 let mut set = HashSet::new();
1533 set.insert(TableName {
1534 catalog_name: "greptime".to_string(),
1535 schema_name: "public".to_string(),
1536 table_name: "a_table".to_string(),
1537 });
1538 set.insert(TableName {
1539 catalog_name: "greptime".to_string(),
1540 schema_name: "public".to_string(),
1541 table_name: "b_table".to_string(),
1542 });
1543 set
1544 }
1545
1546 async fn create_physical_table_metadata(
1547 table_metadata_manager: &TableMetadataManager,
1548 table_info: RawTableInfo,
1549 region_routes: Vec<RegionRoute>,
1550 region_wal_options: HashMap<RegionNumber, String>,
1551 ) -> Result<()> {
1552 table_metadata_manager
1553 .create_table_metadata(
1554 table_info,
1555 TableRouteValue::physical(region_routes),
1556 region_wal_options,
1557 )
1558 .await
1559 }
1560
1561 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1562 let topics = (0..2)
1563 .map(|i| format!("greptimedb_topic{}", i))
1564 .collect::<Vec<_>>();
1565 let wal_options = topics
1566 .iter()
1567 .map(|topic| {
1568 WalOptions::Kafka(KafkaWalOptions {
1569 topic: topic.clone(),
1570 })
1571 })
1572 .collect::<Vec<_>>();
1573
1574 (0..16)
1575 .enumerate()
1576 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1577 .collect()
1578 }
1579
1580 #[tokio::test]
1581 async fn test_raft_engine_topic_region_map() {
1582 let mem_kv = Arc::new(MemoryKvBackend::default());
1583 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1584 let region_route = new_test_region_route();
1585 let region_routes = &vec![region_route.clone()];
1586 let table_info: RawTableInfo =
1587 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1588 let wal_allocator = WalOptionsAllocator::RaftEngine;
1589 let regions = (0..16).collect();
1590 let region_wal_options =
1591 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1592 create_physical_table_metadata(
1593 &table_metadata_manager,
1594 table_info.clone(),
1595 region_routes.clone(),
1596 region_wal_options.clone(),
1597 )
1598 .await
1599 .unwrap();
1600
1601 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1602 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1603 let resp = mem_kv.range(range_req).await.unwrap();
1604 assert!(resp.kvs.is_empty());
1606 }
1607
1608 #[tokio::test]
1609 async fn test_create_table_metadata() {
1610 let mem_kv = Arc::new(MemoryKvBackend::default());
1611 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1612 let region_route = new_test_region_route();
1613 let region_routes = &vec![region_route.clone()];
1614 let table_info: RawTableInfo =
1615 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1616 let region_wal_options = create_mock_region_wal_options()
1617 .into_iter()
1618 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1619 .collect::<HashMap<_, _>>();
1620
1621 create_physical_table_metadata(
1623 &table_metadata_manager,
1624 table_info.clone(),
1625 region_routes.clone(),
1626 region_wal_options.clone(),
1627 )
1628 .await
1629 .unwrap();
1630
1631 assert!(create_physical_table_metadata(
1633 &table_metadata_manager,
1634 table_info.clone(),
1635 region_routes.clone(),
1636 region_wal_options.clone(),
1637 )
1638 .await
1639 .is_ok());
1640
1641 let mut modified_region_routes = region_routes.clone();
1642 modified_region_routes.push(region_route.clone());
1643 assert!(create_physical_table_metadata(
1645 &table_metadata_manager,
1646 table_info.clone(),
1647 modified_region_routes,
1648 region_wal_options.clone(),
1649 )
1650 .await
1651 .is_err());
1652
1653 let (remote_table_info, remote_table_route) = table_metadata_manager
1654 .get_full_table_info(10)
1655 .await
1656 .unwrap();
1657
1658 assert_eq!(
1659 remote_table_info.unwrap().into_inner().table_info,
1660 table_info
1661 );
1662 assert_eq!(
1663 remote_table_route
1664 .unwrap()
1665 .into_inner()
1666 .region_routes()
1667 .unwrap(),
1668 region_routes
1669 );
1670
1671 for i in 0..2 {
1672 let region_number = i as u32;
1673 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1674 let topic = format!("greptimedb_topic{}", i);
1675 let regions = table_metadata_manager
1676 .topic_region_manager
1677 .regions(&topic)
1678 .await
1679 .unwrap();
1680 assert_eq!(regions.len(), 8);
1681 assert_eq!(regions[0], region_id);
1682 }
1683 }
1684
1685 #[tokio::test]
1686 async fn test_create_logic_tables_metadata() {
1687 let mem_kv = Arc::new(MemoryKvBackend::default());
1688 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1689 let region_route = new_test_region_route();
1690 let region_routes = vec![region_route.clone()];
1691 let table_info: RawTableInfo =
1692 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1693 let table_id = table_info.ident.table_id;
1694 let table_route_value = TableRouteValue::physical(region_routes.clone());
1695
1696 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1697 table_metadata_manager
1699 .create_logical_tables_metadata(tables_data.clone())
1700 .await
1701 .unwrap();
1702
1703 assert!(table_metadata_manager
1705 .create_logical_tables_metadata(tables_data)
1706 .await
1707 .is_ok());
1708
1709 let mut modified_region_routes = region_routes.clone();
1710 modified_region_routes.push(new_region_route(2, 3));
1711 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1712 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1713 assert!(table_metadata_manager
1715 .create_logical_tables_metadata(modified_tables_data)
1716 .await
1717 .is_err());
1718
1719 let (remote_table_info, remote_table_route) = table_metadata_manager
1720 .get_full_table_info(table_id)
1721 .await
1722 .unwrap();
1723
1724 assert_eq!(
1725 remote_table_info.unwrap().into_inner().table_info,
1726 table_info
1727 );
1728 assert_eq!(
1729 remote_table_route
1730 .unwrap()
1731 .into_inner()
1732 .region_routes()
1733 .unwrap(),
1734 ®ion_routes
1735 );
1736 }
1737
1738 #[tokio::test]
1739 async fn test_create_many_logical_tables_metadata() {
1740 let kv_backend = Arc::new(MemoryKvBackend::default());
1741 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1742
1743 let mut tables_data = vec![];
1744 for i in 0..128 {
1745 let table_id = i + 1;
1746 let regin_number = table_id * 3;
1747 let region_id = RegionId::new(table_id, regin_number);
1748 let region_route = new_region_route(region_id.as_u64(), 2);
1749 let region_routes = vec![region_route.clone()];
1750 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1751 table_id,
1752 &format!("my_table_{}", table_id),
1753 region_routes.iter().map(|r| r.region.id.region_number()),
1754 )
1755 .into();
1756 let table_route_value = TableRouteValue::physical(region_routes.clone());
1757
1758 tables_data.push((table_info, table_route_value));
1759 }
1760
1761 table_metadata_manager
1763 .create_logical_tables_metadata(tables_data)
1764 .await
1765 .unwrap();
1766 }
1767
1768 #[tokio::test]
1769 async fn test_delete_table_metadata() {
1770 let mem_kv = Arc::new(MemoryKvBackend::default());
1771 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1772 let region_route = new_test_region_route();
1773 let region_routes = &vec![region_route.clone()];
1774 let table_info: RawTableInfo =
1775 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1776 let table_id = table_info.ident.table_id;
1777 let datanode_id = 2;
1778 let region_wal_options = create_mock_region_wal_options();
1779 let serialized_region_wal_options = region_wal_options
1780 .iter()
1781 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1782 .collect::<HashMap<_, _>>();
1783
1784 create_physical_table_metadata(
1786 &table_metadata_manager,
1787 table_info.clone(),
1788 region_routes.clone(),
1789 serialized_region_wal_options,
1790 )
1791 .await
1792 .unwrap();
1793
1794 let table_name = TableName::new(
1795 table_info.catalog_name,
1796 table_info.schema_name,
1797 table_info.name,
1798 );
1799 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1800 table_metadata_manager
1802 .delete_table_metadata(
1803 table_id,
1804 &table_name,
1805 table_route_value,
1806 ®ion_wal_options,
1807 )
1808 .await
1809 .unwrap();
1810 table_metadata_manager
1812 .delete_table_metadata(
1813 table_id,
1814 &table_name,
1815 table_route_value,
1816 ®ion_wal_options,
1817 )
1818 .await
1819 .unwrap();
1820 assert!(table_metadata_manager
1821 .table_info_manager()
1822 .get(table_id)
1823 .await
1824 .unwrap()
1825 .is_none());
1826 assert!(table_metadata_manager
1827 .table_route_manager()
1828 .table_route_storage()
1829 .get(table_id)
1830 .await
1831 .unwrap()
1832 .is_none());
1833 assert!(table_metadata_manager
1834 .datanode_table_manager()
1835 .tables(datanode_id)
1836 .try_collect::<Vec<_>>()
1837 .await
1838 .unwrap()
1839 .is_empty());
1840 let table_info = table_metadata_manager
1842 .table_info_manager()
1843 .get(table_id)
1844 .await
1845 .unwrap();
1846 assert!(table_info.is_none());
1847 let table_route = table_metadata_manager
1848 .table_route_manager()
1849 .table_route_storage()
1850 .get(table_id)
1851 .await
1852 .unwrap();
1853 assert!(table_route.is_none());
1854 let regions = table_metadata_manager
1856 .topic_region_manager
1857 .regions("greptimedb_topic0")
1858 .await
1859 .unwrap();
1860 assert_eq!(regions.len(), 0);
1861 let regions = table_metadata_manager
1862 .topic_region_manager
1863 .regions("greptimedb_topic1")
1864 .await
1865 .unwrap();
1866 assert_eq!(regions.len(), 0);
1867 }
1868
1869 #[tokio::test]
1870 async fn test_rename_table() {
1871 let mem_kv = Arc::new(MemoryKvBackend::default());
1872 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1873 let region_route = new_test_region_route();
1874 let region_routes = vec![region_route.clone()];
1875 let table_info: RawTableInfo =
1876 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1877 let table_id = table_info.ident.table_id;
1878 create_physical_table_metadata(
1880 &table_metadata_manager,
1881 table_info.clone(),
1882 region_routes.clone(),
1883 HashMap::new(),
1884 )
1885 .await
1886 .unwrap();
1887
1888 let new_table_name = "another_name".to_string();
1889 let table_info_value =
1890 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1891
1892 table_metadata_manager
1893 .rename_table(&table_info_value, new_table_name.clone())
1894 .await
1895 .unwrap();
1896 table_metadata_manager
1898 .rename_table(&table_info_value, new_table_name.clone())
1899 .await
1900 .unwrap();
1901 let mut modified_table_info = table_info.clone();
1902 modified_table_info.name = "hi".to_string();
1903 let modified_table_info_value =
1904 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1905 assert!(table_metadata_manager
1908 .rename_table(&modified_table_info_value, new_table_name.clone())
1909 .await
1910 .is_err());
1911
1912 let old_table_name = TableNameKey::new(
1913 &table_info.catalog_name,
1914 &table_info.schema_name,
1915 &table_info.name,
1916 );
1917 let new_table_name = TableNameKey::new(
1918 &table_info.catalog_name,
1919 &table_info.schema_name,
1920 &new_table_name,
1921 );
1922
1923 assert!(table_metadata_manager
1924 .table_name_manager()
1925 .get(old_table_name)
1926 .await
1927 .unwrap()
1928 .is_none());
1929
1930 assert_eq!(
1931 table_metadata_manager
1932 .table_name_manager()
1933 .get(new_table_name)
1934 .await
1935 .unwrap()
1936 .unwrap()
1937 .table_id(),
1938 table_id
1939 );
1940 }
1941
1942 #[tokio::test]
1943 async fn test_update_table_info() {
1944 let mem_kv = Arc::new(MemoryKvBackend::default());
1945 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1946 let region_route = new_test_region_route();
1947 let region_routes = vec![region_route.clone()];
1948 let table_info: RawTableInfo =
1949 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1950 let table_id = table_info.ident.table_id;
1951 create_physical_table_metadata(
1953 &table_metadata_manager,
1954 table_info.clone(),
1955 region_routes.clone(),
1956 HashMap::new(),
1957 )
1958 .await
1959 .unwrap();
1960
1961 let mut new_table_info = table_info.clone();
1962 new_table_info.name = "hi".to_string();
1963 let current_table_info_value =
1964 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1965 table_metadata_manager
1967 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1968 .await
1969 .unwrap();
1970 table_metadata_manager
1972 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1973 .await
1974 .unwrap();
1975
1976 let updated_table_info = table_metadata_manager
1978 .table_info_manager()
1979 .get(table_id)
1980 .await
1981 .unwrap()
1982 .unwrap()
1983 .into_inner();
1984 assert_eq!(updated_table_info.table_info, new_table_info);
1985
1986 let mut wrong_table_info = table_info.clone();
1987 wrong_table_info.name = "wrong".to_string();
1988 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1989 current_table_info_value.update(wrong_table_info),
1990 );
1991 assert!(table_metadata_manager
1994 .update_table_info(&wrong_table_info_value, None, new_table_info)
1995 .await
1996 .is_err())
1997 }
1998
1999 #[tokio::test]
2000 async fn test_update_table_leader_region_status() {
2001 let mem_kv = Arc::new(MemoryKvBackend::default());
2002 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2003 let datanode = 1;
2004 let region_routes = vec![
2005 RegionRoute {
2006 region: Region {
2007 id: 1.into(),
2008 name: "r1".to_string(),
2009 partition: None,
2010 attrs: BTreeMap::new(),
2011 partition_expr: Default::default(),
2012 },
2013 leader_peer: Some(Peer::new(datanode, "a2")),
2014 leader_state: Some(LeaderState::Downgrading),
2015 follower_peers: vec![],
2016 leader_down_since: Some(current_time_millis()),
2017 },
2018 RegionRoute {
2019 region: Region {
2020 id: 2.into(),
2021 name: "r2".to_string(),
2022 partition: None,
2023 attrs: BTreeMap::new(),
2024 partition_expr: Default::default(),
2025 },
2026 leader_peer: Some(Peer::new(datanode, "a1")),
2027 leader_state: None,
2028 follower_peers: vec![],
2029 leader_down_since: None,
2030 },
2031 ];
2032 let table_info: RawTableInfo =
2033 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2034 let table_id = table_info.ident.table_id;
2035 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2036 TableRouteValue::physical(region_routes.clone()),
2037 );
2038
2039 create_physical_table_metadata(
2041 &table_metadata_manager,
2042 table_info.clone(),
2043 region_routes.clone(),
2044 HashMap::new(),
2045 )
2046 .await
2047 .unwrap();
2048
2049 table_metadata_manager
2050 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
2051 if region_route.leader_state.is_some() {
2052 None
2053 } else {
2054 Some(Some(LeaderState::Downgrading))
2055 }
2056 })
2057 .await
2058 .unwrap();
2059
2060 let updated_route_value = table_metadata_manager
2061 .table_route_manager()
2062 .table_route_storage()
2063 .get(table_id)
2064 .await
2065 .unwrap()
2066 .unwrap();
2067
2068 assert_eq!(
2069 updated_route_value.region_routes().unwrap()[0].leader_state,
2070 Some(LeaderState::Downgrading)
2071 );
2072
2073 assert!(updated_route_value.region_routes().unwrap()[0]
2074 .leader_down_since
2075 .is_some());
2076
2077 assert_eq!(
2078 updated_route_value.region_routes().unwrap()[1].leader_state,
2079 Some(LeaderState::Downgrading)
2080 );
2081 assert!(updated_route_value.region_routes().unwrap()[1]
2082 .leader_down_since
2083 .is_some());
2084 }
2085
2086 async fn assert_datanode_table(
2087 table_metadata_manager: &TableMetadataManager,
2088 table_id: u32,
2089 region_routes: &[RegionRoute],
2090 ) {
2091 let region_distribution = region_distribution(region_routes);
2092 for (datanode, regions) in region_distribution {
2093 let got = table_metadata_manager
2094 .datanode_table_manager()
2095 .get(&DatanodeTableKey::new(datanode, table_id))
2096 .await
2097 .unwrap()
2098 .unwrap();
2099
2100 assert_eq!(got.regions, regions.leader_regions);
2101 assert_eq!(got.follower_regions, regions.follower_regions);
2102 }
2103 }
2104
2105 #[tokio::test]
2106 async fn test_update_table_route() {
2107 let mem_kv = Arc::new(MemoryKvBackend::default());
2108 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2109 let region_route = new_test_region_route();
2110 let region_routes = vec![region_route.clone()];
2111 let table_info: RawTableInfo =
2112 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2113 let table_id = table_info.ident.table_id;
2114 let engine = table_info.meta.engine.as_str();
2115 let region_storage_path =
2116 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2117 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2118 TableRouteValue::physical(region_routes.clone()),
2119 );
2120
2121 create_physical_table_metadata(
2123 &table_metadata_manager,
2124 table_info.clone(),
2125 region_routes.clone(),
2126 HashMap::new(),
2127 )
2128 .await
2129 .unwrap();
2130
2131 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2132 let new_region_routes = vec![
2133 new_region_route(1, 1),
2134 new_region_route(2, 2),
2135 new_region_route(3, 3),
2136 ];
2137 table_metadata_manager
2139 .update_table_route(
2140 table_id,
2141 RegionInfo {
2142 engine: engine.to_string(),
2143 region_storage_path: region_storage_path.to_string(),
2144 region_options: HashMap::new(),
2145 region_wal_options: HashMap::new(),
2146 },
2147 ¤t_table_route_value,
2148 new_region_routes.clone(),
2149 &HashMap::new(),
2150 &HashMap::new(),
2151 )
2152 .await
2153 .unwrap();
2154 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2155
2156 table_metadata_manager
2158 .update_table_route(
2159 table_id,
2160 RegionInfo {
2161 engine: engine.to_string(),
2162 region_storage_path: region_storage_path.to_string(),
2163 region_options: HashMap::new(),
2164 region_wal_options: HashMap::new(),
2165 },
2166 ¤t_table_route_value,
2167 new_region_routes.clone(),
2168 &HashMap::new(),
2169 &HashMap::new(),
2170 )
2171 .await
2172 .unwrap();
2173
2174 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2175 current_table_route_value
2176 .inner
2177 .update(new_region_routes.clone())
2178 .unwrap(),
2179 );
2180 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2181 table_metadata_manager
2183 .update_table_route(
2184 table_id,
2185 RegionInfo {
2186 engine: engine.to_string(),
2187 region_storage_path: region_storage_path.to_string(),
2188 region_options: HashMap::new(),
2189 region_wal_options: HashMap::new(),
2190 },
2191 ¤t_table_route_value,
2192 new_region_routes.clone(),
2193 &HashMap::new(),
2194 &HashMap::new(),
2195 )
2196 .await
2197 .unwrap();
2198 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2199
2200 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2203 current_table_route_value
2204 .update(vec![
2205 new_region_route(1, 1),
2206 new_region_route(2, 2),
2207 new_region_route(3, 3),
2208 new_region_route(4, 4),
2209 ])
2210 .unwrap(),
2211 );
2212 assert!(table_metadata_manager
2213 .update_table_route(
2214 table_id,
2215 RegionInfo {
2216 engine: engine.to_string(),
2217 region_storage_path: region_storage_path.to_string(),
2218 region_options: HashMap::new(),
2219 region_wal_options: HashMap::new(),
2220 },
2221 &wrong_table_route_value,
2222 new_region_routes,
2223 &HashMap::new(),
2224 &HashMap::new(),
2225 )
2226 .await
2227 .is_err());
2228 }
2229
2230 #[tokio::test]
2231 async fn test_destroy_table_metadata() {
2232 let mem_kv = Arc::new(MemoryKvBackend::default());
2233 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2234 let table_id = 1025;
2235 let table_name = "foo";
2236 let task = test_create_table_task(table_name, table_id);
2237 let options = create_mock_region_wal_options();
2238 let serialized_options = options
2239 .iter()
2240 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2241 .collect::<HashMap<_, _>>();
2242 table_metadata_manager
2243 .create_table_metadata(
2244 task.table_info,
2245 TableRouteValue::physical(vec![
2246 RegionRoute {
2247 region: Region::new_test(RegionId::new(table_id, 1)),
2248 leader_peer: Some(Peer::empty(1)),
2249 follower_peers: vec![Peer::empty(5)],
2250 leader_state: None,
2251 leader_down_since: None,
2252 },
2253 RegionRoute {
2254 region: Region::new_test(RegionId::new(table_id, 2)),
2255 leader_peer: Some(Peer::empty(2)),
2256 follower_peers: vec![Peer::empty(4)],
2257 leader_state: None,
2258 leader_down_since: None,
2259 },
2260 RegionRoute {
2261 region: Region::new_test(RegionId::new(table_id, 3)),
2262 leader_peer: Some(Peer::empty(3)),
2263 follower_peers: vec![],
2264 leader_state: None,
2265 leader_down_since: None,
2266 },
2267 ]),
2268 serialized_options,
2269 )
2270 .await
2271 .unwrap();
2272 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2273 let table_route_value = table_metadata_manager
2274 .table_route_manager
2275 .table_route_storage()
2276 .get_with_raw_bytes(table_id)
2277 .await
2278 .unwrap()
2279 .unwrap();
2280 table_metadata_manager
2281 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2282 .await
2283 .unwrap();
2284 assert!(mem_kv.is_empty());
2285 }
2286
2287 #[tokio::test]
2288 async fn test_restore_table_metadata() {
2289 let mem_kv = Arc::new(MemoryKvBackend::default());
2290 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2291 let table_id = 1025;
2292 let table_name = "foo";
2293 let task = test_create_table_task(table_name, table_id);
2294 let options = create_mock_region_wal_options();
2295 let serialized_options = options
2296 .iter()
2297 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2298 .collect::<HashMap<_, _>>();
2299 table_metadata_manager
2300 .create_table_metadata(
2301 task.table_info,
2302 TableRouteValue::physical(vec![
2303 RegionRoute {
2304 region: Region::new_test(RegionId::new(table_id, 1)),
2305 leader_peer: Some(Peer::empty(1)),
2306 follower_peers: vec![Peer::empty(5)],
2307 leader_state: None,
2308 leader_down_since: None,
2309 },
2310 RegionRoute {
2311 region: Region::new_test(RegionId::new(table_id, 2)),
2312 leader_peer: Some(Peer::empty(2)),
2313 follower_peers: vec![Peer::empty(4)],
2314 leader_state: None,
2315 leader_down_since: None,
2316 },
2317 RegionRoute {
2318 region: Region::new_test(RegionId::new(table_id, 3)),
2319 leader_peer: Some(Peer::empty(3)),
2320 follower_peers: vec![],
2321 leader_state: None,
2322 leader_down_since: None,
2323 },
2324 ]),
2325 serialized_options,
2326 )
2327 .await
2328 .unwrap();
2329 let expected_result = mem_kv.dump();
2330 let table_route_value = table_metadata_manager
2331 .table_route_manager
2332 .table_route_storage()
2333 .get_with_raw_bytes(table_id)
2334 .await
2335 .unwrap()
2336 .unwrap();
2337 let region_routes = table_route_value.region_routes().unwrap();
2338 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2339 let table_route_value = TableRouteValue::physical(region_routes.clone());
2340 table_metadata_manager
2341 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2342 .await
2343 .unwrap();
2344 table_metadata_manager
2345 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2346 .await
2347 .unwrap();
2348 let kvs = mem_kv.dump();
2349 assert_eq!(kvs, expected_result);
2350 table_metadata_manager
2352 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2353 .await
2354 .unwrap();
2355 let kvs = mem_kv.dump();
2356 assert_eq!(kvs, expected_result);
2357 }
2358
2359 #[tokio::test]
2360 async fn test_create_update_view_info() {
2361 let mem_kv = Arc::new(MemoryKvBackend::default());
2362 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2363
2364 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2365
2366 let view_id = view_info.ident.table_id;
2367
2368 let logical_plan: Vec<u8> = vec![1, 2, 3];
2369 let columns = vec!["a".to_string()];
2370 let plan_columns = vec!["number".to_string()];
2371 let table_names = new_test_table_names();
2372 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2373
2374 table_metadata_manager
2376 .create_view_metadata(
2377 view_info.clone(),
2378 logical_plan.clone(),
2379 table_names.clone(),
2380 columns.clone(),
2381 plan_columns.clone(),
2382 definition.to_string(),
2383 )
2384 .await
2385 .unwrap();
2386
2387 {
2388 let current_view_info = table_metadata_manager
2390 .view_info_manager()
2391 .get(view_id)
2392 .await
2393 .unwrap()
2394 .unwrap()
2395 .into_inner();
2396 assert_eq!(current_view_info.view_info, logical_plan);
2397 assert_eq!(current_view_info.table_names, table_names);
2398 assert_eq!(current_view_info.definition, definition);
2399 assert_eq!(current_view_info.columns, columns);
2400 assert_eq!(current_view_info.plan_columns, plan_columns);
2401 let current_table_info = table_metadata_manager
2403 .table_info_manager()
2404 .get(view_id)
2405 .await
2406 .unwrap()
2407 .unwrap()
2408 .into_inner();
2409 assert_eq!(current_table_info.table_info, view_info);
2410 }
2411
2412 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2413 let new_table_names = {
2414 let mut set = HashSet::new();
2415 set.insert(TableName {
2416 catalog_name: "greptime".to_string(),
2417 schema_name: "public".to_string(),
2418 table_name: "b_table".to_string(),
2419 });
2420 set.insert(TableName {
2421 catalog_name: "greptime".to_string(),
2422 schema_name: "public".to_string(),
2423 table_name: "c_table".to_string(),
2424 });
2425 set
2426 };
2427 let new_columns = vec!["b".to_string()];
2428 let new_plan_columns = vec!["number2".to_string()];
2429 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2430
2431 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2432 logical_plan.clone(),
2433 table_names,
2434 columns,
2435 plan_columns,
2436 definition.to_string(),
2437 ));
2438 table_metadata_manager
2440 .update_view_info(
2441 view_id,
2442 ¤t_view_info_value,
2443 new_logical_plan.clone(),
2444 new_table_names.clone(),
2445 new_columns.clone(),
2446 new_plan_columns.clone(),
2447 new_definition.to_string(),
2448 )
2449 .await
2450 .unwrap();
2451 table_metadata_manager
2453 .update_view_info(
2454 view_id,
2455 ¤t_view_info_value,
2456 new_logical_plan.clone(),
2457 new_table_names.clone(),
2458 new_columns.clone(),
2459 new_plan_columns.clone(),
2460 new_definition.to_string(),
2461 )
2462 .await
2463 .unwrap();
2464
2465 let updated_view_info = table_metadata_manager
2467 .view_info_manager()
2468 .get(view_id)
2469 .await
2470 .unwrap()
2471 .unwrap()
2472 .into_inner();
2473 assert_eq!(updated_view_info.view_info, new_logical_plan);
2474 assert_eq!(updated_view_info.table_names, new_table_names);
2475 assert_eq!(updated_view_info.definition, new_definition);
2476 assert_eq!(updated_view_info.columns, new_columns);
2477 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2478
2479 let wrong_view_info = logical_plan.clone();
2480 let wrong_definition = "wrong_definition";
2481 let wrong_view_info_value =
2482 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2483 wrong_view_info,
2484 new_table_names.clone(),
2485 new_columns.clone(),
2486 new_plan_columns.clone(),
2487 wrong_definition.to_string(),
2488 ));
2489 assert!(table_metadata_manager
2492 .update_view_info(
2493 view_id,
2494 &wrong_view_info_value,
2495 new_logical_plan.clone(),
2496 new_table_names.clone(),
2497 vec!["c".to_string()],
2498 vec!["number3".to_string()],
2499 wrong_definition.to_string(),
2500 )
2501 .await
2502 .is_err());
2503
2504 let current_view_info = table_metadata_manager
2506 .view_info_manager()
2507 .get(view_id)
2508 .await
2509 .unwrap()
2510 .unwrap()
2511 .into_inner();
2512 assert_eq!(current_view_info.view_info, new_logical_plan);
2513 assert_eq!(current_view_info.table_names, new_table_names);
2514 assert_eq!(current_view_info.definition, new_definition);
2515 assert_eq!(current_view_info.columns, new_columns);
2516 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2517 }
2518
2519 #[test]
2520 fn test_region_role_set_deserialize() {
2521 let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
2522 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2523 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2524 assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
2525
2526 let s = r#"[1, 2, 3]"#;
2527 let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
2528 assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
2529 assert!(region_role_set.follower_regions.is_empty());
2530 }
2531
2532 #[test]
2533 fn test_region_distribution_deserialize() {
2534 let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
2535 let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
2536 assert_eq!(region_distribution.len(), 2);
2537 assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
2538 assert!(region_distribution[&1].follower_regions.is_empty());
2539 assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
2540 assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
2541 }
2542}