1pub mod catalog_name;
101pub mod datanode_table;
102pub mod flow;
103pub mod maintenance;
104pub mod node_address;
105mod schema_metadata_manager;
106pub mod schema_name;
107pub mod table_info;
108pub mod table_name;
109pub mod table_route;
110#[cfg(any(test, feature = "testing"))]
111pub mod test_utils;
112mod tombstone;
113pub mod topic_name;
114pub mod topic_region;
115pub mod txn_helper;
116pub mod view_info;
117
118use std::collections::{BTreeMap, HashMap, HashSet};
119use std::fmt::Debug;
120use std::ops::{Deref, DerefMut};
121use std::sync::Arc;
122
123use bytes::Bytes;
124use common_catalog::consts::{
125 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
126};
127use common_telemetry::warn;
128use common_wal::options::WalOptions;
129use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
130use flow::flow_route::FlowRouteValue;
131use flow::table_flow::TableFlowValue;
132use lazy_static::lazy_static;
133use regex::Regex;
134pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
135use serde::de::DeserializeOwned;
136use serde::{Deserialize, Serialize};
137use snafu::{ensure, OptionExt, ResultExt};
138use store_api::storage::RegionNumber;
139use table::metadata::{RawTableInfo, TableId};
140use table::table_name::TableName;
141use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
142use table_name::{TableNameKey, TableNameManager, TableNameValue};
143use topic_name::TopicNameManager;
144use topic_region::{TopicRegionKey, TopicRegionManager};
145use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
146
147use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
148use self::datanode_table::RegionInfo;
149use self::flow::flow_info::FlowInfoValue;
150use self::flow::flow_name::FlowNameValue;
151use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
152use self::table_route::{TableRouteManager, TableRouteValue};
153use self::tombstone::TombstoneManager;
154use crate::error::{self, Result, SerdeJsonSnafu};
155use crate::key::flow::flow_state::FlowStateValue;
156use crate::key::node_address::NodeAddressValue;
157use crate::key::table_route::TableRouteKey;
158use crate::key::txn_helper::TxnOpGetResponseSet;
159use crate::kv_backend::txn::{Txn, TxnOp};
160use crate::kv_backend::KvBackendRef;
161use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
162use crate::rpc::store::BatchDeleteRequest;
163use crate::state_store::PoisonValue;
164use crate::DatanodeId;
165
166pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
167pub const MAINTENANCE_KEY: &str = "__maintenance";
168
169pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
170pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
171pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
172pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
173pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
174pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
175pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
176pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
177pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
178pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
180pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
181
182pub const CACHE_KEY_PREFIXES: [&str; 5] = [
184 TABLE_NAME_KEY_PREFIX,
185 CATALOG_NAME_KEY_PREFIX,
186 SCHEMA_NAME_KEY_PREFIX,
187 TABLE_ROUTE_PREFIX,
188 NODE_ADDRESS_PREFIX,
189];
190
191pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
192
193pub type FlowId = u32;
195pub type FlowPartitionId = u32;
197
198lazy_static! {
199 pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
200}
201
202lazy_static! {
203 static ref TABLE_INFO_KEY_PATTERN: Regex =
204 Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
205}
206
207lazy_static! {
208 static ref VIEW_INFO_KEY_PATTERN: Regex =
209 Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
210}
211
212lazy_static! {
213 static ref TABLE_ROUTE_KEY_PATTERN: Regex =
214 Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
215}
216
217lazy_static! {
218 static ref DATANODE_TABLE_KEY_PATTERN: Regex =
219 Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
220}
221
222lazy_static! {
223 static ref TABLE_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
224 "^{TABLE_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})/({NAME_PATTERN})$"
225 ))
226 .unwrap();
227}
228
229lazy_static! {
230 static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
232 "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
233 ))
234 .unwrap();
235}
236
237lazy_static! {
238 static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
240 "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
241 ))
242 .unwrap();
243}
244
245lazy_static! {
246 static ref NODE_ADDRESS_PATTERN: Regex =
247 Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
248}
249
250lazy_static! {
251 pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
252 Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
253}
254
255lazy_static! {
256 pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
257 "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
258 ))
259 .unwrap();
260}
261
262pub trait MetadataKey<'a, T> {
264 fn to_bytes(&self) -> Vec<u8>;
265
266 fn from_bytes(bytes: &'a [u8]) -> Result<T>;
267}
268
269#[derive(Debug, Clone, PartialEq)]
270pub struct BytesAdapter(Vec<u8>);
271
272impl From<Vec<u8>> for BytesAdapter {
273 fn from(value: Vec<u8>) -> Self {
274 Self(value)
275 }
276}
277
278impl<'a> MetadataKey<'a, BytesAdapter> for BytesAdapter {
279 fn to_bytes(&self) -> Vec<u8> {
280 self.0.clone()
281 }
282
283 fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
284 Ok(BytesAdapter(bytes.to_vec()))
285 }
286}
287
288pub(crate) trait MetadataKeyGetTxnOp {
289 fn build_get_op(
290 &self,
291 ) -> (
292 TxnOp,
293 impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
294 );
295}
296
297pub trait MetadataValue {
298 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
299 where
300 Self: Sized;
301
302 fn try_as_raw_value(&self) -> Result<Vec<u8>>;
303}
304
305pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
306
307pub struct TableMetadataManager {
308 table_name_manager: TableNameManager,
309 table_info_manager: TableInfoManager,
310 view_info_manager: ViewInfoManager,
311 datanode_table_manager: DatanodeTableManager,
312 catalog_manager: CatalogManager,
313 schema_manager: SchemaManager,
314 table_route_manager: TableRouteManager,
315 tombstone_manager: TombstoneManager,
316 topic_name_manager: TopicNameManager,
317 topic_region_manager: TopicRegionManager,
318 kv_backend: KvBackendRef,
319}
320
321#[macro_export]
322macro_rules! ensure_values {
323 ($got:expr, $expected_value:expr, $name:expr) => {
324 ensure!(
325 $got == $expected_value,
326 error::UnexpectedSnafu {
327 err_msg: format!(
328 "Reads the different value: {:?} during {}, expected: {:?}",
329 $got, $name, $expected_value
330 )
331 }
332 );
333 };
334}
335
336pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
346 bytes: Bytes,
348 inner: T,
350}
351
352impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
353 type Target = T;
354
355 fn deref(&self) -> &Self::Target {
356 &self.inner
357 }
358}
359
360impl<T: DeserializeOwned + Serialize> DerefMut for DeserializedValueWithBytes<T> {
361 fn deref_mut(&mut self) -> &mut Self::Target {
362 &mut self.inner
363 }
364}
365
366impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 write!(
369 f,
370 "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
371 self.inner, self.bytes
372 )
373 }
374}
375
376impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
377 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
381 where
382 S: serde::Serializer,
383 {
384 serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
387 }
388}
389
390impl<'de, T: DeserializeOwned + Serialize + MetadataValue> Deserialize<'de>
391 for DeserializedValueWithBytes<T>
392{
393 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
397 where
398 D: serde::Deserializer<'de>,
399 {
400 let buf = String::deserialize(deserializer)?;
401 let bytes = Bytes::from(buf);
402
403 let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
404 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
405
406 Ok(value)
407 }
408}
409
410impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
411 fn clone(&self) -> Self {
412 Self {
413 bytes: self.bytes.clone(),
414 inner: self.inner.clone(),
415 }
416 }
417}
418
419impl<T: Serialize + DeserializeOwned + MetadataValue> DeserializedValueWithBytes<T> {
420 pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
423 let inner = T::try_from_raw_value(&bytes)?;
424 Ok(Self { bytes, inner })
425 }
426
427 pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
430 Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
431 }
432
433 pub fn into_inner(self) -> T {
434 self.inner
435 }
436
437 pub fn get_inner_ref(&self) -> &T {
438 &self.inner
439 }
440
441 pub fn get_raw_bytes(&self) -> Vec<u8> {
443 self.bytes.to_vec()
444 }
445
446 #[cfg(any(test, feature = "testing"))]
447 pub fn from_inner(inner: T) -> Self {
448 let bytes = serde_json::to_vec(&inner).unwrap();
449
450 Self {
451 bytes: Bytes::from(bytes),
452 inner,
453 }
454 }
455}
456
457impl TableMetadataManager {
458 pub fn new(kv_backend: KvBackendRef) -> Self {
459 TableMetadataManager {
460 table_name_manager: TableNameManager::new(kv_backend.clone()),
461 table_info_manager: TableInfoManager::new(kv_backend.clone()),
462 view_info_manager: ViewInfoManager::new(kv_backend.clone()),
463 datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
464 catalog_manager: CatalogManager::new(kv_backend.clone()),
465 schema_manager: SchemaManager::new(kv_backend.clone()),
466 table_route_manager: TableRouteManager::new(kv_backend.clone()),
467 tombstone_manager: TombstoneManager::new(kv_backend.clone()),
468 topic_name_manager: TopicNameManager::new(kv_backend.clone()),
469 topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
470 kv_backend,
471 }
472 }
473
474 pub async fn init(&self) -> Result<()> {
475 let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
476
477 self.catalog_manager().create(catalog_name, true).await?;
478
479 let internal_schemas = [
480 DEFAULT_SCHEMA_NAME,
481 INFORMATION_SCHEMA_NAME,
482 DEFAULT_PRIVATE_SCHEMA_NAME,
483 ];
484
485 for schema_name in internal_schemas {
486 let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);
487
488 self.schema_manager().create(schema_key, None, true).await?;
489 }
490
491 Ok(())
492 }
493
494 pub fn table_name_manager(&self) -> &TableNameManager {
495 &self.table_name_manager
496 }
497
498 pub fn table_info_manager(&self) -> &TableInfoManager {
499 &self.table_info_manager
500 }
501
502 pub fn view_info_manager(&self) -> &ViewInfoManager {
503 &self.view_info_manager
504 }
505
506 pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
507 &self.datanode_table_manager
508 }
509
510 pub fn catalog_manager(&self) -> &CatalogManager {
511 &self.catalog_manager
512 }
513
514 pub fn schema_manager(&self) -> &SchemaManager {
515 &self.schema_manager
516 }
517
518 pub fn table_route_manager(&self) -> &TableRouteManager {
519 &self.table_route_manager
520 }
521
522 pub fn topic_name_manager(&self) -> &TopicNameManager {
523 &self.topic_name_manager
524 }
525
526 pub fn topic_region_manager(&self) -> &TopicRegionManager {
527 &self.topic_region_manager
528 }
529
530 #[cfg(feature = "testing")]
531 pub fn kv_backend(&self) -> &KvBackendRef {
532 &self.kv_backend
533 }
534
535 pub async fn get_full_table_info(
536 &self,
537 table_id: TableId,
538 ) -> Result<(
539 Option<DeserializedValueWithBytes<TableInfoValue>>,
540 Option<DeserializedValueWithBytes<TableRouteValue>>,
541 )> {
542 let table_info_key = TableInfoKey::new(table_id);
543 let table_route_key = TableRouteKey::new(table_id);
544 let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
545 let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
546
547 let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
548 let mut res = self.kv_backend.txn(txn).await?;
549 let mut set = TxnOpGetResponseSet::from(&mut res.responses);
550 let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
551 let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
552 Ok((table_info_value, table_route_value))
553 }
554
555 pub async fn create_view_metadata(
565 &self,
566 view_info: RawTableInfo,
567 raw_logical_plan: Vec<u8>,
568 table_names: HashSet<TableName>,
569 columns: Vec<String>,
570 plan_columns: Vec<String>,
571 definition: String,
572 ) -> Result<()> {
573 let view_id = view_info.ident.table_id;
574
575 let view_name = TableNameKey::new(
577 &view_info.catalog_name,
578 &view_info.schema_name,
579 &view_info.name,
580 );
581 let create_table_name_txn = self
582 .table_name_manager()
583 .build_create_txn(&view_name, view_id)?;
584
585 let table_info_value = TableInfoValue::new(view_info);
587
588 let (create_table_info_txn, on_create_table_info_failure) = self
589 .table_info_manager()
590 .build_create_txn(view_id, &table_info_value)?;
591
592 let view_info_value = ViewInfoValue::new(
594 raw_logical_plan,
595 table_names,
596 columns,
597 plan_columns,
598 definition,
599 );
600 let (create_view_info_txn, on_create_view_info_failure) = self
601 .view_info_manager()
602 .build_create_txn(view_id, &view_info_value)?;
603
604 let txn = Txn::merge_all(vec![
605 create_table_name_txn,
606 create_table_info_txn,
607 create_view_info_txn,
608 ]);
609
610 let mut r = self.kv_backend.txn(txn).await?;
611
612 if !r.succeeded {
614 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
615 let remote_table_info = on_create_table_info_failure(&mut set)?
616 .context(error::UnexpectedSnafu {
617 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
618 })?
619 .into_inner();
620
621 let remote_view_info = on_create_view_info_failure(&mut set)?
622 .context(error::UnexpectedSnafu {
623 err_msg: "Reads the empty view info in comparing operation of creating view metadata",
624 })?
625 .into_inner();
626
627 let op_name = "the creating view metadata";
628 ensure_values!(remote_table_info, table_info_value, op_name);
629 ensure_values!(remote_view_info, view_info_value, op_name);
630 }
631
632 Ok(())
633 }
634
635 pub async fn create_table_metadata(
638 &self,
639 mut table_info: RawTableInfo,
640 table_route_value: TableRouteValue,
641 region_wal_options: HashMap<RegionNumber, String>,
642 ) -> Result<()> {
643 let region_numbers = table_route_value.region_numbers();
644 table_info.meta.region_numbers = region_numbers;
645 let table_id = table_info.ident.table_id;
646 let engine = table_info.meta.engine.clone();
647
648 let table_name = TableNameKey::new(
650 &table_info.catalog_name,
651 &table_info.schema_name,
652 &table_info.name,
653 );
654 let create_table_name_txn = self
655 .table_name_manager()
656 .build_create_txn(&table_name, table_id)?;
657
658 let region_options = table_info.to_region_options();
659 let table_info_value = TableInfoValue::new(table_info);
661 let (create_table_info_txn, on_create_table_info_failure) = self
662 .table_info_manager()
663 .build_create_txn(table_id, &table_info_value)?;
664
665 let (create_table_route_txn, on_create_table_route_failure) = self
666 .table_route_manager()
667 .table_route_storage()
668 .build_create_txn(table_id, &table_route_value)?;
669
670 let create_topic_region_txn = self
671 .topic_region_manager
672 .build_create_txn(table_id, ®ion_wal_options)?;
673
674 let mut txn = Txn::merge_all(vec![
675 create_table_name_txn,
676 create_table_info_txn,
677 create_table_route_txn,
678 create_topic_region_txn,
679 ]);
680
681 if let TableRouteValue::Physical(x) = &table_route_value {
682 let region_storage_path = table_info_value.region_storage_path();
683 let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
684 table_id,
685 &engine,
686 ®ion_storage_path,
687 region_options,
688 region_wal_options,
689 region_distribution(&x.region_routes),
690 )?;
691 txn = txn.merge(create_datanode_table_txn);
692 }
693
694 let mut r = self.kv_backend.txn(txn).await?;
695
696 if !r.succeeded {
698 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
699 let remote_table_info = on_create_table_info_failure(&mut set)?
700 .context(error::UnexpectedSnafu {
701 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
702 })?
703 .into_inner();
704
705 let remote_table_route = on_create_table_route_failure(&mut set)?
706 .context(error::UnexpectedSnafu {
707 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
708 })?
709 .into_inner();
710
711 let op_name = "the creating table metadata";
712 ensure_values!(remote_table_info, table_info_value, op_name);
713 ensure_values!(remote_table_route, table_route_value, op_name);
714 }
715
716 Ok(())
717 }
718
719 pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
720 self.kv_backend.max_txn_ops() / 3
723 }
724
725 pub async fn create_logical_tables_metadata(
727 &self,
728 tables_data: Vec<(RawTableInfo, TableRouteValue)>,
729 ) -> Result<()> {
730 let len = tables_data.len();
731 let mut txns = Vec::with_capacity(3 * len);
732 struct OnFailure<F1, R1, F2, R2>
733 where
734 F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
735 F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
736 {
737 table_info_value: TableInfoValue,
738 on_create_table_info_failure: F1,
739 table_route_value: TableRouteValue,
740 on_create_table_route_failure: F2,
741 }
742 let mut on_failures = Vec::with_capacity(len);
743 for (mut table_info, table_route_value) in tables_data {
744 table_info.meta.region_numbers = table_route_value.region_numbers();
745 let table_id = table_info.ident.table_id;
746
747 let table_name = TableNameKey::new(
749 &table_info.catalog_name,
750 &table_info.schema_name,
751 &table_info.name,
752 );
753 let create_table_name_txn = self
754 .table_name_manager()
755 .build_create_txn(&table_name, table_id)?;
756 txns.push(create_table_name_txn);
757
758 let table_info_value = TableInfoValue::new(table_info);
760 let (create_table_info_txn, on_create_table_info_failure) =
761 self.table_info_manager()
762 .build_create_txn(table_id, &table_info_value)?;
763 txns.push(create_table_info_txn);
764
765 let (create_table_route_txn, on_create_table_route_failure) = self
766 .table_route_manager()
767 .table_route_storage()
768 .build_create_txn(table_id, &table_route_value)?;
769 txns.push(create_table_route_txn);
770
771 on_failures.push(OnFailure {
772 table_info_value,
773 on_create_table_info_failure,
774 table_route_value,
775 on_create_table_route_failure,
776 });
777 }
778
779 let txn = Txn::merge_all(txns);
780 let mut r = self.kv_backend.txn(txn).await?;
781
782 if !r.succeeded {
784 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
785 for on_failure in on_failures {
786 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
787 .context(error::UnexpectedSnafu {
788 err_msg: "Reads the empty table info in comparing operation of creating table metadata",
789 })?
790 .into_inner();
791
792 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
793 .context(error::UnexpectedSnafu {
794 err_msg: "Reads the empty table route in comparing operation of creating table metadata",
795 })?
796 .into_inner();
797
798 let op_name = "the creating logical tables metadata";
799 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
800 ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
801 }
802 }
803
804 Ok(())
805 }
806
807 fn table_metadata_keys(
808 &self,
809 table_id: TableId,
810 table_name: &TableName,
811 table_route_value: &TableRouteValue,
812 region_wal_options: &HashMap<RegionNumber, WalOptions>,
813 ) -> Result<Vec<Vec<u8>>> {
814 let datanode_ids = if table_route_value.is_physical() {
816 region_distribution(table_route_value.region_routes()?)
817 .into_keys()
818 .collect()
819 } else {
820 vec![]
821 };
822 let mut keys = Vec::with_capacity(3 + datanode_ids.len());
823 let table_name = TableNameKey::new(
824 &table_name.catalog_name,
825 &table_name.schema_name,
826 &table_name.table_name,
827 );
828 let table_info_key = TableInfoKey::new(table_id);
829 let table_route_key = TableRouteKey::new(table_id);
830 let datanode_table_keys = datanode_ids
831 .into_iter()
832 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
833 .collect::<HashSet<_>>();
834 let topic_region_map = self
835 .topic_region_manager
836 .get_topic_region_mapping(table_id, region_wal_options);
837 let topic_region_keys = topic_region_map
838 .iter()
839 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
840 .collect::<Vec<_>>();
841 keys.push(table_name.to_bytes());
842 keys.push(table_info_key.to_bytes());
843 keys.push(table_route_key.to_bytes());
844 for key in &datanode_table_keys {
845 keys.push(key.to_bytes());
846 }
847 for key in topic_region_keys {
848 keys.push(key.to_bytes());
849 }
850 Ok(keys)
851 }
852
853 pub async fn delete_table_metadata(
856 &self,
857 table_id: TableId,
858 table_name: &TableName,
859 table_route_value: &TableRouteValue,
860 region_wal_options: &HashMap<RegionNumber, WalOptions>,
861 ) -> Result<()> {
862 let keys =
863 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
864 self.tombstone_manager.create(keys).await
865 }
866
867 pub async fn delete_table_metadata_tombstone(
870 &self,
871 table_id: TableId,
872 table_name: &TableName,
873 table_route_value: &TableRouteValue,
874 region_wal_options: &HashMap<RegionNumber, WalOptions>,
875 ) -> Result<()> {
876 let table_metadata_keys =
877 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
878 self.tombstone_manager.delete(table_metadata_keys).await
879 }
880
881 pub async fn restore_table_metadata(
884 &self,
885 table_id: TableId,
886 table_name: &TableName,
887 table_route_value: &TableRouteValue,
888 region_wal_options: &HashMap<RegionNumber, WalOptions>,
889 ) -> Result<()> {
890 let keys =
891 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
892 self.tombstone_manager.restore(keys).await
893 }
894
895 pub async fn destroy_table_metadata(
898 &self,
899 table_id: TableId,
900 table_name: &TableName,
901 table_route_value: &TableRouteValue,
902 region_wal_options: &HashMap<RegionNumber, WalOptions>,
903 ) -> Result<()> {
904 let keys =
905 self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
906 let _ = self
907 .kv_backend
908 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
909 .await?;
910 Ok(())
911 }
912
913 fn view_info_keys(&self, view_id: TableId, view_name: &TableName) -> Result<Vec<Vec<u8>>> {
914 let mut keys = Vec::with_capacity(3);
915 let view_name = TableNameKey::new(
916 &view_name.catalog_name,
917 &view_name.schema_name,
918 &view_name.table_name,
919 );
920 let table_info_key = TableInfoKey::new(view_id);
921 let view_info_key = ViewInfoKey::new(view_id);
922 keys.push(view_name.to_bytes());
923 keys.push(table_info_key.to_bytes());
924 keys.push(view_info_key.to_bytes());
925
926 Ok(keys)
927 }
928
929 pub async fn destroy_view_info(&self, view_id: TableId, view_name: &TableName) -> Result<()> {
932 let keys = self.view_info_keys(view_id, view_name)?;
933 let _ = self
934 .kv_backend
935 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
936 .await?;
937 Ok(())
938 }
939
940 pub async fn rename_table(
944 &self,
945 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
946 new_table_name: String,
947 ) -> Result<()> {
948 let current_table_info = ¤t_table_info_value.table_info;
949 let table_id = current_table_info.ident.table_id;
950
951 let table_name_key = TableNameKey::new(
952 ¤t_table_info.catalog_name,
953 ¤t_table_info.schema_name,
954 ¤t_table_info.name,
955 );
956
957 let new_table_name_key = TableNameKey::new(
958 ¤t_table_info.catalog_name,
959 ¤t_table_info.schema_name,
960 &new_table_name,
961 );
962
963 let update_table_name_txn = self.table_name_manager().build_update_txn(
965 &table_name_key,
966 &new_table_name_key,
967 table_id,
968 )?;
969
970 let new_table_info_value = current_table_info_value
971 .inner
972 .with_update(move |table_info| {
973 table_info.name = new_table_name;
974 });
975
976 let (update_table_info_txn, on_update_table_info_failure) = self
978 .table_info_manager()
979 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
980
981 let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
982
983 let mut r = self.kv_backend.txn(txn).await?;
984
985 if !r.succeeded {
987 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
988 let remote_table_info = on_update_table_info_failure(&mut set)?
989 .context(error::UnexpectedSnafu {
990 err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
991 })?
992 .into_inner();
993
994 let op_name = "the renaming table metadata";
995 ensure_values!(remote_table_info, new_table_info_value, op_name);
996 }
997
998 Ok(())
999 }
1000
1001 pub async fn update_table_info(
1005 &self,
1006 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
1007 region_distribution: Option<RegionDistribution>,
1008 new_table_info: RawTableInfo,
1009 ) -> Result<()> {
1010 let table_id = current_table_info_value.table_info.ident.table_id;
1011 let new_table_info_value = current_table_info_value.update(new_table_info);
1012
1013 let (update_table_info_txn, on_update_table_info_failure) = self
1015 .table_info_manager()
1016 .build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
1017
1018 let txn = if let Some(region_distribution) = region_distribution {
1019 let new_region_options = new_table_info_value.table_info.to_region_options();
1021 let update_datanode_table_options_txn = self
1022 .datanode_table_manager
1023 .build_update_table_options_txn(table_id, region_distribution, new_region_options)
1024 .await?;
1025 Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
1026 } else {
1027 update_table_info_txn
1028 };
1029
1030 let mut r = self.kv_backend.txn(txn).await?;
1031 if !r.succeeded {
1033 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1034 let remote_table_info = on_update_table_info_failure(&mut set)?
1035 .context(error::UnexpectedSnafu {
1036 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1037 })?
1038 .into_inner();
1039
1040 let op_name = "the updating table info";
1041 ensure_values!(remote_table_info, new_table_info_value, op_name);
1042 }
1043 Ok(())
1044 }
1045
1046 #[allow(clippy::too_many_arguments)]
1057 pub async fn update_view_info(
1058 &self,
1059 view_id: TableId,
1060 current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
1061 new_view_info: Vec<u8>,
1062 table_names: HashSet<TableName>,
1063 columns: Vec<String>,
1064 plan_columns: Vec<String>,
1065 definition: String,
1066 ) -> Result<()> {
1067 let new_view_info_value = current_view_info_value.update(
1068 new_view_info,
1069 table_names,
1070 columns,
1071 plan_columns,
1072 definition,
1073 );
1074
1075 let (update_view_info_txn, on_update_view_info_failure) = self
1077 .view_info_manager()
1078 .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;
1079
1080 let mut r = self.kv_backend.txn(update_view_info_txn).await?;
1081
1082 if !r.succeeded {
1084 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1085 let remote_view_info = on_update_view_info_failure(&mut set)?
1086 .context(error::UnexpectedSnafu {
1087 err_msg: "Reads the empty view info in comparing operation of the updating view info",
1088 })?
1089 .into_inner();
1090
1091 let op_name = "the updating view info";
1092 ensure_values!(remote_view_info, new_view_info_value, op_name);
1093 }
1094 Ok(())
1095 }
1096
1097 pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
1098 self.kv_backend.max_txn_ops()
1099 }
1100
1101 pub async fn batch_update_table_info_values(
1102 &self,
1103 table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
1104 ) -> Result<()> {
1105 let len = table_info_value_pairs.len();
1106 let mut txns = Vec::with_capacity(len);
1107 struct OnFailure<F, R>
1108 where
1109 F: FnOnce(&mut TxnOpGetResponseSet) -> R,
1110 {
1111 table_info_value: TableInfoValue,
1112 on_update_table_info_failure: F,
1113 }
1114 let mut on_failures = Vec::with_capacity(len);
1115
1116 for (table_info_value, new_table_info) in table_info_value_pairs {
1117 let table_id = table_info_value.table_info.ident.table_id;
1118
1119 let new_table_info_value = table_info_value.update(new_table_info);
1120
1121 let (update_table_info_txn, on_update_table_info_failure) =
1122 self.table_info_manager().build_update_txn(
1123 table_id,
1124 &table_info_value,
1125 &new_table_info_value,
1126 )?;
1127
1128 txns.push(update_table_info_txn);
1129
1130 on_failures.push(OnFailure {
1131 table_info_value: new_table_info_value,
1132 on_update_table_info_failure,
1133 });
1134 }
1135
1136 let txn = Txn::merge_all(txns);
1137 let mut r = self.kv_backend.txn(txn).await?;
1138
1139 if !r.succeeded {
1140 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1141 for on_failure in on_failures {
1142 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
1143 .context(error::UnexpectedSnafu {
1144 err_msg: "Reads the empty table info in comparing operation of the updating table info",
1145 })?
1146 .into_inner();
1147
1148 let op_name = "the batch updating table info";
1149 ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
1150 }
1151 }
1152
1153 Ok(())
1154 }
1155
1156 pub async fn update_table_route(
1157 &self,
1158 table_id: TableId,
1159 region_info: RegionInfo,
1160 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1161 new_region_routes: Vec<RegionRoute>,
1162 new_region_options: &HashMap<String, String>,
1163 new_region_wal_options: &HashMap<RegionNumber, String>,
1164 ) -> Result<()> {
1165 let current_region_distribution =
1167 region_distribution(current_table_route_value.region_routes()?);
1168 let new_region_distribution = region_distribution(&new_region_routes);
1169
1170 let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
1171 table_id,
1172 region_info,
1173 current_region_distribution,
1174 new_region_distribution,
1175 new_region_options,
1176 new_region_wal_options,
1177 )?;
1178
1179 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1181
1182 let (update_table_route_txn, on_update_table_route_failure) = self
1183 .table_route_manager()
1184 .table_route_storage()
1185 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1186
1187 let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
1188
1189 let mut r = self.kv_backend.txn(txn).await?;
1190
1191 if !r.succeeded {
1193 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1194 let remote_table_route = on_update_table_route_failure(&mut set)?
1195 .context(error::UnexpectedSnafu {
1196 err_msg: "Reads the empty table route in comparing operation of the updating table route",
1197 })?
1198 .into_inner();
1199
1200 let op_name = "the updating table route";
1201 ensure_values!(remote_table_route, new_table_route_value, op_name);
1202 }
1203
1204 Ok(())
1205 }
1206
1207 pub async fn update_leader_region_status<F>(
1209 &self,
1210 table_id: TableId,
1211 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
1212 next_region_route_status: F,
1213 ) -> Result<()>
1214 where
1215 F: Fn(&RegionRoute) -> Option<Option<LeaderState>>,
1216 {
1217 let mut new_region_routes = current_table_route_value.region_routes()?.clone();
1218
1219 let mut updated = 0;
1220 for route in &mut new_region_routes {
1221 if let Some(state) = next_region_route_status(route) {
1222 if route.set_leader_state(state) {
1223 updated += 1;
1224 }
1225 }
1226 }
1227
1228 if updated == 0 {
1229 warn!("No leader status updated");
1230 return Ok(());
1231 }
1232
1233 let new_table_route_value = current_table_route_value.update(new_region_routes)?;
1235
1236 let (update_table_route_txn, on_update_table_route_failure) = self
1237 .table_route_manager()
1238 .table_route_storage()
1239 .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
1240
1241 let mut r = self.kv_backend.txn(update_table_route_txn).await?;
1242
1243 if !r.succeeded {
1245 let mut set = TxnOpGetResponseSet::from(&mut r.responses);
1246 let remote_table_route = on_update_table_route_failure(&mut set)?
1247 .context(error::UnexpectedSnafu {
1248 err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
1249 })?
1250 .into_inner();
1251
1252 let op_name = "the updating leader region status";
1253 ensure_values!(remote_table_route, new_table_route_value, op_name);
1254 }
1255
1256 Ok(())
1257 }
1258}
1259
1260#[macro_export]
1261macro_rules! impl_metadata_value {
1262 ($($val_ty: ty), *) => {
1263 $(
1264 impl $crate::key::MetadataValue for $val_ty {
1265 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
1266 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1267 }
1268
1269 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1270 serde_json::to_vec(self).context(SerdeJsonSnafu)
1271 }
1272 }
1273 )*
1274 }
1275}
1276
1277macro_rules! impl_metadata_key_get_txn_op {
1278 ($($key: ty), *) => {
1279 $(
1280 impl $crate::key::MetadataKeyGetTxnOp for $key {
1281 fn build_get_op(
1284 &self,
1285 ) -> (
1286 TxnOp,
1287 impl for<'a> FnMut(
1288 &'a mut TxnOpGetResponseSet,
1289 ) -> Option<Vec<u8>>,
1290 ) {
1291 let raw_key = self.to_bytes();
1292 (
1293 TxnOp::Get(raw_key.clone()),
1294 TxnOpGetResponseSet::filter(raw_key),
1295 )
1296 }
1297 }
1298 )*
1299 }
1300}
1301
1302impl_metadata_key_get_txn_op! {
1303 TableNameKey<'_>,
1304 TableInfoKey,
1305 ViewInfoKey,
1306 TableRouteKey,
1307 DatanodeTableKey
1308}
1309
1310#[macro_export]
1311macro_rules! impl_optional_metadata_value {
1312 ($($val_ty: ty), *) => {
1313 $(
1314 impl $val_ty {
1315 pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Option<Self>> {
1316 serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
1317 }
1318
1319 pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
1320 serde_json::to_vec(self).context(SerdeJsonSnafu)
1321 }
1322 }
1323 )*
1324 }
1325}
1326
1327impl_metadata_value! {
1328 TableNameValue,
1329 TableInfoValue,
1330 ViewInfoValue,
1331 DatanodeTableValue,
1332 FlowInfoValue,
1333 FlowNameValue,
1334 FlowRouteValue,
1335 TableFlowValue,
1336 NodeAddressValue,
1337 SchemaNameValue,
1338 FlowStateValue,
1339 PoisonValue
1340}
1341
1342impl_optional_metadata_value! {
1343 CatalogNameValue,
1344 SchemaNameValue
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349 use std::collections::{BTreeMap, HashMap, HashSet};
1350 use std::sync::Arc;
1351
1352 use bytes::Bytes;
1353 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1354 use common_time::util::current_time_millis;
1355 use common_wal::options::{KafkaWalOptions, WalOptions};
1356 use futures::TryStreamExt;
1357 use store_api::storage::{RegionId, RegionNumber};
1358 use table::metadata::{RawTableInfo, TableInfo};
1359 use table::table_name::TableName;
1360
1361 use super::datanode_table::DatanodeTableKey;
1362 use super::test_utils;
1363 use crate::ddl::test_util::create_table::test_create_table_task;
1364 use crate::ddl::utils::region_storage_path;
1365 use crate::error::Result;
1366 use crate::key::datanode_table::RegionInfo;
1367 use crate::key::table_info::TableInfoValue;
1368 use crate::key::table_name::TableNameKey;
1369 use crate::key::table_route::TableRouteValue;
1370 use crate::key::{
1371 DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
1372 };
1373 use crate::kv_backend::memory::MemoryKvBackend;
1374 use crate::kv_backend::KvBackend;
1375 use crate::peer::Peer;
1376 use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
1377 use crate::rpc::store::RangeRequest;
1378 use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
1379
1380 #[test]
1381 fn test_deserialized_value_with_bytes() {
1382 let region_route = new_test_region_route();
1383 let region_routes = vec![region_route.clone()];
1384
1385 let expected_region_routes =
1386 TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
1387 let expected = serde_json::to_vec(&expected_region_routes).unwrap();
1388
1389 let value = DeserializedValueWithBytes {
1392 inner: TableRouteValue::physical(region_routes.clone()),
1394 bytes: Bytes::from(expected.clone()),
1395 };
1396
1397 let encoded = serde_json::to_vec(&value).unwrap();
1398
1399 let decoded: DeserializedValueWithBytes<TableRouteValue> =
1402 serde_json::from_slice(&encoded).unwrap();
1403
1404 assert_eq!(decoded.inner, expected_region_routes);
1405 assert_eq!(decoded.bytes, expected);
1406 }
1407
1408 fn new_test_region_route() -> RegionRoute {
1409 new_region_route(1, 2)
1410 }
1411
1412 fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
1413 RegionRoute {
1414 region: Region {
1415 id: region_id.into(),
1416 name: "r1".to_string(),
1417 partition: None,
1418 attrs: BTreeMap::new(),
1419 },
1420 leader_peer: Some(Peer::new(datanode, "a2")),
1421 follower_peers: vec![],
1422 leader_state: None,
1423 leader_down_since: None,
1424 }
1425 }
1426
1427 fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
1428 test_utils::new_test_table_info(10, region_numbers)
1429 }
1430
1431 fn new_test_table_names() -> HashSet<TableName> {
1432 let mut set = HashSet::new();
1433 set.insert(TableName {
1434 catalog_name: "greptime".to_string(),
1435 schema_name: "public".to_string(),
1436 table_name: "a_table".to_string(),
1437 });
1438 set.insert(TableName {
1439 catalog_name: "greptime".to_string(),
1440 schema_name: "public".to_string(),
1441 table_name: "b_table".to_string(),
1442 });
1443 set
1444 }
1445
1446 async fn create_physical_table_metadata(
1447 table_metadata_manager: &TableMetadataManager,
1448 table_info: RawTableInfo,
1449 region_routes: Vec<RegionRoute>,
1450 region_wal_options: HashMap<RegionNumber, String>,
1451 ) -> Result<()> {
1452 table_metadata_manager
1453 .create_table_metadata(
1454 table_info,
1455 TableRouteValue::physical(region_routes),
1456 region_wal_options,
1457 )
1458 .await
1459 }
1460
1461 fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
1462 let topics = (0..2)
1463 .map(|i| format!("greptimedb_topic{}", i))
1464 .collect::<Vec<_>>();
1465 let wal_options = topics
1466 .iter()
1467 .map(|topic| {
1468 WalOptions::Kafka(KafkaWalOptions {
1469 topic: topic.clone(),
1470 })
1471 })
1472 .collect::<Vec<_>>();
1473
1474 (0..16)
1475 .enumerate()
1476 .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
1477 .collect()
1478 }
1479
1480 #[tokio::test]
1481 async fn test_raft_engine_topic_region_map() {
1482 let mem_kv = Arc::new(MemoryKvBackend::default());
1483 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
1484 let region_route = new_test_region_route();
1485 let region_routes = &vec![region_route.clone()];
1486 let table_info: RawTableInfo =
1487 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1488 let wal_allocator = WalOptionsAllocator::RaftEngine;
1489 let regions = (0..16).collect();
1490 let region_wal_options =
1491 allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
1492 create_physical_table_metadata(
1493 &table_metadata_manager,
1494 table_info.clone(),
1495 region_routes.clone(),
1496 region_wal_options.clone(),
1497 )
1498 .await
1499 .unwrap();
1500
1501 let topic_region_key = TOPIC_REGION_PREFIX.to_string();
1502 let range_req = RangeRequest::new().with_prefix(topic_region_key);
1503 let resp = mem_kv.range(range_req).await.unwrap();
1504 assert!(resp.kvs.is_empty());
1506 }
1507
1508 #[tokio::test]
1509 async fn test_create_table_metadata() {
1510 let mem_kv = Arc::new(MemoryKvBackend::default());
1511 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1512 let region_route = new_test_region_route();
1513 let region_routes = &vec![region_route.clone()];
1514 let table_info: RawTableInfo =
1515 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1516 let region_wal_options = create_mock_region_wal_options()
1517 .into_iter()
1518 .map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
1519 .collect::<HashMap<_, _>>();
1520
1521 create_physical_table_metadata(
1523 &table_metadata_manager,
1524 table_info.clone(),
1525 region_routes.clone(),
1526 region_wal_options.clone(),
1527 )
1528 .await
1529 .unwrap();
1530
1531 assert!(create_physical_table_metadata(
1533 &table_metadata_manager,
1534 table_info.clone(),
1535 region_routes.clone(),
1536 region_wal_options.clone(),
1537 )
1538 .await
1539 .is_ok());
1540
1541 let mut modified_region_routes = region_routes.clone();
1542 modified_region_routes.push(region_route.clone());
1543 assert!(create_physical_table_metadata(
1545 &table_metadata_manager,
1546 table_info.clone(),
1547 modified_region_routes,
1548 region_wal_options.clone(),
1549 )
1550 .await
1551 .is_err());
1552
1553 let (remote_table_info, remote_table_route) = table_metadata_manager
1554 .get_full_table_info(10)
1555 .await
1556 .unwrap();
1557
1558 assert_eq!(
1559 remote_table_info.unwrap().into_inner().table_info,
1560 table_info
1561 );
1562 assert_eq!(
1563 remote_table_route
1564 .unwrap()
1565 .into_inner()
1566 .region_routes()
1567 .unwrap(),
1568 region_routes
1569 );
1570
1571 for i in 0..2 {
1572 let region_number = i as u32;
1573 let region_id = RegionId::new(table_info.ident.table_id, region_number);
1574 let topic = format!("greptimedb_topic{}", i);
1575 let regions = table_metadata_manager
1576 .topic_region_manager
1577 .regions(&topic)
1578 .await
1579 .unwrap();
1580 assert_eq!(regions.len(), 8);
1581 assert_eq!(regions[0], region_id);
1582 }
1583 }
1584
1585 #[tokio::test]
1586 async fn test_create_logic_tables_metadata() {
1587 let mem_kv = Arc::new(MemoryKvBackend::default());
1588 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1589 let region_route = new_test_region_route();
1590 let region_routes = vec![region_route.clone()];
1591 let table_info: RawTableInfo =
1592 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1593 let table_id = table_info.ident.table_id;
1594 let table_route_value = TableRouteValue::physical(region_routes.clone());
1595
1596 let tables_data = vec![(table_info.clone(), table_route_value.clone())];
1597 table_metadata_manager
1599 .create_logical_tables_metadata(tables_data.clone())
1600 .await
1601 .unwrap();
1602
1603 assert!(table_metadata_manager
1605 .create_logical_tables_metadata(tables_data)
1606 .await
1607 .is_ok());
1608
1609 let mut modified_region_routes = region_routes.clone();
1610 modified_region_routes.push(new_region_route(2, 3));
1611 let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
1612 let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
1613 assert!(table_metadata_manager
1615 .create_logical_tables_metadata(modified_tables_data)
1616 .await
1617 .is_err());
1618
1619 let (remote_table_info, remote_table_route) = table_metadata_manager
1620 .get_full_table_info(table_id)
1621 .await
1622 .unwrap();
1623
1624 assert_eq!(
1625 remote_table_info.unwrap().into_inner().table_info,
1626 table_info
1627 );
1628 assert_eq!(
1629 remote_table_route
1630 .unwrap()
1631 .into_inner()
1632 .region_routes()
1633 .unwrap(),
1634 ®ion_routes
1635 );
1636 }
1637
1638 #[tokio::test]
1639 async fn test_create_many_logical_tables_metadata() {
1640 let kv_backend = Arc::new(MemoryKvBackend::default());
1641 let table_metadata_manager = TableMetadataManager::new(kv_backend);
1642
1643 let mut tables_data = vec![];
1644 for i in 0..128 {
1645 let table_id = i + 1;
1646 let regin_number = table_id * 3;
1647 let region_id = RegionId::new(table_id, regin_number);
1648 let region_route = new_region_route(region_id.as_u64(), 2);
1649 let region_routes = vec![region_route.clone()];
1650 let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
1651 table_id,
1652 &format!("my_table_{}", table_id),
1653 region_routes.iter().map(|r| r.region.id.region_number()),
1654 )
1655 .into();
1656 let table_route_value = TableRouteValue::physical(region_routes.clone());
1657
1658 tables_data.push((table_info, table_route_value));
1659 }
1660
1661 table_metadata_manager
1663 .create_logical_tables_metadata(tables_data)
1664 .await
1665 .unwrap();
1666 }
1667
1668 #[tokio::test]
1669 async fn test_delete_table_metadata() {
1670 let mem_kv = Arc::new(MemoryKvBackend::default());
1671 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1672 let region_route = new_test_region_route();
1673 let region_routes = &vec![region_route.clone()];
1674 let table_info: RawTableInfo =
1675 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1676 let table_id = table_info.ident.table_id;
1677 let datanode_id = 2;
1678 let region_wal_options = create_mock_region_wal_options();
1679 let serialized_region_wal_options = region_wal_options
1680 .iter()
1681 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
1682 .collect::<HashMap<_, _>>();
1683
1684 create_physical_table_metadata(
1686 &table_metadata_manager,
1687 table_info.clone(),
1688 region_routes.clone(),
1689 serialized_region_wal_options,
1690 )
1691 .await
1692 .unwrap();
1693
1694 let table_name = TableName::new(
1695 table_info.catalog_name,
1696 table_info.schema_name,
1697 table_info.name,
1698 );
1699 let table_route_value = &TableRouteValue::physical(region_routes.clone());
1700 table_metadata_manager
1702 .delete_table_metadata(
1703 table_id,
1704 &table_name,
1705 table_route_value,
1706 ®ion_wal_options,
1707 )
1708 .await
1709 .unwrap();
1710 table_metadata_manager
1712 .delete_table_metadata(
1713 table_id,
1714 &table_name,
1715 table_route_value,
1716 ®ion_wal_options,
1717 )
1718 .await
1719 .unwrap();
1720 assert!(table_metadata_manager
1721 .table_info_manager()
1722 .get(table_id)
1723 .await
1724 .unwrap()
1725 .is_none());
1726 assert!(table_metadata_manager
1727 .table_route_manager()
1728 .table_route_storage()
1729 .get(table_id)
1730 .await
1731 .unwrap()
1732 .is_none());
1733 assert!(table_metadata_manager
1734 .datanode_table_manager()
1735 .tables(datanode_id)
1736 .try_collect::<Vec<_>>()
1737 .await
1738 .unwrap()
1739 .is_empty());
1740 let table_info = table_metadata_manager
1742 .table_info_manager()
1743 .get(table_id)
1744 .await
1745 .unwrap();
1746 assert!(table_info.is_none());
1747 let table_route = table_metadata_manager
1748 .table_route_manager()
1749 .table_route_storage()
1750 .get(table_id)
1751 .await
1752 .unwrap();
1753 assert!(table_route.is_none());
1754 let regions = table_metadata_manager
1756 .topic_region_manager
1757 .regions("greptimedb_topic0")
1758 .await
1759 .unwrap();
1760 assert_eq!(regions.len(), 0);
1761 let regions = table_metadata_manager
1762 .topic_region_manager
1763 .regions("greptimedb_topic1")
1764 .await
1765 .unwrap();
1766 assert_eq!(regions.len(), 0);
1767 }
1768
1769 #[tokio::test]
1770 async fn test_rename_table() {
1771 let mem_kv = Arc::new(MemoryKvBackend::default());
1772 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1773 let region_route = new_test_region_route();
1774 let region_routes = vec![region_route.clone()];
1775 let table_info: RawTableInfo =
1776 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1777 let table_id = table_info.ident.table_id;
1778 create_physical_table_metadata(
1780 &table_metadata_manager,
1781 table_info.clone(),
1782 region_routes.clone(),
1783 HashMap::new(),
1784 )
1785 .await
1786 .unwrap();
1787
1788 let new_table_name = "another_name".to_string();
1789 let table_info_value =
1790 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1791
1792 table_metadata_manager
1793 .rename_table(&table_info_value, new_table_name.clone())
1794 .await
1795 .unwrap();
1796 table_metadata_manager
1798 .rename_table(&table_info_value, new_table_name.clone())
1799 .await
1800 .unwrap();
1801 let mut modified_table_info = table_info.clone();
1802 modified_table_info.name = "hi".to_string();
1803 let modified_table_info_value =
1804 DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
1805 assert!(table_metadata_manager
1808 .rename_table(&modified_table_info_value, new_table_name.clone())
1809 .await
1810 .is_err());
1811
1812 let old_table_name = TableNameKey::new(
1813 &table_info.catalog_name,
1814 &table_info.schema_name,
1815 &table_info.name,
1816 );
1817 let new_table_name = TableNameKey::new(
1818 &table_info.catalog_name,
1819 &table_info.schema_name,
1820 &new_table_name,
1821 );
1822
1823 assert!(table_metadata_manager
1824 .table_name_manager()
1825 .get(old_table_name)
1826 .await
1827 .unwrap()
1828 .is_none());
1829
1830 assert_eq!(
1831 table_metadata_manager
1832 .table_name_manager()
1833 .get(new_table_name)
1834 .await
1835 .unwrap()
1836 .unwrap()
1837 .table_id(),
1838 table_id
1839 );
1840 }
1841
1842 #[tokio::test]
1843 async fn test_update_table_info() {
1844 let mem_kv = Arc::new(MemoryKvBackend::default());
1845 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1846 let region_route = new_test_region_route();
1847 let region_routes = vec![region_route.clone()];
1848 let table_info: RawTableInfo =
1849 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1850 let table_id = table_info.ident.table_id;
1851 create_physical_table_metadata(
1853 &table_metadata_manager,
1854 table_info.clone(),
1855 region_routes.clone(),
1856 HashMap::new(),
1857 )
1858 .await
1859 .unwrap();
1860
1861 let mut new_table_info = table_info.clone();
1862 new_table_info.name = "hi".to_string();
1863 let current_table_info_value =
1864 DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
1865 table_metadata_manager
1867 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1868 .await
1869 .unwrap();
1870 table_metadata_manager
1872 .update_table_info(¤t_table_info_value, None, new_table_info.clone())
1873 .await
1874 .unwrap();
1875
1876 let updated_table_info = table_metadata_manager
1878 .table_info_manager()
1879 .get(table_id)
1880 .await
1881 .unwrap()
1882 .unwrap()
1883 .into_inner();
1884 assert_eq!(updated_table_info.table_info, new_table_info);
1885
1886 let mut wrong_table_info = table_info.clone();
1887 wrong_table_info.name = "wrong".to_string();
1888 let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
1889 current_table_info_value.update(wrong_table_info),
1890 );
1891 assert!(table_metadata_manager
1894 .update_table_info(&wrong_table_info_value, None, new_table_info)
1895 .await
1896 .is_err())
1897 }
1898
1899 #[tokio::test]
1900 async fn test_update_table_leader_region_status() {
1901 let mem_kv = Arc::new(MemoryKvBackend::default());
1902 let table_metadata_manager = TableMetadataManager::new(mem_kv);
1903 let datanode = 1;
1904 let region_routes = vec![
1905 RegionRoute {
1906 region: Region {
1907 id: 1.into(),
1908 name: "r1".to_string(),
1909 partition: None,
1910 attrs: BTreeMap::new(),
1911 },
1912 leader_peer: Some(Peer::new(datanode, "a2")),
1913 leader_state: Some(LeaderState::Downgrading),
1914 follower_peers: vec![],
1915 leader_down_since: Some(current_time_millis()),
1916 },
1917 RegionRoute {
1918 region: Region {
1919 id: 2.into(),
1920 name: "r2".to_string(),
1921 partition: None,
1922 attrs: BTreeMap::new(),
1923 },
1924 leader_peer: Some(Peer::new(datanode, "a1")),
1925 leader_state: None,
1926 follower_peers: vec![],
1927 leader_down_since: None,
1928 },
1929 ];
1930 let table_info: RawTableInfo =
1931 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
1932 let table_id = table_info.ident.table_id;
1933 let current_table_route_value = DeserializedValueWithBytes::from_inner(
1934 TableRouteValue::physical(region_routes.clone()),
1935 );
1936
1937 create_physical_table_metadata(
1939 &table_metadata_manager,
1940 table_info.clone(),
1941 region_routes.clone(),
1942 HashMap::new(),
1943 )
1944 .await
1945 .unwrap();
1946
1947 table_metadata_manager
1948 .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| {
1949 if region_route.leader_state.is_some() {
1950 None
1951 } else {
1952 Some(Some(LeaderState::Downgrading))
1953 }
1954 })
1955 .await
1956 .unwrap();
1957
1958 let updated_route_value = table_metadata_manager
1959 .table_route_manager()
1960 .table_route_storage()
1961 .get(table_id)
1962 .await
1963 .unwrap()
1964 .unwrap();
1965
1966 assert_eq!(
1967 updated_route_value.region_routes().unwrap()[0].leader_state,
1968 Some(LeaderState::Downgrading)
1969 );
1970
1971 assert!(updated_route_value.region_routes().unwrap()[0]
1972 .leader_down_since
1973 .is_some());
1974
1975 assert_eq!(
1976 updated_route_value.region_routes().unwrap()[1].leader_state,
1977 Some(LeaderState::Downgrading)
1978 );
1979 assert!(updated_route_value.region_routes().unwrap()[1]
1980 .leader_down_since
1981 .is_some());
1982 }
1983
1984 async fn assert_datanode_table(
1985 table_metadata_manager: &TableMetadataManager,
1986 table_id: u32,
1987 region_routes: &[RegionRoute],
1988 ) {
1989 let region_distribution = region_distribution(region_routes);
1990 for (datanode, regions) in region_distribution {
1991 let got = table_metadata_manager
1992 .datanode_table_manager()
1993 .get(&DatanodeTableKey::new(datanode, table_id))
1994 .await
1995 .unwrap()
1996 .unwrap();
1997
1998 assert_eq!(got.regions, regions)
1999 }
2000 }
2001
2002 #[tokio::test]
2003 async fn test_update_table_route() {
2004 let mem_kv = Arc::new(MemoryKvBackend::default());
2005 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2006 let region_route = new_test_region_route();
2007 let region_routes = vec![region_route.clone()];
2008 let table_info: RawTableInfo =
2009 new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
2010 let table_id = table_info.ident.table_id;
2011 let engine = table_info.meta.engine.as_str();
2012 let region_storage_path =
2013 region_storage_path(&table_info.catalog_name, &table_info.schema_name);
2014 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2015 TableRouteValue::physical(region_routes.clone()),
2016 );
2017
2018 create_physical_table_metadata(
2020 &table_metadata_manager,
2021 table_info.clone(),
2022 region_routes.clone(),
2023 HashMap::new(),
2024 )
2025 .await
2026 .unwrap();
2027
2028 assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await;
2029 let new_region_routes = vec![
2030 new_region_route(1, 1),
2031 new_region_route(2, 2),
2032 new_region_route(3, 3),
2033 ];
2034 table_metadata_manager
2036 .update_table_route(
2037 table_id,
2038 RegionInfo {
2039 engine: engine.to_string(),
2040 region_storage_path: region_storage_path.to_string(),
2041 region_options: HashMap::new(),
2042 region_wal_options: HashMap::new(),
2043 },
2044 ¤t_table_route_value,
2045 new_region_routes.clone(),
2046 &HashMap::new(),
2047 &HashMap::new(),
2048 )
2049 .await
2050 .unwrap();
2051 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2052
2053 table_metadata_manager
2055 .update_table_route(
2056 table_id,
2057 RegionInfo {
2058 engine: engine.to_string(),
2059 region_storage_path: region_storage_path.to_string(),
2060 region_options: HashMap::new(),
2061 region_wal_options: HashMap::new(),
2062 },
2063 ¤t_table_route_value,
2064 new_region_routes.clone(),
2065 &HashMap::new(),
2066 &HashMap::new(),
2067 )
2068 .await
2069 .unwrap();
2070
2071 let current_table_route_value = DeserializedValueWithBytes::from_inner(
2072 current_table_route_value
2073 .inner
2074 .update(new_region_routes.clone())
2075 .unwrap(),
2076 );
2077 let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
2078 table_metadata_manager
2080 .update_table_route(
2081 table_id,
2082 RegionInfo {
2083 engine: engine.to_string(),
2084 region_storage_path: region_storage_path.to_string(),
2085 region_options: HashMap::new(),
2086 region_wal_options: HashMap::new(),
2087 },
2088 ¤t_table_route_value,
2089 new_region_routes.clone(),
2090 &HashMap::new(),
2091 &HashMap::new(),
2092 )
2093 .await
2094 .unwrap();
2095 assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
2096
2097 let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
2100 current_table_route_value
2101 .update(vec![
2102 new_region_route(1, 1),
2103 new_region_route(2, 2),
2104 new_region_route(3, 3),
2105 new_region_route(4, 4),
2106 ])
2107 .unwrap(),
2108 );
2109 assert!(table_metadata_manager
2110 .update_table_route(
2111 table_id,
2112 RegionInfo {
2113 engine: engine.to_string(),
2114 region_storage_path: region_storage_path.to_string(),
2115 region_options: HashMap::new(),
2116 region_wal_options: HashMap::new(),
2117 },
2118 &wrong_table_route_value,
2119 new_region_routes,
2120 &HashMap::new(),
2121 &HashMap::new(),
2122 )
2123 .await
2124 .is_err());
2125 }
2126
2127 #[tokio::test]
2128 async fn test_destroy_table_metadata() {
2129 let mem_kv = Arc::new(MemoryKvBackend::default());
2130 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2131 let table_id = 1025;
2132 let table_name = "foo";
2133 let task = test_create_table_task(table_name, table_id);
2134 let options = create_mock_region_wal_options();
2135 let serialized_options = options
2136 .iter()
2137 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2138 .collect::<HashMap<_, _>>();
2139 table_metadata_manager
2140 .create_table_metadata(
2141 task.table_info,
2142 TableRouteValue::physical(vec![
2143 RegionRoute {
2144 region: Region::new_test(RegionId::new(table_id, 1)),
2145 leader_peer: Some(Peer::empty(1)),
2146 follower_peers: vec![Peer::empty(5)],
2147 leader_state: None,
2148 leader_down_since: None,
2149 },
2150 RegionRoute {
2151 region: Region::new_test(RegionId::new(table_id, 2)),
2152 leader_peer: Some(Peer::empty(2)),
2153 follower_peers: vec![Peer::empty(4)],
2154 leader_state: None,
2155 leader_down_since: None,
2156 },
2157 RegionRoute {
2158 region: Region::new_test(RegionId::new(table_id, 3)),
2159 leader_peer: Some(Peer::empty(3)),
2160 follower_peers: vec![],
2161 leader_state: None,
2162 leader_down_since: None,
2163 },
2164 ]),
2165 serialized_options,
2166 )
2167 .await
2168 .unwrap();
2169 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2170 let table_route_value = table_metadata_manager
2171 .table_route_manager
2172 .table_route_storage()
2173 .get_with_raw_bytes(table_id)
2174 .await
2175 .unwrap()
2176 .unwrap();
2177 table_metadata_manager
2178 .destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
2179 .await
2180 .unwrap();
2181 assert!(mem_kv.is_empty());
2182 }
2183
2184 #[tokio::test]
2185 async fn test_restore_table_metadata() {
2186 let mem_kv = Arc::new(MemoryKvBackend::default());
2187 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
2188 let table_id = 1025;
2189 let table_name = "foo";
2190 let task = test_create_table_task(table_name, table_id);
2191 let options = create_mock_region_wal_options();
2192 let serialized_options = options
2193 .iter()
2194 .map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
2195 .collect::<HashMap<_, _>>();
2196 table_metadata_manager
2197 .create_table_metadata(
2198 task.table_info,
2199 TableRouteValue::physical(vec![
2200 RegionRoute {
2201 region: Region::new_test(RegionId::new(table_id, 1)),
2202 leader_peer: Some(Peer::empty(1)),
2203 follower_peers: vec![Peer::empty(5)],
2204 leader_state: None,
2205 leader_down_since: None,
2206 },
2207 RegionRoute {
2208 region: Region::new_test(RegionId::new(table_id, 2)),
2209 leader_peer: Some(Peer::empty(2)),
2210 follower_peers: vec![Peer::empty(4)],
2211 leader_state: None,
2212 leader_down_since: None,
2213 },
2214 RegionRoute {
2215 region: Region::new_test(RegionId::new(table_id, 3)),
2216 leader_peer: Some(Peer::empty(3)),
2217 follower_peers: vec![],
2218 leader_state: None,
2219 leader_down_since: None,
2220 },
2221 ]),
2222 serialized_options,
2223 )
2224 .await
2225 .unwrap();
2226 let expected_result = mem_kv.dump();
2227 let table_route_value = table_metadata_manager
2228 .table_route_manager
2229 .table_route_storage()
2230 .get_with_raw_bytes(table_id)
2231 .await
2232 .unwrap()
2233 .unwrap();
2234 let region_routes = table_route_value.region_routes().unwrap();
2235 let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
2236 let table_route_value = TableRouteValue::physical(region_routes.clone());
2237 table_metadata_manager
2238 .delete_table_metadata(table_id, &table_name, &table_route_value, &options)
2239 .await
2240 .unwrap();
2241 table_metadata_manager
2242 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2243 .await
2244 .unwrap();
2245 let kvs = mem_kv.dump();
2246 assert_eq!(kvs, expected_result);
2247 table_metadata_manager
2249 .restore_table_metadata(table_id, &table_name, &table_route_value, &options)
2250 .await
2251 .unwrap();
2252 let kvs = mem_kv.dump();
2253 assert_eq!(kvs, expected_result);
2254 }
2255
2256 #[tokio::test]
2257 async fn test_create_update_view_info() {
2258 let mem_kv = Arc::new(MemoryKvBackend::default());
2259 let table_metadata_manager = TableMetadataManager::new(mem_kv);
2260
2261 let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
2262
2263 let view_id = view_info.ident.table_id;
2264
2265 let logical_plan: Vec<u8> = vec![1, 2, 3];
2266 let columns = vec!["a".to_string()];
2267 let plan_columns = vec!["number".to_string()];
2268 let table_names = new_test_table_names();
2269 let definition = "CREATE VIEW test AS SELECT * FROM numbers";
2270
2271 table_metadata_manager
2273 .create_view_metadata(
2274 view_info.clone(),
2275 logical_plan.clone(),
2276 table_names.clone(),
2277 columns.clone(),
2278 plan_columns.clone(),
2279 definition.to_string(),
2280 )
2281 .await
2282 .unwrap();
2283
2284 {
2285 let current_view_info = table_metadata_manager
2287 .view_info_manager()
2288 .get(view_id)
2289 .await
2290 .unwrap()
2291 .unwrap()
2292 .into_inner();
2293 assert_eq!(current_view_info.view_info, logical_plan);
2294 assert_eq!(current_view_info.table_names, table_names);
2295 assert_eq!(current_view_info.definition, definition);
2296 assert_eq!(current_view_info.columns, columns);
2297 assert_eq!(current_view_info.plan_columns, plan_columns);
2298 let current_table_info = table_metadata_manager
2300 .table_info_manager()
2301 .get(view_id)
2302 .await
2303 .unwrap()
2304 .unwrap()
2305 .into_inner();
2306 assert_eq!(current_table_info.table_info, view_info);
2307 }
2308
2309 let new_logical_plan: Vec<u8> = vec![4, 5, 6];
2310 let new_table_names = {
2311 let mut set = HashSet::new();
2312 set.insert(TableName {
2313 catalog_name: "greptime".to_string(),
2314 schema_name: "public".to_string(),
2315 table_name: "b_table".to_string(),
2316 });
2317 set.insert(TableName {
2318 catalog_name: "greptime".to_string(),
2319 schema_name: "public".to_string(),
2320 table_name: "c_table".to_string(),
2321 });
2322 set
2323 };
2324 let new_columns = vec!["b".to_string()];
2325 let new_plan_columns = vec!["number2".to_string()];
2326 let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
2327
2328 let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
2329 logical_plan.clone(),
2330 table_names,
2331 columns,
2332 plan_columns,
2333 definition.to_string(),
2334 ));
2335 table_metadata_manager
2337 .update_view_info(
2338 view_id,
2339 ¤t_view_info_value,
2340 new_logical_plan.clone(),
2341 new_table_names.clone(),
2342 new_columns.clone(),
2343 new_plan_columns.clone(),
2344 new_definition.to_string(),
2345 )
2346 .await
2347 .unwrap();
2348 table_metadata_manager
2350 .update_view_info(
2351 view_id,
2352 ¤t_view_info_value,
2353 new_logical_plan.clone(),
2354 new_table_names.clone(),
2355 new_columns.clone(),
2356 new_plan_columns.clone(),
2357 new_definition.to_string(),
2358 )
2359 .await
2360 .unwrap();
2361
2362 let updated_view_info = table_metadata_manager
2364 .view_info_manager()
2365 .get(view_id)
2366 .await
2367 .unwrap()
2368 .unwrap()
2369 .into_inner();
2370 assert_eq!(updated_view_info.view_info, new_logical_plan);
2371 assert_eq!(updated_view_info.table_names, new_table_names);
2372 assert_eq!(updated_view_info.definition, new_definition);
2373 assert_eq!(updated_view_info.columns, new_columns);
2374 assert_eq!(updated_view_info.plan_columns, new_plan_columns);
2375
2376 let wrong_view_info = logical_plan.clone();
2377 let wrong_definition = "wrong_definition";
2378 let wrong_view_info_value =
2379 DeserializedValueWithBytes::from_inner(current_view_info_value.update(
2380 wrong_view_info,
2381 new_table_names.clone(),
2382 new_columns.clone(),
2383 new_plan_columns.clone(),
2384 wrong_definition.to_string(),
2385 ));
2386 assert!(table_metadata_manager
2389 .update_view_info(
2390 view_id,
2391 &wrong_view_info_value,
2392 new_logical_plan.clone(),
2393 new_table_names.clone(),
2394 vec!["c".to_string()],
2395 vec!["number3".to_string()],
2396 wrong_definition.to_string(),
2397 )
2398 .await
2399 .is_err());
2400
2401 let current_view_info = table_metadata_manager
2403 .view_info_manager()
2404 .get(view_id)
2405 .await
2406 .unwrap()
2407 .unwrap()
2408 .into_inner();
2409 assert_eq!(current_view_info.view_info, new_logical_plan);
2410 assert_eq!(current_view_info.table_names, new_table_names);
2411 assert_eq!(current_view_info.definition, new_definition);
2412 assert_eq!(current_view_info.columns, new_columns);
2413 assert_eq!(current_view_info.plan_columns, new_plan_columns);
2414 }
2415}