frontend/instance/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::CatalogManagerRef;
19use common_base::Plugins;
20use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
21use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
22use common_meta::ddl::ProcedureExecutorRef;
23use common_meta::key::flow::FlowMetadataManager;
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::KvBackendRef;
26use common_meta::node_manager::NodeManagerRef;
27use operator::delete::Deleter;
28use operator::flow::FlowServiceOperator;
29use operator::insert::Inserter;
30use operator::procedure::ProcedureServiceOperator;
31use operator::request::Requester;
32use operator::statement::{StatementExecutor, StatementExecutorRef};
33use operator::table::TableMutationOperator;
34use partition::manager::PartitionRuleManager;
35use pipeline::pipeline_operator::PipelineOperator;
36use query::region_query::RegionQueryHandlerFactoryRef;
37use query::stats::StatementStatistics;
38use query::QueryEngineFactory;
39use snafu::OptionExt;
40
41use crate::error::{self, Result};
42use crate::frontend::FrontendOptions;
43use crate::instance::region_query::FrontendRegionQueryHandler;
44use crate::instance::Instance;
45use crate::limiter::Limiter;
46
47/// The frontend [`Instance`] builder.
48pub struct FrontendBuilder {
49    options: FrontendOptions,
50    kv_backend: KvBackendRef,
51    layered_cache_registry: LayeredCacheRegistryRef,
52    local_cache_invalidator: Option<CacheInvalidatorRef>,
53    catalog_manager: CatalogManagerRef,
54    node_manager: NodeManagerRef,
55    plugins: Option<Plugins>,
56    procedure_executor: ProcedureExecutorRef,
57    stats: StatementStatistics,
58}
59
60impl FrontendBuilder {
61    pub fn new(
62        options: FrontendOptions,
63        kv_backend: KvBackendRef,
64        layered_cache_registry: LayeredCacheRegistryRef,
65        catalog_manager: CatalogManagerRef,
66        node_manager: NodeManagerRef,
67        procedure_executor: ProcedureExecutorRef,
68        stats: StatementStatistics,
69    ) -> Self {
70        Self {
71            options,
72            kv_backend,
73            layered_cache_registry,
74            local_cache_invalidator: None,
75            catalog_manager,
76            node_manager,
77            plugins: None,
78            procedure_executor,
79            stats,
80        }
81    }
82
83    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
84        Self {
85            local_cache_invalidator: Some(cache_invalidator),
86            ..self
87        }
88    }
89
90    pub fn with_plugin(self, plugins: Plugins) -> Self {
91        Self {
92            plugins: Some(plugins),
93            ..self
94        }
95    }
96
97    pub async fn try_build(self) -> Result<Instance> {
98        let kv_backend = self.kv_backend;
99        let node_manager = self.node_manager;
100        let plugins = self.plugins.unwrap_or_default();
101
102        let table_route_cache: TableRouteCacheRef =
103            self.layered_cache_registry
104                .get()
105                .context(error::CacheRequiredSnafu {
106                    name: TABLE_ROUTE_CACHE_NAME,
107                })?;
108        let partition_manager = Arc::new(PartitionRuleManager::new(
109            kv_backend.clone(),
110            table_route_cache.clone(),
111        ));
112
113        let local_cache_invalidator = self
114            .local_cache_invalidator
115            .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
116
117        let region_query_handler =
118            if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
119                factory.build(partition_manager.clone(), node_manager.clone())
120            } else {
121                FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
122            };
123
124        let table_flownode_cache =
125            self.layered_cache_registry
126                .get()
127                .context(error::CacheRequiredSnafu {
128                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
129                })?;
130
131        let inserter = Arc::new(Inserter::new(
132            self.catalog_manager.clone(),
133            partition_manager.clone(),
134            node_manager.clone(),
135            table_flownode_cache,
136        ));
137        let deleter = Arc::new(Deleter::new(
138            self.catalog_manager.clone(),
139            partition_manager.clone(),
140            node_manager.clone(),
141        ));
142        let requester = Arc::new(Requester::new(
143            self.catalog_manager.clone(),
144            partition_manager,
145            node_manager.clone(),
146        ));
147        let table_mutation_handler = Arc::new(TableMutationOperator::new(
148            inserter.clone(),
149            deleter.clone(),
150            requester,
151        ));
152
153        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
154            self.procedure_executor.clone(),
155            self.catalog_manager.clone(),
156        ));
157
158        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
159        let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
160
161        let query_engine = QueryEngineFactory::new_with_plugins(
162            self.catalog_manager.clone(),
163            Some(region_query_handler.clone()),
164            Some(table_mutation_handler),
165            Some(procedure_service_handler),
166            Some(Arc::new(flow_service)),
167            true,
168            plugins.clone(),
169            self.options.query.clone(),
170        )
171        .query_engine();
172
173        let statement_executor = Arc::new(StatementExecutor::new(
174            self.catalog_manager.clone(),
175            query_engine.clone(),
176            self.procedure_executor,
177            kv_backend.clone(),
178            local_cache_invalidator,
179            inserter.clone(),
180            table_route_cache,
181        ));
182
183        let pipeline_operator = Arc::new(PipelineOperator::new(
184            inserter.clone(),
185            statement_executor.clone(),
186            self.catalog_manager.clone(),
187            query_engine.clone(),
188        ));
189
190        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
191
192        // Create the limiter if the max_in_flight_write_bytes is set.
193        let limiter = self
194            .options
195            .max_in_flight_write_bytes
196            .map(|max_in_flight_write_bytes| {
197                Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
198            });
199
200        Ok(Instance {
201            catalog_manager: self.catalog_manager,
202            pipeline_operator,
203            statement_executor,
204            query_engine,
205            plugins,
206            inserter,
207            deleter,
208            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
209            stats: self.stats,
210            limiter,
211        })
212    }
213}