frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::process_manager::ProcessManagerRef;
19use catalog::CatalogManagerRef;
20use common_base::Plugins;
21use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
22use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
23use common_meta::ddl::ProcedureExecutorRef;
24use common_meta::key::flow::FlowMetadataManager;
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::node_manager::NodeManagerRef;
28use operator::delete::Deleter;
29use operator::flow::FlowServiceOperator;
30use operator::insert::Inserter;
31use operator::procedure::ProcedureServiceOperator;
32use operator::request::Requester;
33use operator::statement::{StatementExecutor, StatementExecutorRef};
34use operator::table::TableMutationOperator;
35use partition::manager::PartitionRuleManager;
36use pipeline::pipeline_operator::PipelineOperator;
37use query::region_query::RegionQueryHandlerFactoryRef;
38use query::QueryEngineFactory;
39use snafu::OptionExt;
40
41use crate::error::{self, Result};
42use crate::frontend::FrontendOptions;
43use crate::instance::region_query::FrontendRegionQueryHandler;
44use crate::instance::Instance;
45use crate::limiter::Limiter;
46use crate::slow_query_recorder::SlowQueryRecorder;
47
48/// The frontend [`Instance`] builder.
49pub struct FrontendBuilder {
50    options: FrontendOptions,
51    kv_backend: KvBackendRef,
52    layered_cache_registry: LayeredCacheRegistryRef,
53    local_cache_invalidator: Option<CacheInvalidatorRef>,
54    catalog_manager: CatalogManagerRef,
55    node_manager: NodeManagerRef,
56    plugins: Option<Plugins>,
57    procedure_executor: ProcedureExecutorRef,
58    process_manager: ProcessManagerRef,
59}
60
61impl FrontendBuilder {
62    #[allow(clippy::too_many_arguments)]
63    pub fn new(
64        options: FrontendOptions,
65        kv_backend: KvBackendRef,
66        layered_cache_registry: LayeredCacheRegistryRef,
67        catalog_manager: CatalogManagerRef,
68        node_manager: NodeManagerRef,
69        procedure_executor: ProcedureExecutorRef,
70        process_manager: ProcessManagerRef,
71    ) -> Self {
72        Self {
73            options,
74            kv_backend,
75            layered_cache_registry,
76            local_cache_invalidator: None,
77            catalog_manager,
78            node_manager,
79            plugins: None,
80            procedure_executor,
81            process_manager,
82        }
83    }
84
85    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
86        Self {
87            local_cache_invalidator: Some(cache_invalidator),
88            ..self
89        }
90    }
91
92    pub fn with_plugin(self, plugins: Plugins) -> Self {
93        Self {
94            plugins: Some(plugins),
95            ..self
96        }
97    }
98
99    pub async fn try_build(self) -> Result<Instance> {
100        let kv_backend = self.kv_backend;
101        let node_manager = self.node_manager;
102        let plugins = self.plugins.unwrap_or_default();
103        let process_manager = self.process_manager;
104        let table_route_cache: TableRouteCacheRef =
105            self.layered_cache_registry
106                .get()
107                .context(error::CacheRequiredSnafu {
108                    name: TABLE_ROUTE_CACHE_NAME,
109                })?;
110        let partition_manager = Arc::new(PartitionRuleManager::new(
111            kv_backend.clone(),
112            table_route_cache.clone(),
113        ));
114
115        let local_cache_invalidator = self
116            .local_cache_invalidator
117            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
118
119        let region_query_handler =
120            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
121                factory.build(partition_manager.clone(), node_manager.clone())
122            } else {
123                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
124            };
125
126        let table_flownode_cache =
127            self.layered_cache_registry
128                .get()
129                .context(error::CacheRequiredSnafu {
130                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
131                })?;
132
133        let inserter = Arc::new(Inserter::new(
134            self.catalog_manager.clone(),
135            partition_manager.clone(),
136            node_manager.clone(),
137            table_flownode_cache,
138        ));
139        let deleter = Arc::new(Deleter::new(
140            self.catalog_manager.clone(),
141            partition_manager.clone(),
142            node_manager.clone(),
143        ));
144        let requester = Arc::new(Requester::new(
145            self.catalog_manager.clone(),
146            partition_manager,
147            node_manager.clone(),
148        ));
149        let table_mutation_handler = Arc::new(TableMutationOperator::new(
150            inserter.clone(),
151            deleter.clone(),
152            requester,
153        ));
154
155        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
156            self.procedure_executor.clone(),
157            self.catalog_manager.clone(),
158        ));
159
160        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
161        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
162
163        let query_engine = QueryEngineFactory::new_with_plugins(
164            self.catalog_manager.clone(),
165            Some(region_query_handler.clone()),
166            Some(table_mutation_handler),
167            Some(procedure_service_handler),
168            Some(Arc::new(flow_service)),
169            true,
170            plugins.clone(),
171            self.options.query.clone(),
172        )
173        .query_engine();
174
175        let statement_executor = Arc::new(StatementExecutor::new(
176            self.catalog_manager.clone(),
177            query_engine.clone(),
178            self.procedure_executor,
179            kv_backend.clone(),
180            local_cache_invalidator,
181            inserter.clone(),
182            table_route_cache,
183            Some(process_manager.clone()),
184        ));
185
186        let pipeline_operator = Arc::new(PipelineOperator::new(
187            inserter.clone(),
188            statement_executor.clone(),
189            self.catalog_manager.clone(),
190            query_engine.clone(),
191        ));
192
193        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
194
195        let slow_query_recorder = self.options.slow_query.and_then(|opts| {
196            opts.enable.then(|| {
197                SlowQueryRecorder::new(
198                    opts.clone(),
199                    inserter.clone(),
200                    statement_executor.clone(),
201                    self.catalog_manager.clone(),
202                )
203            })
204        });
205
206        // Create the limiter if the max_in_flight_write_bytes is set.
207        let limiter = self
208            .options
209            .max_in_flight_write_bytes
210            .map(|max_in_flight_write_bytes| {
211                Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
212            });
213
214        Ok(Instance {
215            catalog_manager: self.catalog_manager,
216            pipeline_operator,
217            statement_executor,
218            query_engine,
219            plugins,
220            inserter,
221            deleter,
222            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
223            slow_query_recorder,
224            limiter,
225            process_manager,
226        })
227    }
228}