frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::process_manager::ProcessManagerRef;
19use catalog::CatalogManagerRef;
20use common_base::Plugins;
21use common_event_recorder::EventRecorderImpl;
22use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
23use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
24use common_meta::key::flow::FlowMetadataManager;
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::node_manager::NodeManagerRef;
28use common_meta::procedure_executor::ProcedureExecutorRef;
29use dashmap::DashMap;
30use operator::delete::Deleter;
31use operator::flow::FlowServiceOperator;
32use operator::insert::Inserter;
33use operator::procedure::ProcedureServiceOperator;
34use operator::request::Requester;
35use operator::statement::{StatementExecutor, StatementExecutorRef};
36use operator::table::TableMutationOperator;
37use partition::manager::PartitionRuleManager;
38use pipeline::pipeline_operator::PipelineOperator;
39use query::region_query::RegionQueryHandlerFactoryRef;
40use query::QueryEngineFactory;
41use snafu::OptionExt;
42
43use crate::error::{self, Result};
44use crate::events::EventHandlerImpl;
45use crate::frontend::FrontendOptions;
46use crate::instance::region_query::FrontendRegionQueryHandler;
47use crate::instance::Instance;
48use crate::limiter::Limiter;
49
50/// The frontend [`Instance`] builder.
51pub struct FrontendBuilder {
52    options: FrontendOptions,
53    kv_backend: KvBackendRef,
54    layered_cache_registry: LayeredCacheRegistryRef,
55    local_cache_invalidator: Option<CacheInvalidatorRef>,
56    catalog_manager: CatalogManagerRef,
57    node_manager: NodeManagerRef,
58    plugins: Option<Plugins>,
59    procedure_executor: ProcedureExecutorRef,
60    process_manager: ProcessManagerRef,
61}
62
63impl FrontendBuilder {
64    #[allow(clippy::too_many_arguments)]
65    pub fn new(
66        options: FrontendOptions,
67        kv_backend: KvBackendRef,
68        layered_cache_registry: LayeredCacheRegistryRef,
69        catalog_manager: CatalogManagerRef,
70        node_manager: NodeManagerRef,
71        procedure_executor: ProcedureExecutorRef,
72        process_manager: ProcessManagerRef,
73    ) -> Self {
74        Self {
75            options,
76            kv_backend,
77            layered_cache_registry,
78            local_cache_invalidator: None,
79            catalog_manager,
80            node_manager,
81            plugins: None,
82            procedure_executor,
83            process_manager,
84        }
85    }
86
87    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
88        Self {
89            local_cache_invalidator: Some(cache_invalidator),
90            ..self
91        }
92    }
93
94    pub fn with_plugin(self, plugins: Plugins) -> Self {
95        Self {
96            plugins: Some(plugins),
97            ..self
98        }
99    }
100
101    pub async fn try_build(self) -> Result<Instance> {
102        let kv_backend = self.kv_backend;
103        let node_manager = self.node_manager;
104        let plugins = self.plugins.unwrap_or_default();
105        let process_manager = self.process_manager;
106        let table_route_cache: TableRouteCacheRef =
107            self.layered_cache_registry
108                .get()
109                .context(error::CacheRequiredSnafu {
110                    name: TABLE_ROUTE_CACHE_NAME,
111                })?;
112        let partition_manager = Arc::new(PartitionRuleManager::new(
113            kv_backend.clone(),
114            table_route_cache.clone(),
115        ));
116
117        let local_cache_invalidator = self
118            .local_cache_invalidator
119            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
120
121        let region_query_handler =
122            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
123                factory.build(partition_manager.clone(), node_manager.clone())
124            } else {
125                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
126            };
127
128        let table_flownode_cache =
129            self.layered_cache_registry
130                .get()
131                .context(error::CacheRequiredSnafu {
132                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
133                })?;
134
135        let inserter = Arc::new(Inserter::new(
136            self.catalog_manager.clone(),
137            partition_manager.clone(),
138            node_manager.clone(),
139            table_flownode_cache,
140        ));
141        let deleter = Arc::new(Deleter::new(
142            self.catalog_manager.clone(),
143            partition_manager.clone(),
144            node_manager.clone(),
145        ));
146        let requester = Arc::new(Requester::new(
147            self.catalog_manager.clone(),
148            partition_manager.clone(),
149            node_manager.clone(),
150        ));
151        let table_mutation_handler = Arc::new(TableMutationOperator::new(
152            inserter.clone(),
153            deleter.clone(),
154            requester,
155        ));
156
157        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
158            self.procedure_executor.clone(),
159            self.catalog_manager.clone(),
160        ));
161
162        let flow_metadata_manager: Arc<FlowMetadataManager> =
163            Arc::new(FlowMetadataManager::new(kv_backend.clone()));
164        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
165
166        let query_engine = QueryEngineFactory::new_with_plugins(
167            self.catalog_manager.clone(),
168            Some(partition_manager.clone()),
169            Some(region_query_handler.clone()),
170            Some(table_mutation_handler),
171            Some(procedure_service_handler),
172            Some(Arc::new(flow_service)),
173            true,
174            plugins.clone(),
175            self.options.query.clone(),
176        )
177        .query_engine();
178
179        let statement_executor = Arc::new(StatementExecutor::new(
180            self.catalog_manager.clone(),
181            query_engine.clone(),
182            self.procedure_executor,
183            kv_backend.clone(),
184            local_cache_invalidator,
185            inserter.clone(),
186            table_route_cache,
187            Some(process_manager.clone()),
188        ));
189
190        let pipeline_operator = Arc::new(PipelineOperator::new(
191            inserter.clone(),
192            statement_executor.clone(),
193            self.catalog_manager.clone(),
194            query_engine.clone(),
195        ));
196
197        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
198
199        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
200            statement_executor.clone(),
201            self.options.slow_query.ttl,
202            self.options.event_recorder.ttl,
203        ))));
204
205        // Create the limiter if the max_in_flight_write_bytes is set.
206        let limiter = self
207            .options
208            .max_in_flight_write_bytes
209            .map(|max_in_flight_write_bytes| {
210                Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
211            });
212
213        Ok(Instance {
214            catalog_manager: self.catalog_manager,
215            pipeline_operator,
216            statement_executor,
217            query_engine,
218            plugins,
219            inserter,
220            deleter,
221            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
222            event_recorder: Some(event_recorder),
223            limiter,
224            process_manager,
225            otlp_metrics_table_legacy_cache: DashMap::new(),
226            slow_query_options: self.options.slow_query.clone(),
227        })
228    }
229}