1use std::sync::Arc;
16use std::sync::atomic::AtomicBool;
17
18use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
19use catalog::CatalogManagerRef;
20use catalog::process_manager::ProcessManagerRef;
21use common_base::Plugins;
22use common_event_recorder::EventRecorderImpl;
23use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
24use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
25use common_meta::key::TableMetadataManager;
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::node_manager::NodeManagerRef;
29use common_meta::procedure_executor::ProcedureExecutorRef;
30use dashmap::DashMap;
31use operator::delete::Deleter;
32use operator::flow::FlowServiceOperator;
33use operator::insert::Inserter;
34use operator::procedure::ProcedureServiceOperator;
35use operator::request::Requester;
36use operator::statement::{
37 ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
38 StatementExecutorRef,
39};
40use operator::table::TableMutationOperator;
41use partition::manager::PartitionRuleManager;
42use pipeline::pipeline_operator::PipelineOperator;
43use query::QueryEngineFactory;
44use query::region_query::RegionQueryHandlerFactoryRef;
45use snafu::{OptionExt, ResultExt};
46
47use crate::error::{self, ExternalSnafu, Result};
48use crate::events::EventHandlerImpl;
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51use crate::instance::region_query::FrontendRegionQueryHandler;
52use crate::limiter::Limiter;
53
/// Builder that collects the dependencies of a frontend [`Instance`] and assembles it in
/// [`FrontendBuilder::try_build`].
pub struct FrontendBuilder {
    /// Frontend options; `try_build` reads `query`, `slow_query`, `event_recorder` and
    /// `max_in_flight_write_bytes` from them.
    options: FrontendOptions,
    /// Key-value backend handed to the partition rule manager, the flow metadata manager, the
    /// statement executor (and its configurator context) and the table metadata manager.
    kv_backend: KvBackendRef,
    /// Cache registry; `try_build` looks up the table route cache and the table flownode set
    /// cache in it and fails if either is missing.
    layered_cache_registry: LayeredCacheRegistryRef,
    /// Optional local cache invalidator; `try_build` falls back to `DummyCacheInvalidator`
    /// when it is unset.
    local_cache_invalidator: Option<CacheInvalidatorRef>,
    /// Catalog manager shared with the operators, the query engine and the built instance.
    catalog_manager: CatalogManagerRef,
    /// Node manager passed to the region query handler, the inserter, deleter and requester,
    /// and the flow service.
    node_manager: NodeManagerRef,
    /// Optional plugins; `try_build` uses the default `Plugins` when unset.
    plugins: Option<Plugins>,
    /// Procedure executor used by the procedure service operator and the statement executor.
    procedure_executor: ProcedureExecutorRef,
    /// Process manager given to the statement executor and stored in the built instance.
    process_manager: ProcessManagerRef,
}
66
67impl FrontendBuilder {
68 #[allow(clippy::too_many_arguments)]
69 pub fn new(
70 options: FrontendOptions,
71 kv_backend: KvBackendRef,
72 layered_cache_registry: LayeredCacheRegistryRef,
73 catalog_manager: CatalogManagerRef,
74 node_manager: NodeManagerRef,
75 procedure_executor: ProcedureExecutorRef,
76 process_manager: ProcessManagerRef,
77 ) -> Self {
78 Self {
79 options,
80 kv_backend,
81 layered_cache_registry,
82 local_cache_invalidator: None,
83 catalog_manager,
84 node_manager,
85 plugins: None,
86 procedure_executor,
87 process_manager,
88 }
89 }
90
91 #[cfg(test)]
92 pub(crate) fn new_test(
93 options: &FrontendOptions,
94 meta_client: meta_client::MetaClientRef,
95 ) -> Self {
96 let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
97
98 let layered_cache_registry = Arc::new(
99 common_meta::cache::LayeredCacheRegistryBuilder::default()
100 .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone()))
101 .build(),
102 );
103
104 Self::new(
105 options.clone(),
106 kv_backend,
107 layered_cache_registry,
108 catalog::memory::MemoryCatalogManager::with_default_setup(),
109 Arc::new(client::client_manager::NodeClients::default()),
110 meta_client,
111 Arc::new(catalog::process_manager::ProcessManager::new(
112 "".to_string(),
113 None,
114 )),
115 )
116 }
117
118 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
119 Self {
120 local_cache_invalidator: Some(cache_invalidator),
121 ..self
122 }
123 }
124
125 pub fn with_plugin(self, plugins: Plugins) -> Self {
126 Self {
127 plugins: Some(plugins),
128 ..self
129 }
130 }
131
132 pub async fn try_build(self) -> Result<Instance> {
133 let kv_backend = self.kv_backend;
134 let node_manager = self.node_manager;
135 let plugins = self.plugins.unwrap_or_default();
136 let process_manager = self.process_manager;
137 let table_route_cache: TableRouteCacheRef =
138 self.layered_cache_registry
139 .get()
140 .context(error::CacheRequiredSnafu {
141 name: TABLE_ROUTE_CACHE_NAME,
142 })?;
143 let partition_manager = Arc::new(PartitionRuleManager::new(
144 kv_backend.clone(),
145 table_route_cache.clone(),
146 ));
147
148 let local_cache_invalidator = self
149 .local_cache_invalidator
150 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
151
152 let region_query_handler =
153 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
154 factory.build(partition_manager.clone(), node_manager.clone())
155 } else {
156 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
157 };
158
159 let table_flownode_cache =
160 self.layered_cache_registry
161 .get()
162 .context(error::CacheRequiredSnafu {
163 name: TABLE_FLOWNODE_SET_CACHE_NAME,
164 })?;
165
166 let inserter = Arc::new(Inserter::new(
167 self.catalog_manager.clone(),
168 partition_manager.clone(),
169 node_manager.clone(),
170 table_flownode_cache,
171 ));
172 let deleter = Arc::new(Deleter::new(
173 self.catalog_manager.clone(),
174 partition_manager.clone(),
175 node_manager.clone(),
176 ));
177 let requester = Arc::new(Requester::new(
178 self.catalog_manager.clone(),
179 partition_manager.clone(),
180 node_manager.clone(),
181 ));
182 let table_mutation_handler = Arc::new(TableMutationOperator::new(
183 inserter.clone(),
184 deleter.clone(),
185 requester,
186 ));
187
188 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
189 self.procedure_executor.clone(),
190 self.catalog_manager.clone(),
191 ));
192
193 let flow_metadata_manager: Arc<FlowMetadataManager> =
194 Arc::new(FlowMetadataManager::new(kv_backend.clone()));
195 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
196
197 let query_engine = QueryEngineFactory::new_with_plugins(
198 self.catalog_manager.clone(),
199 Some(partition_manager.clone()),
200 Some(region_query_handler.clone()),
201 Some(table_mutation_handler),
202 Some(procedure_service_handler),
203 Some(Arc::new(flow_service)),
204 true,
205 plugins.clone(),
206 self.options.query.clone(),
207 )
208 .query_engine();
209
210 let statement_executor = StatementExecutor::new(
211 self.catalog_manager.clone(),
212 query_engine.clone(),
213 self.procedure_executor,
214 kv_backend.clone(),
215 local_cache_invalidator,
216 inserter.clone(),
217 table_route_cache,
218 Some(process_manager.clone()),
219 );
220
221 let statement_executor =
222 if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
223 let ctx = ExecutorConfigureContext {
224 kv_backend: kv_backend.clone(),
225 };
226 configurator
227 .configure(statement_executor, ctx)
228 .await
229 .context(ExternalSnafu)?
230 } else {
231 statement_executor
232 };
233
234 let statement_executor = Arc::new(statement_executor);
235
236 let pipeline_operator = Arc::new(PipelineOperator::new(
237 inserter.clone(),
238 statement_executor.clone(),
239 self.catalog_manager.clone(),
240 query_engine.clone(),
241 ));
242
243 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
244
245 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
246 statement_executor.clone(),
247 self.options.slow_query.ttl,
248 self.options.event_recorder.ttl,
249 ))));
250
251 let limiter = self
253 .options
254 .max_in_flight_write_bytes
255 .map(|max_in_flight_write_bytes| {
256 Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
257 });
258
259 Ok(Instance {
260 catalog_manager: self.catalog_manager,
261 pipeline_operator,
262 statement_executor,
263 query_engine,
264 plugins,
265 inserter,
266 deleter,
267 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
268 event_recorder: Some(event_recorder),
269 limiter,
270 process_manager,
271 otlp_metrics_table_legacy_cache: DashMap::new(),
272 slow_query_options: self.options.slow_query.clone(),
273 suspend: Arc::new(AtomicBool::new(false)),
274 })
275 }
276}