frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37    ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38    StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::manager::PartitionRuleManager;
42use pipeline::pipeline_operator::PipelineOperator;
43use query::QueryEngineFactory;
44use query::region_query::RegionQueryHandlerFactoryRef;
45use snafu::{OptionExt, ResultExt};
46
47use crate::error::{self, ExternalSnafu, Result};
48use crate::events::EventHandlerImpl;
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51use crate::instance::region_query::FrontendRegionQueryHandler;
52use crate::limiter::Limiter;
53
54/// The frontend [`Instance`] builder.
55pub struct FrontendBuilder {
56    options: FrontendOptions,
57    kv_backend: KvBackendRef,
58    layered_cache_registry: LayeredCacheRegistryRef,
59    local_cache_invalidator: Option<CacheInvalidatorRef>,
60    catalog_manager: CatalogManagerRef,
61    node_manager: NodeManagerRef,
62    plugins: Option<Plugins>,
63    procedure_executor: ProcedureExecutorRef,
64    process_manager: ProcessManagerRef,
65}
66
67impl FrontendBuilder {
68    #[allow(clippy::too_many_arguments)]
69    pub fn new(
70        options: FrontendOptions,
71        kv_backend: KvBackendRef,
72        layered_cache_registry: LayeredCacheRegistryRef,
73        catalog_manager: CatalogManagerRef,
74        node_manager: NodeManagerRef,
75        procedure_executor: ProcedureExecutorRef,
76        process_manager: ProcessManagerRef,
77    ) -> Self {
78        Self {
79            options,
80            kv_backend,
81            layered_cache_registry,
82            local_cache_invalidator: None,
83            catalog_manager,
84            node_manager,
85            plugins: None,
86            procedure_executor,
87            process_manager,
88        }
89    }
90
91    #[cfg(test)]
92    pub(crate) fn new_test(
93        options: &FrontendOptions,
94        meta_client: meta_client::MetaClientRef,
95    ) -> Self {
96        let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
97
98        let layered_cache_registry = Arc::new(
99            common_meta::cache::LayeredCacheRegistryBuilder::default()
100                .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone()))
101                .build(),
102        );
103
104        Self::new(
105            options.clone(),
106            kv_backend,
107            layered_cache_registry,
108            catalog::memory::MemoryCatalogManager::with_default_setup(),
109            Arc::new(client::client_manager::NodeClients::default()),
110            meta_client,
111            Arc::new(catalog::process_manager::ProcessManager::new(
112                "".to_string(),
113                None,
114            )),
115        )
116    }
117
118    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
119        Self {
120            local_cache_invalidator: Some(cache_invalidator),
121            ..self
122        }
123    }
124
125    pub fn with_plugin(self, plugins: Plugins) -> Self {
126        Self {
127            plugins: Some(plugins),
128            ..self
129        }
130    }
131
132    pub async fn try_build(self) -> Result<Instance> {
133        let kv_backend = self.kv_backend;
134        let node_manager = self.node_manager;
135        let plugins = self.plugins.unwrap_or_default();
136        let process_manager = self.process_manager;
137        let table_route_cache: TableRouteCacheRef =
138            self.layered_cache_registry
139                .get()
140                .context(error::CacheRequiredSnafu {
141                    name: TABLE_ROUTE_CACHE_NAME,
142                })?;
143        let partition_manager = Arc::new(PartitionRuleManager::new(
144            kv_backend.clone(),
145            table_route_cache.clone(),
146        ));
147
148        let local_cache_invalidator = self
149            .local_cache_invalidator
150            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
151
152        let region_query_handler =
153            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
154                factory.build(partition_manager.clone(), node_manager.clone())
155            } else {
156                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
157            };
158
159        let table_flownode_cache =
160            self.layered_cache_registry
161                .get()
162                .context(error::CacheRequiredSnafu {
163                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
164                })?;
165
166        let inserter = Arc::new(Inserter::new(
167            self.catalog_manager.clone(),
168            partition_manager.clone(),
169            node_manager.clone(),
170            table_flownode_cache,
171        ));
172        let deleter = Arc::new(Deleter::new(
173            self.catalog_manager.clone(),
174            partition_manager.clone(),
175            node_manager.clone(),
176        ));
177        let requester = Arc::new(Requester::new(
178            self.catalog_manager.clone(),
179            partition_manager.clone(),
180            node_manager.clone(),
181        ));
182        let table_mutation_handler = Arc::new(TableMutationOperator::new(
183            inserter.clone(),
184            deleter.clone(),
185            requester,
186        ));
187
188        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
189            self.procedure_executor.clone(),
190            self.catalog_manager.clone(),
191        ));
192
193        let flow_metadata_manager: Arc<FlowMetadataManager> =
194            Arc::new(FlowMetadataManager::new(kv_backend.clone()));
195        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
196
197        let query_engine = QueryEngineFactory::new_with_plugins(
198            self.catalog_manager.clone(),
199            Some(partition_manager.clone()),
200            Some(region_query_handler.clone()),
201            Some(table_mutation_handler),
202            Some(procedure_service_handler),
203            Some(Arc::new(flow_service)),
204            true,
205            plugins.clone(),
206            self.options.query.clone(),
207        )
208        .query_engine();
209
210        let statement_executor = StatementExecutor::new(
211            self.catalog_manager.clone(),
212            query_engine.clone(),
213            self.procedure_executor,
214            kv_backend.clone(),
215            local_cache_invalidator,
216            inserter.clone(),
217            table_route_cache,
218            Some(process_manager.clone()),
219        );
220
221        let statement_executor =
222            if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
223                let ctx = ExecutorConfigureContext {
224                    kv_backend: kv_backend.clone(),
225                };
226                configurator
227                    .configure(statement_executor, ctx)
228                    .await
229                    .context(ExternalSnafu)?
230            } else {
231                statement_executor
232            };
233
234        let statement_executor = Arc::new(statement_executor);
235
236        let pipeline_operator = Arc::new(PipelineOperator::new(
237            inserter.clone(),
238            statement_executor.clone(),
239            self.catalog_manager.clone(),
240            query_engine.clone(),
241        ));
242
243        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
244
245        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
246            statement_executor.clone(),
247            self.options.slow_query.ttl,
248            self.options.event_recorder.ttl,
249        ))));
250
251        // Create the limiter if the max_in_flight_write_bytes is set.
252        let limiter = self
253            .options
254            .max_in_flight_write_bytes
255            .map(|max_in_flight_write_bytes| {
256                Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
257            });
258
259        Ok(Instance {
260            catalog_manager: self.catalog_manager,
261            pipeline_operator,
262            statement_executor,
263            query_engine,
264            plugins,
265            inserter,
266            deleter,
267            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
268            event_recorder: Some(event_recorder),
269            limiter,
270            process_manager,
271            otlp_metrics_table_legacy_cache: DashMap::new(),
272            slow_query_options: self.options.slow_query.clone(),
273            suspend: Arc::new(AtomicBool::new(false)),
274        })
275    }
276}