catalog/kvbackend/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_catalog::consts::DEFAULT_CATALOG_NAME;
18use common_meta::cache::LayeredCacheRegistryRef;
19use common_meta::key::flow::FlowMetadataManager;
20use common_meta::key::TableMetadataManager;
21use common_meta::kv_backend::KvBackendRef;
22use common_procedure::ProcedureManagerRef;
23use moka::sync::Cache;
24use partition::manager::PartitionRuleManager;
25
26#[cfg(feature = "enterprise")]
27use crate::information_schema::InformationSchemaTableFactoryRef;
28use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
29use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
30use crate::kvbackend::KvBackendCatalogManager;
31use crate::process_manager::ProcessManagerRef;
32use crate::system_schema::pg_catalog::PGCatalogProvider;
33
34pub struct KvBackendCatalogManagerBuilder {
35    information_extension: InformationExtensionRef,
36    backend: KvBackendRef,
37    cache_registry: LayeredCacheRegistryRef,
38    procedure_manager: Option<ProcedureManagerRef>,
39    process_manager: Option<ProcessManagerRef>,
40    #[cfg(feature = "enterprise")]
41    extra_information_table_factories:
42        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
43}
44
45impl KvBackendCatalogManagerBuilder {
46    pub fn new(
47        information_extension: InformationExtensionRef,
48        backend: KvBackendRef,
49        cache_registry: LayeredCacheRegistryRef,
50    ) -> Self {
51        Self {
52            information_extension,
53            backend,
54            cache_registry,
55            procedure_manager: None,
56            process_manager: None,
57            #[cfg(feature = "enterprise")]
58            extra_information_table_factories: std::collections::HashMap::new(),
59        }
60    }
61
62    pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
63        self.procedure_manager = Some(procedure_manager);
64        self
65    }
66
67    pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
68        self.process_manager = Some(process_manager);
69        self
70    }
71
72    /// Sets the extra information tables.
73    #[cfg(feature = "enterprise")]
74    pub fn with_extra_information_table_factories(
75        mut self,
76        factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
77    ) -> Self {
78        self.extra_information_table_factories = factories;
79        self
80    }
81
82    pub fn build(self) -> Arc<KvBackendCatalogManager> {
83        let Self {
84            information_extension,
85            backend,
86            cache_registry,
87            procedure_manager,
88            process_manager,
89            #[cfg(feature = "enterprise")]
90            extra_information_table_factories,
91        } = self;
92        Arc::new_cyclic(|me| KvBackendCatalogManager {
93            information_extension,
94            partition_manager: Arc::new(PartitionRuleManager::new(
95                backend.clone(),
96                cache_registry
97                    .get()
98                    .expect("Failed to get table_route_cache"),
99            )),
100            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
101            system_catalog: SystemCatalog {
102                catalog_manager: me.clone(),
103                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
104                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
105                information_schema_provider: {
106                    let provider = InformationSchemaProvider::new(
107                        DEFAULT_CATALOG_NAME.to_string(),
108                        me.clone(),
109                        Arc::new(FlowMetadataManager::new(backend.clone())),
110                        process_manager.clone(),
111                        backend.clone(),
112                    );
113                    #[cfg(feature = "enterprise")]
114                    let provider = provider
115                        .with_extra_table_factories(extra_information_table_factories.clone());
116                    Arc::new(provider)
117                },
118                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
119                    DEFAULT_CATALOG_NAME.to_string(),
120                    me.clone(),
121                )),
122                backend,
123                process_manager,
124                #[cfg(feature = "enterprise")]
125                extra_information_table_factories,
126            },
127            cache_registry,
128            procedure_manager,
129        })
130    }
131}