1use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::process_manager::ProcessManagerRef;
19use catalog::CatalogManagerRef;
20use common_base::Plugins;
21use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
22use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
23use common_meta::ddl::ProcedureExecutorRef;
24use common_meta::key::flow::FlowMetadataManager;
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::KvBackendRef;
27use common_meta::node_manager::NodeManagerRef;
28use operator::delete::Deleter;
29use operator::flow::FlowServiceOperator;
30use operator::insert::Inserter;
31use operator::procedure::ProcedureServiceOperator;
32use operator::request::Requester;
33use operator::statement::{StatementExecutor, StatementExecutorRef};
34use operator::table::TableMutationOperator;
35use partition::manager::PartitionRuleManager;
36use pipeline::pipeline_operator::PipelineOperator;
37use query::region_query::RegionQueryHandlerFactoryRef;
38use query::QueryEngineFactory;
39use snafu::OptionExt;
40
41use crate::error::{self, Result};
42use crate::frontend::FrontendOptions;
43use crate::instance::region_query::FrontendRegionQueryHandler;
44use crate::instance::Instance;
45use crate::limiter::Limiter;
46use crate::slow_query_recorder::SlowQueryRecorder;
47
48pub struct FrontendBuilder {
50 options: FrontendOptions,
51 kv_backend: KvBackendRef,
52 layered_cache_registry: LayeredCacheRegistryRef,
53 local_cache_invalidator: Option<CacheInvalidatorRef>,
54 catalog_manager: CatalogManagerRef,
55 node_manager: NodeManagerRef,
56 plugins: Option<Plugins>,
57 procedure_executor: ProcedureExecutorRef,
58 process_manager: ProcessManagerRef,
59}
60
61impl FrontendBuilder {
62 #[allow(clippy::too_many_arguments)]
63 pub fn new(
64 options: FrontendOptions,
65 kv_backend: KvBackendRef,
66 layered_cache_registry: LayeredCacheRegistryRef,
67 catalog_manager: CatalogManagerRef,
68 node_manager: NodeManagerRef,
69 procedure_executor: ProcedureExecutorRef,
70 process_manager: ProcessManagerRef,
71 ) -> Self {
72 Self {
73 options,
74 kv_backend,
75 layered_cache_registry,
76 local_cache_invalidator: None,
77 catalog_manager,
78 node_manager,
79 plugins: None,
80 procedure_executor,
81 process_manager,
82 }
83 }
84
85 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
86 Self {
87 local_cache_invalidator: Some(cache_invalidator),
88 ..self
89 }
90 }
91
92 pub fn with_plugin(self, plugins: Plugins) -> Self {
93 Self {
94 plugins: Some(plugins),
95 ..self
96 }
97 }
98
99 pub async fn try_build(self) -> Result<Instance> {
100 let kv_backend = self.kv_backend;
101 let node_manager = self.node_manager;
102 let plugins = self.plugins.unwrap_or_default();
103 let process_manager = self.process_manager;
104 let table_route_cache: TableRouteCacheRef =
105 self.layered_cache_registry
106 .get()
107 .context(error::CacheRequiredSnafu {
108 name: TABLE_ROUTE_CACHE_NAME,
109 })?;
110 let partition_manager = Arc::new(PartitionRuleManager::new(
111 kv_backend.clone(),
112 table_route_cache.clone(),
113 ));
114
115 let local_cache_invalidator = self
116 .local_cache_invalidator
117 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
118
119 let region_query_handler =
120 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
121 factory.build(partition_manager.clone(), node_manager.clone())
122 } else {
123 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
124 };
125
126 let table_flownode_cache =
127 self.layered_cache_registry
128 .get()
129 .context(error::CacheRequiredSnafu {
130 name: TABLE_FLOWNODE_SET_CACHE_NAME,
131 })?;
132
133 let inserter = Arc::new(Inserter::new(
134 self.catalog_manager.clone(),
135 partition_manager.clone(),
136 node_manager.clone(),
137 table_flownode_cache,
138 ));
139 let deleter = Arc::new(Deleter::new(
140 self.catalog_manager.clone(),
141 partition_manager.clone(),
142 node_manager.clone(),
143 ));
144 let requester = Arc::new(Requester::new(
145 self.catalog_manager.clone(),
146 partition_manager,
147 node_manager.clone(),
148 ));
149 let table_mutation_handler = Arc::new(TableMutationOperator::new(
150 inserter.clone(),
151 deleter.clone(),
152 requester,
153 ));
154
155 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
156 self.procedure_executor.clone(),
157 self.catalog_manager.clone(),
158 ));
159
160 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
161 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
162
163 let query_engine = QueryEngineFactory::new_with_plugins(
164 self.catalog_manager.clone(),
165 Some(region_query_handler.clone()),
166 Some(table_mutation_handler),
167 Some(procedure_service_handler),
168 Some(Arc::new(flow_service)),
169 true,
170 plugins.clone(),
171 self.options.query.clone(),
172 )
173 .query_engine();
174
175 let statement_executor = Arc::new(StatementExecutor::new(
176 self.catalog_manager.clone(),
177 query_engine.clone(),
178 self.procedure_executor,
179 kv_backend.clone(),
180 local_cache_invalidator,
181 inserter.clone(),
182 table_route_cache,
183 Some(process_manager.clone()),
184 ));
185
186 let pipeline_operator = Arc::new(PipelineOperator::new(
187 inserter.clone(),
188 statement_executor.clone(),
189 self.catalog_manager.clone(),
190 query_engine.clone(),
191 ));
192
193 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
194
195 let slow_query_recorder = self.options.slow_query.and_then(|opts| {
196 opts.enable.then(|| {
197 SlowQueryRecorder::new(
198 opts.clone(),
199 inserter.clone(),
200 statement_executor.clone(),
201 self.catalog_manager.clone(),
202 )
203 })
204 });
205
206 let limiter = self
208 .options
209 .max_in_flight_write_bytes
210 .map(|max_in_flight_write_bytes| {
211 Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
212 });
213
214 Ok(Instance {
215 catalog_manager: self.catalog_manager,
216 pipeline_operator,
217 statement_executor,
218 query_engine,
219 plugins,
220 inserter,
221 deleter,
222 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
223 slow_query_recorder,
224 limiter,
225 process_manager,
226 })
227 }
228}