frontend/instance/
builder.rs1use std::sync::Arc;
16
17use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
18use catalog::CatalogManagerRef;
19use common_base::Plugins;
20use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
21use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
22use common_meta::ddl::ProcedureExecutorRef;
23use common_meta::key::flow::FlowMetadataManager;
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::KvBackendRef;
26use common_meta::node_manager::NodeManagerRef;
27use operator::delete::Deleter;
28use operator::flow::FlowServiceOperator;
29use operator::insert::Inserter;
30use operator::procedure::ProcedureServiceOperator;
31use operator::request::Requester;
32use operator::statement::{StatementExecutor, StatementExecutorRef};
33use operator::table::TableMutationOperator;
34use partition::manager::PartitionRuleManager;
35use pipeline::pipeline_operator::PipelineOperator;
36use query::region_query::RegionQueryHandlerFactoryRef;
37use query::stats::StatementStatistics;
38use query::QueryEngineFactory;
39use snafu::OptionExt;
40
41use crate::error::{self, Result};
42use crate::frontend::FrontendOptions;
43use crate::instance::region_query::FrontendRegionQueryHandler;
44use crate::instance::Instance;
45use crate::limiter::Limiter;
46
47pub struct FrontendBuilder {
49 options: FrontendOptions,
50 kv_backend: KvBackendRef,
51 layered_cache_registry: LayeredCacheRegistryRef,
52 local_cache_invalidator: Option<CacheInvalidatorRef>,
53 catalog_manager: CatalogManagerRef,
54 node_manager: NodeManagerRef,
55 plugins: Option<Plugins>,
56 procedure_executor: ProcedureExecutorRef,
57 stats: StatementStatistics,
58}
59
60impl FrontendBuilder {
61 pub fn new(
62 options: FrontendOptions,
63 kv_backend: KvBackendRef,
64 layered_cache_registry: LayeredCacheRegistryRef,
65 catalog_manager: CatalogManagerRef,
66 node_manager: NodeManagerRef,
67 procedure_executor: ProcedureExecutorRef,
68 stats: StatementStatistics,
69 ) -> Self {
70 Self {
71 options,
72 kv_backend,
73 layered_cache_registry,
74 local_cache_invalidator: None,
75 catalog_manager,
76 node_manager,
77 plugins: None,
78 procedure_executor,
79 stats,
80 }
81 }
82
83 pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
84 Self {
85 local_cache_invalidator: Some(cache_invalidator),
86 ..self
87 }
88 }
89
90 pub fn with_plugin(self, plugins: Plugins) -> Self {
91 Self {
92 plugins: Some(plugins),
93 ..self
94 }
95 }
96
97 pub async fn try_build(self) -> Result<Instance> {
98 let kv_backend = self.kv_backend;
99 let node_manager = self.node_manager;
100 let plugins = self.plugins.unwrap_or_default();
101
102 let table_route_cache: TableRouteCacheRef =
103 self.layered_cache_registry
104 .get()
105 .context(error::CacheRequiredSnafu {
106 name: TABLE_ROUTE_CACHE_NAME,
107 })?;
108 let partition_manager = Arc::new(PartitionRuleManager::new(
109 kv_backend.clone(),
110 table_route_cache.clone(),
111 ));
112
113 let local_cache_invalidator = self
114 .local_cache_invalidator
115 .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
116
117 let region_query_handler =
118 if let Some(factory) = plugins.get::<RegionQueryHandlerFactoryRef>() {
119 factory.build(partition_manager.clone(), node_manager.clone())
120 } else {
121 FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone())
122 };
123
124 let table_flownode_cache =
125 self.layered_cache_registry
126 .get()
127 .context(error::CacheRequiredSnafu {
128 name: TABLE_FLOWNODE_SET_CACHE_NAME,
129 })?;
130
131 let inserter = Arc::new(Inserter::new(
132 self.catalog_manager.clone(),
133 partition_manager.clone(),
134 node_manager.clone(),
135 table_flownode_cache,
136 ));
137 let deleter = Arc::new(Deleter::new(
138 self.catalog_manager.clone(),
139 partition_manager.clone(),
140 node_manager.clone(),
141 ));
142 let requester = Arc::new(Requester::new(
143 self.catalog_manager.clone(),
144 partition_manager,
145 node_manager.clone(),
146 ));
147 let table_mutation_handler = Arc::new(TableMutationOperator::new(
148 inserter.clone(),
149 deleter.clone(),
150 requester,
151 ));
152
153 let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
154 self.procedure_executor.clone(),
155 self.catalog_manager.clone(),
156 ));
157
158 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
159 let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
160
161 let query_engine = QueryEngineFactory::new_with_plugins(
162 self.catalog_manager.clone(),
163 Some(region_query_handler.clone()),
164 Some(table_mutation_handler),
165 Some(procedure_service_handler),
166 Some(Arc::new(flow_service)),
167 true,
168 plugins.clone(),
169 self.options.query.clone(),
170 )
171 .query_engine();
172
173 let statement_executor = Arc::new(StatementExecutor::new(
174 self.catalog_manager.clone(),
175 query_engine.clone(),
176 self.procedure_executor,
177 kv_backend.clone(),
178 local_cache_invalidator,
179 inserter.clone(),
180 table_route_cache,
181 ));
182
183 let pipeline_operator = Arc::new(PipelineOperator::new(
184 inserter.clone(),
185 statement_executor.clone(),
186 self.catalog_manager.clone(),
187 query_engine.clone(),
188 ));
189
190 plugins.insert::<StatementExecutorRef>(statement_executor.clone());
191
192 let limiter = self
194 .options
195 .max_in_flight_write_bytes
196 .map(|max_in_flight_write_bytes| {
197 Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
198 });
199
200 Ok(Instance {
201 catalog_manager: self.catalog_manager,
202 pipeline_operator,
203 statement_executor,
204 query_engine,
205 plugins,
206 inserter,
207 deleter,
208 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
209 stats: self.stats,
210 limiter,
211 })
212 }
213}