catalog/kvbackend/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_catalog::consts::DEFAULT_CATALOG_NAME;
19use common_error::ext::BoxedError;
20use common_meta::cache::LayeredCacheRegistryRef;
21use common_meta::key::TableMetadataManager;
22use common_meta::key::flow::FlowMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use common_procedure::ProcedureManagerRef;
25use moka::sync::Cache;
26use partition::manager::PartitionRuleManager;
27
28use crate::information_schema::{
29    InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
30};
31use crate::kvbackend::KvBackendCatalogManager;
32use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
33use crate::process_manager::ProcessManagerRef;
34use crate::system_schema::numbers_table_provider::NumbersTableProvider;
35use crate::system_schema::pg_catalog::PGCatalogProvider;
36
37/// The configurator that customizes or enhances the [`KvBackendCatalogManagerBuilder`].
38#[async_trait::async_trait]
39pub trait CatalogManagerConfigurator<C>: Send + Sync {
40    async fn configure(
41        &self,
42        builder: KvBackendCatalogManagerBuilder,
43        ctx: C,
44    ) -> std::result::Result<KvBackendCatalogManagerBuilder, BoxedError>;
45}
46
47pub type CatalogManagerConfiguratorRef<C> = Arc<dyn CatalogManagerConfigurator<C>>;
48
49pub struct KvBackendCatalogManagerBuilder {
50    information_extension: InformationExtensionRef,
51    backend: KvBackendRef,
52    cache_registry: LayeredCacheRegistryRef,
53    procedure_manager: Option<ProcedureManagerRef>,
54    process_manager: Option<ProcessManagerRef>,
55    extra_information_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
56}
57
58impl KvBackendCatalogManagerBuilder {
59    pub fn new(
60        information_extension: InformationExtensionRef,
61        backend: KvBackendRef,
62        cache_registry: LayeredCacheRegistryRef,
63    ) -> Self {
64        Self {
65            information_extension,
66            backend,
67            cache_registry,
68            procedure_manager: None,
69            process_manager: None,
70            extra_information_table_factories: HashMap::new(),
71        }
72    }
73
74    pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
75        self.procedure_manager = Some(procedure_manager);
76        self
77    }
78
79    pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
80        self.process_manager = Some(process_manager);
81        self
82    }
83
84    /// Sets the extra information tables.
85    pub fn with_extra_information_table_factories(
86        mut self,
87        factories: HashMap<String, InformationSchemaTableFactoryRef>,
88    ) -> Self {
89        self.extra_information_table_factories = factories;
90        self
91    }
92
93    pub fn build(self) -> Arc<KvBackendCatalogManager> {
94        let Self {
95            information_extension,
96            backend,
97            cache_registry,
98            procedure_manager,
99            process_manager,
100            extra_information_table_factories,
101        } = self;
102        Arc::new_cyclic(|me| KvBackendCatalogManager {
103            information_extension,
104            partition_manager: Arc::new(PartitionRuleManager::new(
105                backend.clone(),
106                cache_registry
107                    .get()
108                    .expect("Failed to get table_route_cache"),
109            )),
110            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
111            system_catalog: SystemCatalog {
112                catalog_manager: me.clone(),
113                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
114                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
115                information_schema_provider: {
116                    let provider = InformationSchemaProvider::new(
117                        DEFAULT_CATALOG_NAME.to_string(),
118                        me.clone(),
119                        Arc::new(FlowMetadataManager::new(backend.clone())),
120                        process_manager.clone(),
121                        backend.clone(),
122                    );
123                    let provider = provider
124                        .with_extra_table_factories(extra_information_table_factories.clone());
125                    Arc::new(provider)
126                },
127                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
128                    DEFAULT_CATALOG_NAME.to_string(),
129                    me.clone(),
130                )),
131                numbers_table_provider: NumbersTableProvider,
132                backend,
133                process_manager,
134                extra_information_table_factories,
135            },
136            cache_registry,
137            procedure_manager,
138        })
139    }
140}