1use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37 ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38 StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use pipeline::pipeline_operator::PipelineOperator;
44use query::QueryEngineFactory;
45use query::region_query::RegionQueryHandlerFactoryRef;
46use snafu::{OptionExt, ResultExt};
47
48use crate::error::{self, ExternalSnafu, Result};
49use crate::events::EventHandlerImpl;
50use crate::frontend::FrontendOptions;
51use crate::instance::Instance;
52use crate::instance::region_query::FrontendRegionQueryHandler;
53
54pub struct FrontendBuilder {
56 options: FrontendOptions,
57 kv_backend: KvBackendRef,
58 layered_cache_registry: LayeredCacheRegistryRef,
59 local_cache_invalidator: Option<CacheInvalidatorRef>,
60 catalog_manager: CatalogManagerRef,
61 node_manager: NodeManagerRef,
62 plugins: Option<Plugins>,
63 procedure_executor: ProcedureExecutorRef,
64 process_manager: ProcessManagerRef,
65}
66
67impl FrontendBuilder {
68 #[allow(clippy::too_many_arguments)]
69 pub fn new(
70 options: FrontendOptions,
71 kv_backend: KvBackendRef,
72 layered_cache_registry: LayeredCacheRegistryRef,
73 catalog_manager: CatalogManagerRef,
74 node_manager: NodeManagerRef,
75 procedure_executor: ProcedureExecutorRef,
76 process_manager: ProcessManagerRef,
77 ) -> Self {
78 Self {
79 options,
80 kv_backend,
81 layered_cache_registry,
82 local_cache_invalidator: None,
83 catalog_manager,
84 node_manager,
85 plugins: None,
86 procedure_executor,
87 process_manager,
88 }
89 }
90
91 #[cfg(test)]
92 pub(crate) fn new_test(
93 options: &FrontendOptions,
94 meta_client: meta_client::MetaClientRef,
95 ) -> Self {
96 use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
97 use common_meta::cache::LayeredCacheRegistryBuilder;
98 use common_meta::kv_backend::memory::MemoryKvBackend;
99
100 let kv_backend = Arc::new(MemoryKvBackend::new());
101
102 let layered_cache_builder = LayeredCacheRegistryBuilder::default();
104 let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
105 let layered_cache_registry = Arc::new(
106 with_default_composite_cache_registry(
107 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
108 )
109 .unwrap()
110 .build(),
111 );
112
113 Self::new(
114 options.clone(),
115 kv_backend,
116 layered_cache_registry,
117 catalog::memory::MemoryCatalogManager::with_default_setup(),
118 Arc::new(client::client_manager::NodeClients::default()),
119 meta_client,
120 Arc::new(catalog::process_manager::ProcessManager::new(
121 "".to_string(),
122 None,
123 )),
124 )
125 }
126
127 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
128 Self {
129 local_cache_invalidator: Some(cache_invalidator),
130 ..self
131 }
132 }
133
134 pub fn with_plugin(self, plugins: Plugins) -> Self {
135 Self {
136 plugins: Some(plugins),
137 ..self
138 }
139 }
140
141 pub async fn try_build(self) -> Result<Instance> {
142 let kv_backend = self.kv_backend;
143 let node_manager = self.node_manager;
144 let plugins = self.plugins.unwrap_or_default();
145 let process_manager = self.process_manager;
146 let table_route_cache: TableRouteCacheRef =
147 self.layered_cache_registry
148 .get()
149 .context(error::CacheRequiredSnafu {
150 name: TABLE_ROUTE_CACHE_NAME,
151 })?;
152 let partition_info_cache: PartitionInfoCacheRef = self
153 .layered_cache_registry
154 .get()
155 .context(error::CacheRequiredSnafu {
156 name: PARTITION_INFO_CACHE_NAME,
157 })?;
158 let partition_manager = Arc::new(PartitionRuleManager::new(
159 kv_backend.clone(),
160 table_route_cache.clone(),
161 partition_info_cache.clone(),
162 ));
163
164 let local_cache_invalidator = self
165 .local_cache_invalidator
166 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
167
168 let region_query_handler =
169 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
170 factory.build(partition_manager.clone(), node_manager.clone())
171 } else {
172 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
173 };
174
175 let table_flownode_cache =
176 self.layered_cache_registry
177 .get()
178 .context(error::CacheRequiredSnafu {
179 name: TABLE_FLOWNODE_SET_CACHE_NAME,
180 })?;
181
182 let inserter = Arc::new(Inserter::new(
183 self.catalog_manager.clone(),
184 partition_manager.clone(),
185 node_manager.clone(),
186 table_flownode_cache,
187 ));
188 let deleter = Arc::new(Deleter::new(
189 self.catalog_manager.clone(),
190 partition_manager.clone(),
191 node_manager.clone(),
192 ));
193 let requester = Arc::new(Requester::new(
194 self.catalog_manager.clone(),
195 partition_manager.clone(),
196 node_manager.clone(),
197 ));
198 let table_mutation_handler = Arc::new(TableMutationOperator::new(
199 inserter.clone(),
200 deleter.clone(),
201 requester,
202 ));
203
204 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
205 self.procedure_executor.clone(),
206 self.catalog_manager.clone(),
207 ));
208
209 let flow_metadata_manager: Arc<FlowMetadataManager> =
210 Arc::new(FlowMetadataManager::new(kv_backend.clone()));
211 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
212
213 let query_engine = QueryEngineFactory::new_with_plugins(
214 self.catalog_manager.clone(),
215 Some(partition_manager.clone()),
216 Some(region_query_handler.clone()),
217 Some(table_mutation_handler),
218 Some(procedure_service_handler),
219 Some(Arc::new(flow_service)),
220 true,
221 plugins.clone(),
222 self.options.query.clone(),
223 )
224 .query_engine();
225
226 let statement_executor = StatementExecutor::new(
227 self.catalog_manager.clone(),
228 query_engine.clone(),
229 self.procedure_executor,
230 kv_backend.clone(),
231 local_cache_invalidator,
232 inserter.clone(),
233 partition_manager,
234 Some(process_manager.clone()),
235 );
236
237 let statement_executor =
238 if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
239 let ctx = ExecutorConfigureContext {
240 kv_backend: kv_backend.clone(),
241 };
242 configurator
243 .configure(statement_executor, ctx)
244 .await
245 .context(ExternalSnafu)?
246 } else {
247 statement_executor
248 };
249
250 let statement_executor = Arc::new(statement_executor);
251
252 let pipeline_operator = Arc::new(PipelineOperator::new(
253 inserter.clone(),
254 statement_executor.clone(),
255 self.catalog_manager.clone(),
256 query_engine.clone(),
257 ));
258
259 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
260
261 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
262 statement_executor.clone(),
263 self.options.slow_query.ttl,
264 self.options.event_recorder.ttl,
265 ))));
266
267 Ok(Instance {
268 catalog_manager: self.catalog_manager,
269 pipeline_operator,
270 statement_executor,
271 query_engine,
272 plugins,
273 inserter,
274 deleter,
275 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
276 event_recorder: Some(event_recorder),
277 process_manager,
278 otlp_metrics_table_legacy_cache: DashMap::new(),
279 slow_query_options: self.options.slow_query.clone(),
280 suspend: Arc::new(AtomicBool::new(false)),
281 })
282 }
283}