1use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::process_manager::ProcessManagerRef;
19use catalog::CatalogManagerRef;
20use common_base::Plugins;
21use common_event_recorder::EventRecorderImpl;
22use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
23use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
24use common_meta::key::flow::FlowMetadataManager;
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::node_manager::NodeManagerRef;
28use common_meta::procedure_executor::ProcedureExecutorRef;
29use dashmap::DashMap;
30use operator::delete::Deleter;
31use operator::flow::FlowServiceOperator;
32use operator::insert::Inserter;
33use operator::procedure::ProcedureServiceOperator;
34use operator::request::Requester;
35use operator::statement::{StatementExecutor, StatementExecutorRef};
36use operator::table::TableMutationOperator;
37use partition::manager::PartitionRuleManager;
38use pipeline::pipeline_operator::PipelineOperator;
39use query::region_query::RegionQueryHandlerFactoryRef;
40use query::QueryEngineFactory;
41use snafu::OptionExt;
42
43use crate::error::{self, Result};
44use crate::events::EventHandlerImpl;
45use crate::frontend::FrontendOptions;
46use crate::instance::region_query::FrontendRegionQueryHandler;
47use crate::instance::Instance;
48use crate::limiter::Limiter;
49
50pub struct FrontendBuilder {
52 options: FrontendOptions,
53 kv_backend: KvBackendRef,
54 layered_cache_registry: LayeredCacheRegistryRef,
55 local_cache_invalidator: Option<CacheInvalidatorRef>,
56 catalog_manager: CatalogManagerRef,
57 node_manager: NodeManagerRef,
58 plugins: Option<Plugins>,
59 procedure_executor: ProcedureExecutorRef,
60 process_manager: ProcessManagerRef,
61}
62
63impl FrontendBuilder {
64 #[allow(clippy::too_many_arguments)]
65 pub fn new(
66 options: FrontendOptions,
67 kv_backend: KvBackendRef,
68 layered_cache_registry: LayeredCacheRegistryRef,
69 catalog_manager: CatalogManagerRef,
70 node_manager: NodeManagerRef,
71 procedure_executor: ProcedureExecutorRef,
72 process_manager: ProcessManagerRef,
73 ) -> Self {
74 Self {
75 options,
76 kv_backend,
77 layered_cache_registry,
78 local_cache_invalidator: None,
79 catalog_manager,
80 node_manager,
81 plugins: None,
82 procedure_executor,
83 process_manager,
84 }
85 }
86
87 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
88 Self {
89 local_cache_invalidator: Some(cache_invalidator),
90 ..self
91 }
92 }
93
94 pub fn with_plugin(self, plugins: Plugins) -> Self {
95 Self {
96 plugins: Some(plugins),
97 ..self
98 }
99 }
100
101 pub async fn try_build(self) -> Result<Instance> {
102 let kv_backend = self.kv_backend;
103 let node_manager = self.node_manager;
104 let plugins = self.plugins.unwrap_or_default();
105 let process_manager = self.process_manager;
106 let table_route_cache: TableRouteCacheRef =
107 self.layered_cache_registry
108 .get()
109 .context(error::CacheRequiredSnafu {
110 name: TABLE_ROUTE_CACHE_NAME,
111 })?;
112 let partition_manager = Arc::new(PartitionRuleManager::new(
113 kv_backend.clone(),
114 table_route_cache.clone(),
115 ));
116
117 let local_cache_invalidator = self
118 .local_cache_invalidator
119 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
120
121 let region_query_handler =
122 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
123 factory.build(partition_manager.clone(), node_manager.clone())
124 } else {
125 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
126 };
127
128 let table_flownode_cache =
129 self.layered_cache_registry
130 .get()
131 .context(error::CacheRequiredSnafu {
132 name: TABLE_FLOWNODE_SET_CACHE_NAME,
133 })?;
134
135 let inserter = Arc::new(Inserter::new(
136 self.catalog_manager.clone(),
137 partition_manager.clone(),
138 node_manager.clone(),
139 table_flownode_cache,
140 ));
141 let deleter = Arc::new(Deleter::new(
142 self.catalog_manager.clone(),
143 partition_manager.clone(),
144 node_manager.clone(),
145 ));
146 let requester = Arc::new(Requester::new(
147 self.catalog_manager.clone(),
148 partition_manager.clone(),
149 node_manager.clone(),
150 ));
151 let table_mutation_handler = Arc::new(TableMutationOperator::new(
152 inserter.clone(),
153 deleter.clone(),
154 requester,
155 ));
156
157 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
158 self.procedure_executor.clone(),
159 self.catalog_manager.clone(),
160 ));
161
162 let flow_metadata_manager: Arc<FlowMetadataManager> =
163 Arc::new(FlowMetadataManager::new(kv_backend.clone()));
164 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
165
166 let query_engine = QueryEngineFactory::new_with_plugins(
167 self.catalog_manager.clone(),
168 Some(partition_manager.clone()),
169 Some(region_query_handler.clone()),
170 Some(table_mutation_handler),
171 Some(procedure_service_handler),
172 Some(Arc::new(flow_service)),
173 true,
174 plugins.clone(),
175 self.options.query.clone(),
176 )
177 .query_engine();
178
179 let statement_executor = Arc::new(StatementExecutor::new(
180 self.catalog_manager.clone(),
181 query_engine.clone(),
182 self.procedure_executor,
183 kv_backend.clone(),
184 local_cache_invalidator,
185 inserter.clone(),
186 table_route_cache,
187 Some(process_manager.clone()),
188 ));
189
190 let pipeline_operator = Arc::new(PipelineOperator::new(
191 inserter.clone(),
192 statement_executor.clone(),
193 self.catalog_manager.clone(),
194 query_engine.clone(),
195 ));
196
197 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
198
199 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
200 statement_executor.clone(),
201 self.options.slow_query.ttl,
202 self.options.event_recorder.ttl,
203 ))));
204
205 let limiter = self
207 .options
208 .max_in_flight_write_bytes
209 .map(|max_in_flight_write_bytes| {
210 Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
211 });
212
213 Ok(Instance {
214 catalog_manager: self.catalog_manager,
215 pipeline_operator,
216 statement_executor,
217 query_engine,
218 plugins,
219 inserter,
220 deleter,
221 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
222 event_recorder: Some(event_recorder),
223 limiter,
224 process_manager,
225 otlp_metrics_table_legacy_cache: DashMap::new(),
226 slow_query_options: self.options.slow_query.clone(),
227 })
228 }
229}