catalog/kvbackend/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_catalog::consts::DEFAULT_CATALOG_NAME;
18use common_meta::cache::LayeredCacheRegistryRef;
19use common_meta::key::TableMetadataManager;
20use common_meta::key::flow::FlowMetadataManager;
21use common_meta::kv_backend::KvBackendRef;
22use common_procedure::ProcedureManagerRef;
23use moka::sync::Cache;
24use partition::manager::PartitionRuleManager;
25
26#[cfg(feature = "enterprise")]
27use crate::information_schema::InformationSchemaTableFactoryRef;
28use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
29use crate::kvbackend::KvBackendCatalogManager;
30use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
31use crate::process_manager::ProcessManagerRef;
32use crate::system_schema::numbers_table_provider::NumbersTableProvider;
33use crate::system_schema::pg_catalog::PGCatalogProvider;
34
/// Builder that collects the dependencies of a [`KvBackendCatalogManager`] and
/// assembles the manager in [`KvBackendCatalogManagerBuilder::build`].
///
/// The first three fields are mandatory and supplied through
/// [`KvBackendCatalogManagerBuilder::new`]; the remaining ones are optional and
/// set through the `with_*` methods.
pub struct KvBackendCatalogManagerBuilder {
    /// Handed over unchanged to the built manager.
    information_extension: InformationExtensionRef,
    /// Key-value backend; `build` clones it into the partition rule manager,
    /// the table/flow metadata managers and the information schema provider,
    /// and finally stores it in the system catalog.
    backend: KvBackendRef,
    /// Cache registry; `build` looks up one cache from it for the partition
    /// rule manager and then stores the registry on the built manager.
    cache_registry: LayeredCacheRegistryRef,
    /// Optional procedure manager, stored as-is on the built manager.
    procedure_manager: Option<ProcedureManagerRef>,
    /// Optional process manager, passed to the information schema provider and
    /// stored in the system catalog.
    process_manager: Option<ProcessManagerRef>,
    /// Extra information schema table factories (enterprise builds only).
    // NOTE(review): the `String` key is presumably the table name — confirm
    // against `InformationSchemaProvider::with_extra_table_factories`.
    #[cfg(feature = "enterprise")]
    extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
45
46impl KvBackendCatalogManagerBuilder {
47    pub fn new(
48        information_extension: InformationExtensionRef,
49        backend: KvBackendRef,
50        cache_registry: LayeredCacheRegistryRef,
51    ) -> Self {
52        Self {
53            information_extension,
54            backend,
55            cache_registry,
56            procedure_manager: None,
57            process_manager: None,
58            #[cfg(feature = "enterprise")]
59            extra_information_table_factories: std::collections::HashMap::new(),
60        }
61    }
62
63    pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
64        self.procedure_manager = Some(procedure_manager);
65        self
66    }
67
68    pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
69        self.process_manager = Some(process_manager);
70        self
71    }
72
73    /// Sets the extra information tables.
74    #[cfg(feature = "enterprise")]
75    pub fn with_extra_information_table_factories(
76        mut self,
77        factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
78    ) -> Self {
79        self.extra_information_table_factories = factories;
80        self
81    }
82
83    pub fn build(self) -> Arc<KvBackendCatalogManager> {
84        let Self {
85            information_extension,
86            backend,
87            cache_registry,
88            procedure_manager,
89            process_manager,
90            #[cfg(feature = "enterprise")]
91            extra_information_table_factories,
92        } = self;
93        Arc::new_cyclic(|me| KvBackendCatalogManager {
94            information_extension,
95            partition_manager: Arc::new(PartitionRuleManager::new(
96                backend.clone(),
97                cache_registry
98                    .get()
99                    .expect("Failed to get table_route_cache"),
100            )),
101            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
102            system_catalog: SystemCatalog {
103                catalog_manager: me.clone(),
104                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
105                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
106                information_schema_provider: {
107                    let provider = InformationSchemaProvider::new(
108                        DEFAULT_CATALOG_NAME.to_string(),
109                        me.clone(),
110                        Arc::new(FlowMetadataManager::new(backend.clone())),
111                        process_manager.clone(),
112                        backend.clone(),
113                    );
114                    #[cfg(feature = "enterprise")]
115                    let provider = provider
116                        .with_extra_table_factories(extra_information_table_factories.clone());
117                    Arc::new(provider)
118                },
119                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
120                    DEFAULT_CATALOG_NAME.to_string(),
121                    me.clone(),
122                )),
123                numbers_table_provider: NumbersTableProvider,
124                backend,
125                process_manager,
126                #[cfg(feature = "enterprise")]
127                extra_information_table_factories,
128            },
129            cache_registry,
130            procedure_manager,
131        })
132    }
133}