frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37    ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38    StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use pipeline::pipeline_operator::PipelineOperator;
44use query::QueryEngineFactory;
45use query::region_query::RegionQueryHandlerFactoryRef;
46use snafu::{OptionExt, ResultExt};
47
48use crate::error::{self, ExternalSnafu, Result};
49use crate::events::EventHandlerImpl;
50use crate::frontend::FrontendOptions;
51use crate::instance::Instance;
52use crate::instance::region_query::FrontendRegionQueryHandler;
53
54/// The frontend [`Instance`] builder.
55pub struct FrontendBuilder {
56    options: FrontendOptions,
57    kv_backend: KvBackendRef,
58    layered_cache_registry: LayeredCacheRegistryRef,
59    local_cache_invalidator: Option<CacheInvalidatorRef>,
60    catalog_manager: CatalogManagerRef,
61    node_manager: NodeManagerRef,
62    plugins: Option<Plugins>,
63    procedure_executor: ProcedureExecutorRef,
64    process_manager: ProcessManagerRef,
65}
66
67impl FrontendBuilder {
68    #[allow(clippy::too_many_arguments)]
69    pub fn new(
70        options: FrontendOptions,
71        kv_backend: KvBackendRef,
72        layered_cache_registry: LayeredCacheRegistryRef,
73        catalog_manager: CatalogManagerRef,
74        node_manager: NodeManagerRef,
75        procedure_executor: ProcedureExecutorRef,
76        process_manager: ProcessManagerRef,
77    ) -> Self {
78        Self {
79            options,
80            kv_backend,
81            layered_cache_registry,
82            local_cache_invalidator: None,
83            catalog_manager,
84            node_manager,
85            plugins: None,
86            procedure_executor,
87            process_manager,
88        }
89    }
90
91    #[cfg(test)]
92    pub(crate) fn new_test(
93        options: &FrontendOptions,
94        meta_client: meta_client::MetaClientRef,
95    ) -> Self {
96        use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
97        use common_meta::cache::LayeredCacheRegistryBuilder;
98        use common_meta::kv_backend::memory::MemoryKvBackend;
99
100        let kv_backend = Arc::new(MemoryKvBackend::new());
101
102        // Builds cache registry
103        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
104        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
105        let layered_cache_registry = Arc::new(
106            with_default_composite_cache_registry(
107                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
108            )
109            .unwrap()
110            .build(),
111        );
112
113        Self::new(
114            options.clone(),
115            kv_backend,
116            layered_cache_registry,
117            catalog::memory::MemoryCatalogManager::with_default_setup(),
118            Arc::new(client::client_manager::NodeClients::default()),
119            meta_client,
120            Arc::new(catalog::process_manager::ProcessManager::new(
121                "".to_string(),
122                None,
123            )),
124        )
125    }
126
127    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
128        Self {
129            local_cache_invalidator: Some(cache_invalidator),
130            ..self
131        }
132    }
133
134    pub fn with_plugin(self, plugins: Plugins) -> Self {
135        Self {
136            plugins: Some(plugins),
137            ..self
138        }
139    }
140
141    pub async fn try_build(self) -> Result<Instance> {
142        let kv_backend = self.kv_backend;
143        let node_manager = self.node_manager;
144        let plugins = self.plugins.unwrap_or_default();
145        let process_manager = self.process_manager;
146        let table_route_cache: TableRouteCacheRef =
147            self.layered_cache_registry
148                .get()
149                .context(error::CacheRequiredSnafu {
150                    name: TABLE_ROUTE_CACHE_NAME,
151                })?;
152        let partition_info_cache: PartitionInfoCacheRef = self
153            .layered_cache_registry
154            .get()
155            .context(error::CacheRequiredSnafu {
156                name: PARTITION_INFO_CACHE_NAME,
157            })?;
158        let partition_manager = Arc::new(PartitionRuleManager::new(
159            kv_backend.clone(),
160            table_route_cache.clone(),
161            partition_info_cache.clone(),
162        ));
163
164        let local_cache_invalidator = self
165            .local_cache_invalidator
166            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
167
168        let region_query_handler =
169            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
170                factory.build(partition_manager.clone(), node_manager.clone())
171            } else {
172                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
173            };
174
175        let table_flownode_cache =
176            self.layered_cache_registry
177                .get()
178                .context(error::CacheRequiredSnafu {
179                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
180                })?;
181
182        let inserter = Arc::new(Inserter::new(
183            self.catalog_manager.clone(),
184            partition_manager.clone(),
185            node_manager.clone(),
186            table_flownode_cache,
187        ));
188        let deleter = Arc::new(Deleter::new(
189            self.catalog_manager.clone(),
190            partition_manager.clone(),
191            node_manager.clone(),
192        ));
193        let requester = Arc::new(Requester::new(
194            self.catalog_manager.clone(),
195            partition_manager.clone(),
196            node_manager.clone(),
197        ));
198        let table_mutation_handler = Arc::new(TableMutationOperator::new(
199            inserter.clone(),
200            deleter.clone(),
201            requester,
202        ));
203
204        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
205            self.procedure_executor.clone(),
206            self.catalog_manager.clone(),
207        ));
208
209        let flow_metadata_manager: Arc<FlowMetadataManager> =
210            Arc::new(FlowMetadataManager::new(kv_backend.clone()));
211        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
212
213        let query_engine = QueryEngineFactory::new_with_plugins(
214            self.catalog_manager.clone(),
215            Some(partition_manager.clone()),
216            Some(region_query_handler.clone()),
217            Some(table_mutation_handler),
218            Some(procedure_service_handler),
219            Some(Arc::new(flow_service)),
220            true,
221            plugins.clone(),
222            self.options.query.clone(),
223        )
224        .query_engine();
225
226        let statement_executor = StatementExecutor::new(
227            self.catalog_manager.clone(),
228            query_engine.clone(),
229            self.procedure_executor,
230            kv_backend.clone(),
231            local_cache_invalidator,
232            inserter.clone(),
233            partition_manager,
234            Some(process_manager.clone()),
235        );
236
237        let statement_executor =
238            if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
239                let ctx = ExecutorConfigureContext {
240                    kv_backend: kv_backend.clone(),
241                };
242                configurator
243                    .configure(statement_executor, ctx)
244                    .await
245                    .context(ExternalSnafu)?
246            } else {
247                statement_executor
248            };
249
250        let statement_executor = Arc::new(statement_executor);
251
252        let pipeline_operator = Arc::new(PipelineOperator::new(
253            inserter.clone(),
254            statement_executor.clone(),
255            self.catalog_manager.clone(),
256            query_engine.clone(),
257        ));
258
259        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
260
261        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
262            statement_executor.clone(),
263            self.options.slow_query.ttl,
264            self.options.event_recorder.ttl,
265        ))));
266
267        Ok(Instance {
268            catalog_manager: self.catalog_manager,
269            pipeline_operator,
270            statement_executor,
271            query_engine,
272            plugins,
273            inserter,
274            deleter,
275            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
276            event_recorder: Some(event_recorder),
277            process_manager,
278            otlp_metrics_table_legacy_cache: DashMap::new(),
279            slow_query_options: self.options.slow_query.clone(),
280            suspend: Arc::new(AtomicBool::new(false)),
281        })
282    }
283}