catalog/kvbackend/
builder.rs1use std::sync::Arc;
16
17use common_catalog::consts::DEFAULT_CATALOG_NAME;
18use common_meta::cache::LayeredCacheRegistryRef;
19use common_meta::key::flow::FlowMetadataManager;
20use common_meta::key::TableMetadataManager;
21use common_meta::kv_backend::KvBackendRef;
22use common_procedure::ProcedureManagerRef;
23use moka::sync::Cache;
24use partition::manager::PartitionRuleManager;
25
26#[cfg(feature = "enterprise")]
27use crate::information_schema::InformationSchemaTableFactoryRef;
28use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
29use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
30use crate::kvbackend::KvBackendCatalogManager;
31use crate::process_manager::ProcessManagerRef;
32use crate::system_schema::pg_catalog::PGCatalogProvider;
33
34pub struct KvBackendCatalogManagerBuilder {
35 information_extension: InformationExtensionRef,
36 backend: KvBackendRef,
37 cache_registry: LayeredCacheRegistryRef,
38 procedure_manager: Option<ProcedureManagerRef>,
39 process_manager: Option<ProcessManagerRef>,
40 #[cfg(feature = "enterprise")]
41 extra_information_table_factories:
42 std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
43}
44
45impl KvBackendCatalogManagerBuilder {
46 pub fn new(
47 information_extension: InformationExtensionRef,
48 backend: KvBackendRef,
49 cache_registry: LayeredCacheRegistryRef,
50 ) -> Self {
51 Self {
52 information_extension,
53 backend,
54 cache_registry,
55 procedure_manager: None,
56 process_manager: None,
57 #[cfg(feature = "enterprise")]
58 extra_information_table_factories: std::collections::HashMap::new(),
59 }
60 }
61
62 pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
63 self.procedure_manager = Some(procedure_manager);
64 self
65 }
66
67 pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
68 self.process_manager = Some(process_manager);
69 self
70 }
71
72 #[cfg(feature = "enterprise")]
74 pub fn with_extra_information_table_factories(
75 mut self,
76 factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
77 ) -> Self {
78 self.extra_information_table_factories = factories;
79 self
80 }
81
82 pub fn build(self) -> Arc<KvBackendCatalogManager> {
83 let Self {
84 information_extension,
85 backend,
86 cache_registry,
87 procedure_manager,
88 process_manager,
89 #[cfg(feature = "enterprise")]
90 extra_information_table_factories,
91 } = self;
92 Arc::new_cyclic(|me| KvBackendCatalogManager {
93 information_extension,
94 partition_manager: Arc::new(PartitionRuleManager::new(
95 backend.clone(),
96 cache_registry
97 .get()
98 .expect("Failed to get table_route_cache"),
99 )),
100 table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
101 system_catalog: SystemCatalog {
102 catalog_manager: me.clone(),
103 catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
104 pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
105 information_schema_provider: {
106 let provider = InformationSchemaProvider::new(
107 DEFAULT_CATALOG_NAME.to_string(),
108 me.clone(),
109 Arc::new(FlowMetadataManager::new(backend.clone())),
110 process_manager.clone(),
111 backend.clone(),
112 );
113 #[cfg(feature = "enterprise")]
114 let provider = provider
115 .with_extra_table_factories(extra_information_table_factories.clone());
116 Arc::new(provider)
117 },
118 pg_catalog_provider: Arc::new(PGCatalogProvider::new(
119 DEFAULT_CATALOG_NAME.to_string(),
120 me.clone(),
121 )),
122 backend,
123 process_manager,
124 #[cfg(feature = "enterprise")]
125 extra_information_table_factories,
126 },
127 cache_registry,
128 procedure_manager,
129 })
130 }
131}