1use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::CatalogManagerRef;
19use common_base::Plugins;
20use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
21use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
22use common_meta::ddl::ProcedureExecutorRef;
23use common_meta::key::flow::FlowMetadataManager;
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::KvBackendRef;
26use common_meta::node_manager::NodeManagerRef;
27use operator::delete::Deleter;
28use operator::flow::FlowServiceOperator;
29use operator::insert::Inserter;
30use operator::procedure::ProcedureServiceOperator;
31use operator::request::Requester;
32use operator::statement::{StatementExecutor, StatementExecutorRef};
33use operator::table::TableMutationOperator;
34use partition::manager::PartitionRuleManager;
35use pipeline::pipeline_operator::PipelineOperator;
36use query::region_query::RegionQueryHandlerFactoryRef;
37use query::QueryEngineFactory;
38use snafu::OptionExt;
39
40use crate::error::{self, Result};
41use crate::frontend::FrontendOptions;
42use crate::instance::region_query::FrontendRegionQueryHandler;
43use crate::instance::Instance;
44use crate::limiter::Limiter;
45use crate::slow_query_recorder::SlowQueryRecorder;
46
47pub struct FrontendBuilder {
49 options: FrontendOptions,
50 kv_backend: KvBackendRef,
51 layered_cache_registry: LayeredCacheRegistryRef,
52 local_cache_invalidator: Option<CacheInvalidatorRef>,
53 catalog_manager: CatalogManagerRef,
54 node_manager: NodeManagerRef,
55 plugins: Option<Plugins>,
56 procedure_executor: ProcedureExecutorRef,
57}
58
59impl FrontendBuilder {
60 pub fn new(
61 options: FrontendOptions,
62 kv_backend: KvBackendRef,
63 layered_cache_registry: LayeredCacheRegistryRef,
64 catalog_manager: CatalogManagerRef,
65 node_manager: NodeManagerRef,
66 procedure_executor: ProcedureExecutorRef,
67 ) -> Self {
68 Self {
69 options,
70 kv_backend,
71 layered_cache_registry,
72 local_cache_invalidator: None,
73 catalog_manager,
74 node_manager,
75 plugins: None,
76 procedure_executor,
77 }
78 }
79
80 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
81 Self {
82 local_cache_invalidator: Some(cache_invalidator),
83 ..self
84 }
85 }
86
87 pub fn with_plugin(self, plugins: Plugins) -> Self {
88 Self {
89 plugins: Some(plugins),
90 ..self
91 }
92 }
93
94 pub async fn try_build(self) -> Result<Instance> {
95 let kv_backend = self.kv_backend;
96 let node_manager = self.node_manager;
97 let plugins = self.plugins.unwrap_or_default();
98
99 let table_route_cache: TableRouteCacheRef =
100 self.layered_cache_registry
101 .get()
102 .context(error::CacheRequiredSnafu {
103 name: TABLE_ROUTE_CACHE_NAME,
104 })?;
105 let partition_manager = Arc::new(PartitionRuleManager::new(
106 kv_backend.clone(),
107 table_route_cache.clone(),
108 ));
109
110 let local_cache_invalidator = self
111 .local_cache_invalidator
112 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
113
114 let region_query_handler =
115 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
116 factory.build(partition_manager.clone(), node_manager.clone())
117 } else {
118 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
119 };
120
121 let table_flownode_cache =
122 self.layered_cache_registry
123 .get()
124 .context(error::CacheRequiredSnafu {
125 name: TABLE_FLOWNODE_SET_CACHE_NAME,
126 })?;
127
128 let inserter = Arc::new(Inserter::new(
129 self.catalog_manager.clone(),
130 partition_manager.clone(),
131 node_manager.clone(),
132 table_flownode_cache,
133 ));
134 let deleter = Arc::new(Deleter::new(
135 self.catalog_manager.clone(),
136 partition_manager.clone(),
137 node_manager.clone(),
138 ));
139 let requester = Arc::new(Requester::new(
140 self.catalog_manager.clone(),
141 partition_manager,
142 node_manager.clone(),
143 ));
144 let table_mutation_handler = Arc::new(TableMutationOperator::new(
145 inserter.clone(),
146 deleter.clone(),
147 requester,
148 ));
149
150 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
151 self.procedure_executor.clone(),
152 self.catalog_manager.clone(),
153 ));
154
155 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
156 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
157
158 let query_engine = QueryEngineFactory::new_with_plugins(
159 self.catalog_manager.clone(),
160 Some(region_query_handler.clone()),
161 Some(table_mutation_handler),
162 Some(procedure_service_handler),
163 Some(Arc::new(flow_service)),
164 true,
165 plugins.clone(),
166 self.options.query.clone(),
167 )
168 .query_engine();
169
170 let statement_executor = Arc::new(StatementExecutor::new(
171 self.catalog_manager.clone(),
172 query_engine.clone(),
173 self.procedure_executor,
174 kv_backend.clone(),
175 local_cache_invalidator,
176 inserter.clone(),
177 table_route_cache,
178 ));
179
180 let pipeline_operator = Arc::new(PipelineOperator::new(
181 inserter.clone(),
182 statement_executor.clone(),
183 self.catalog_manager.clone(),
184 query_engine.clone(),
185 ));
186
187 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
188
189 let slow_query_recorder = self.options.slow_query.and_then(|opts| {
190 opts.enable.then(|| {
191 SlowQueryRecorder::new(
192 opts.clone(),
193 inserter.clone(),
194 statement_executor.clone(),
195 self.catalog_manager.clone(),
196 )
197 })
198 });
199
200 let limiter = self
202 .options
203 .max_in_flight_write_bytes
204 .map(|max_in_flight_write_bytes| {
205 Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
206 });
207
208 Ok(Instance {
209 catalog_manager: self.catalog_manager,
210 pipeline_operator,
211 statement_executor,
212 query_engine,
213 plugins,
214 inserter,
215 deleter,
216 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
217 slow_query_recorder,
218 limiter,
219 })
220 }
221}