frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::CatalogManagerRef;
19use common_base::Plugins;
20use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
21use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
22use common_meta::ddl::ProcedureExecutorRef;
23use common_meta::key::flow::FlowMetadataManager;
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::KvBackendRef;
26use common_meta::node_manager::NodeManagerRef;
27use operator::delete::Deleter;
28use operator::flow::FlowServiceOperator;
29use operator::insert::Inserter;
30use operator::procedure::ProcedureServiceOperator;
31use operator::request::Requester;
32use operator::statement::{StatementExecutor, StatementExecutorRef};
33use operator::table::TableMutationOperator;
34use partition::manager::PartitionRuleManager;
35use pipeline::pipeline_operator::PipelineOperator;
36use query::region_query::RegionQueryHandlerFactoryRef;
37use query::QueryEngineFactory;
38use snafu::OptionExt;
39
40use crate::error::{self, Result};
41use crate::frontend::FrontendOptions;
42use crate::instance::region_query::FrontendRegionQueryHandler;
43use crate::instance::Instance;
44use crate::limiter::Limiter;
45use crate::slow_query_recorder::SlowQueryRecorder;
46
/// The frontend [`Instance`] builder.
///
/// Mandatory components are supplied through [`FrontendBuilder::new`]; the
/// optional ones (`local_cache_invalidator`, `plugins`) are set with the
/// `with_*` methods. [`FrontendBuilder::try_build`] consumes the builder and
/// wires everything into an [`Instance`].
pub struct FrontendBuilder {
    /// Frontend options; `try_build` reads `query`, `slow_query` and
    /// `max_in_flight_write_bytes` from it.
    options: FrontendOptions,
    /// Key-value backend handed to the partition rule manager, the flow and
    /// table metadata managers and the statement executor.
    kv_backend: KvBackendRef,
    /// Registry from which the table route cache and the table flownode set
    /// cache are looked up; both must be present for `try_build` to succeed.
    layered_cache_registry: LayeredCacheRegistryRef,
    /// Optional cache invalidator passed to the statement executor; when
    /// unset, a [`DummyCacheInvalidator`] is used instead.
    local_cache_invalidator: Option<CacheInvalidatorRef>,
    /// Catalog manager shared by the operators, the query engine and the
    /// resulting [`Instance`].
    catalog_manager: CatalogManagerRef,
    /// Node manager shared by the region query handler and the
    /// insert/delete/request/flow operators.
    node_manager: NodeManagerRef,
    /// Optional plugins; when unset, `Plugins::default()` is used.
    plugins: Option<Plugins>,
    /// Procedure executor passed to the procedure service operator and the
    /// statement executor.
    procedure_executor: ProcedureExecutorRef,
}
58
59impl FrontendBuilder {
60    pub fn new(
61        options: FrontendOptions,
62        kv_backend: KvBackendRef,
63        layered_cache_registry: LayeredCacheRegistryRef,
64        catalog_manager: CatalogManagerRef,
65        node_manager: NodeManagerRef,
66        procedure_executor: ProcedureExecutorRef,
67    ) -> Self {
68        Self {
69            options,
70            kv_backend,
71            layered_cache_registry,
72            local_cache_invalidator: None,
73            catalog_manager,
74            node_manager,
75            plugins: None,
76            procedure_executor,
77        }
78    }
79
80    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
81        Self {
82            local_cache_invalidator: Some(cache_invalidator),
83            ..self
84        }
85    }
86
87    pub fn with_plugin(self, plugins: Plugins) -> Self {
88        Self {
89            plugins: Some(plugins),
90            ..self
91        }
92    }
93
94    pub async fn try_build(self) -> Result<Instance> {
95        let kv_backend = self.kv_backend;
96        let node_manager = self.node_manager;
97        let plugins = self.plugins.unwrap_or_default();
98
99        let table_route_cache: TableRouteCacheRef =
100            self.layered_cache_registry
101                .get()
102                .context(error::CacheRequiredSnafu {
103                    name: TABLE_ROUTE_CACHE_NAME,
104                })?;
105        let partition_manager = Arc::new(PartitionRuleManager::new(
106            kv_backend.clone(),
107            table_route_cache.clone(),
108        ));
109
110        let local_cache_invalidator = self
111            .local_cache_invalidator
112            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
113
114        let region_query_handler =
115            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
116                factory.build(partition_manager.clone(), node_manager.clone())
117            } else {
118                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
119            };
120
121        let table_flownode_cache =
122            self.layered_cache_registry
123                .get()
124                .context(error::CacheRequiredSnafu {
125                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
126                })?;
127
128        let inserter = Arc::new(Inserter::new(
129            self.catalog_manager.clone(),
130            partition_manager.clone(),
131            node_manager.clone(),
132            table_flownode_cache,
133        ));
134        let deleter = Arc::new(Deleter::new(
135            self.catalog_manager.clone(),
136            partition_manager.clone(),
137            node_manager.clone(),
138        ));
139        let requester = Arc::new(Requester::new(
140            self.catalog_manager.clone(),
141            partition_manager,
142            node_manager.clone(),
143        ));
144        let table_mutation_handler = Arc::new(TableMutationOperator::new(
145            inserter.clone(),
146            deleter.clone(),
147            requester,
148        ));
149
150        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
151            self.procedure_executor.clone(),
152            self.catalog_manager.clone(),
153        ));
154
155        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
156        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
157
158        let query_engine = QueryEngineFactory::new_with_plugins(
159            self.catalog_manager.clone(),
160            Some(region_query_handler.clone()),
161            Some(table_mutation_handler),
162            Some(procedure_service_handler),
163            Some(Arc::new(flow_service)),
164            true,
165            plugins.clone(),
166            self.options.query.clone(),
167        )
168        .query_engine();
169
170        let statement_executor = Arc::new(StatementExecutor::new(
171            self.catalog_manager.clone(),
172            query_engine.clone(),
173            self.procedure_executor,
174            kv_backend.clone(),
175            local_cache_invalidator,
176            inserter.clone(),
177            table_route_cache,
178        ));
179
180        let pipeline_operator = Arc::new(PipelineOperator::new(
181            inserter.clone(),
182            statement_executor.clone(),
183            self.catalog_manager.clone(),
184            query_engine.clone(),
185        ));
186
187        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
188
189        let slow_query_recorder = self.options.slow_query.and_then(|opts| {
190            opts.enable.then(|| {
191                SlowQueryRecorder::new(
192                    opts.clone(),
193                    inserter.clone(),
194                    statement_executor.clone(),
195                    self.catalog_manager.clone(),
196                )
197            })
198        });
199
200        // Create the limiter if the max_in_flight_write_bytes is set.
201        let limiter = self
202            .options
203            .max_in_flight_write_bytes
204            .map(|max_in_flight_write_bytes| {
205                Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
206            });
207
208        Ok(Instance {
209            catalog_manager: self.catalog_manager,
210            pipeline_operator,
211            statement_executor,
212            query_engine,
213            plugins,
214            inserter,
215            deleter,
216            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
217            slow_query_recorder,
218            limiter,
219        })
220    }
221}