catalog/kvbackend/
builder.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_catalog::consts::DEFAULT_CATALOG_NAME;
19use common_error::ext::BoxedError;
20use common_meta::cache::LayeredCacheRegistryRef;
21use common_meta::key::TableMetadataManager;
22use common_meta::key::flow::FlowMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use common_procedure::ProcedureManagerRef;
25use moka::sync::Cache;
26use partition::manager::PartitionRuleManager;
27
28use crate::information_schema::{
29 InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
30};
31use crate::kvbackend::KvBackendCatalogManager;
32use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
33use crate::process_manager::ProcessManagerRef;
34use crate::system_schema::numbers_table_provider::NumbersTableProvider;
35use crate::system_schema::pg_catalog::PGCatalogProvider;
36
37#[async_trait::async_trait]
39pub trait CatalogManagerConfigurator<C>: Send + Sync {
40 async fn configure(
41 &self,
42 builder: KvBackendCatalogManagerBuilder,
43 ctx: C,
44 ) -> std::result::Result<KvBackendCatalogManagerBuilder, BoxedError>;
45}
46
47pub type CatalogManagerConfiguratorRef<C> = Arc<dyn CatalogManagerConfigurator<C>>;
48
49pub struct KvBackendCatalogManagerBuilder {
50 information_extension: InformationExtensionRef,
51 backend: KvBackendRef,
52 cache_registry: LayeredCacheRegistryRef,
53 procedure_manager: Option<ProcedureManagerRef>,
54 process_manager: Option<ProcessManagerRef>,
55 extra_information_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
56}
57
58impl KvBackendCatalogManagerBuilder {
59 pub fn new(
60 information_extension: InformationExtensionRef,
61 backend: KvBackendRef,
62 cache_registry: LayeredCacheRegistryRef,
63 ) -> Self {
64 Self {
65 information_extension,
66 backend,
67 cache_registry,
68 procedure_manager: None,
69 process_manager: None,
70 extra_information_table_factories: HashMap::new(),
71 }
72 }
73
74 pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
75 self.procedure_manager = Some(procedure_manager);
76 self
77 }
78
79 pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
80 self.process_manager = Some(process_manager);
81 self
82 }
83
84 pub fn with_extra_information_table_factories(
86 mut self,
87 factories: HashMap<String, InformationSchemaTableFactoryRef>,
88 ) -> Self {
89 self.extra_information_table_factories = factories;
90 self
91 }
92
93 pub fn build(self) -> Arc<KvBackendCatalogManager> {
94 let Self {
95 information_extension,
96 backend,
97 cache_registry,
98 procedure_manager,
99 process_manager,
100 extra_information_table_factories,
101 } = self;
102 Arc::new_cyclic(|me| KvBackendCatalogManager {
103 information_extension,
104 partition_manager: Arc::new(PartitionRuleManager::new(
105 backend.clone(),
106 cache_registry
107 .get()
108 .expect("Failed to get table_route_cache"),
109 )),
110 table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
111 system_catalog: SystemCatalog {
112 catalog_manager: me.clone(),
113 catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
114 pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
115 information_schema_provider: {
116 let provider = InformationSchemaProvider::new(
117 DEFAULT_CATALOG_NAME.to_string(),
118 me.clone(),
119 Arc::new(FlowMetadataManager::new(backend.clone())),
120 process_manager.clone(),
121 backend.clone(),
122 );
123 let provider = provider
124 .with_extra_table_factories(extra_information_table_factories.clone());
125 Arc::new(provider)
126 },
127 pg_catalog_provider: Arc::new(PGCatalogProvider::new(
128 DEFAULT_CATALOG_NAME.to_string(),
129 me.clone(),
130 )),
131 numbers_table_provider: NumbersTableProvider,
132 backend,
133 process_manager,
134 extra_information_table_factories,
135 },
136 cache_registry,
137 procedure_manager,
138 })
139 }
140}