1use std::collections::HashMap;
16use std::fmt;
17use std::sync::{Arc, RwLock};
18
19use async_trait::async_trait;
20use catalog::CatalogManagerRef;
21use common_base::Plugins;
22use common_function::function_factory::ScalarFunctionFactory;
23use common_function::handlers::{
24 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
25};
26use common_function::state::FunctionState;
27use common_telemetry::warn;
28use datafusion::dataframe::DataFrame;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
31use datafusion::execution::runtime_env::RuntimeEnv;
32use datafusion::execution::SessionStateBuilder;
33use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
34use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
35use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
36use datafusion::physical_optimizer::PhysicalOptimizerRule;
37use datafusion::physical_plan::ExecutionPlan;
38use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
39use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
40use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
41use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
42use datafusion_optimizer::optimizer::Optimizer;
43use promql::extension_plan::PromExtensionPlanner;
44use table::table::adapter::DfTableProviderAdapter;
45use table::TableRef;
46
47use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer, MergeSortExtensionPlanner};
48use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
49use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
50use crate::optimizer::parallelize_scan::ParallelizeScan;
51use crate::optimizer::pass_distribution::PassDistribution;
52use crate::optimizer::remove_duplicate::RemoveDuplicate;
53use crate::optimizer::scan_hint::ScanHintRule;
54use crate::optimizer::string_normalization::StringNormalizationRule;
55use crate::optimizer::transcribe_atat::TranscribeAtatRule;
56use crate::optimizer::type_conversion::TypeConversionRule;
57use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
58use crate::optimizer::ExtensionAnalyzerRule;
59use crate::options::QueryOptions as QueryOptionsNew;
60use crate::query_engine::options::QueryOptions;
61use crate::query_engine::DefaultSerializer;
62use crate::range_select::planner::RangeSelectPlanner;
63use crate::region_query::RegionQueryHandlerRef;
64use crate::QueryEngineContext;
65
/// Shared, cheaply-cloneable state of the query engine: the configured
/// DataFusion session plus the function registries and handlers built in
/// [`QueryEngineState::new`].
#[derive(Clone)]
pub struct QueryEngineState {
    /// DataFusion session context carrying config, analyzer/optimizer rules
    /// and the query planner assembled in `new`.
    df_context: SessionContext,
    /// Catalog manager; also handed to [`DfQueryPlanner`] for distributed planning.
    catalog_manager: CatalogManagerRef,
    /// Optional table-mutation / procedure / flow service handlers exposed to functions.
    function_state: Arc<FunctionState>,
    /// Registered scalar function factories, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    /// Registered aggregate UDFs, keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    /// Extension analyzer rules applied by `optimize_by_extension_rules`.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    /// Plugins; queried for `QueryOptions` in `disallow_cross_catalog_query`.
    plugins: Plugins,
}
77
// Manual `Debug`: only the DataFusion session state is printed; the function
// registries, catalog manager and plugins are intentionally omitted.
impl fmt::Debug for QueryEngineState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("QueryEngineState")
            .field("state", &self.df_context.state())
            .finish()
    }
}
85
impl QueryEngineState {
    /// Builds the engine state: a DataFusion session configured with this
    /// crate's custom analyzer, logical-optimizer and physical-optimizer
    /// rules, plus the shared function/catalog state.
    ///
    /// `with_dist_planner` enables the distributed-plan analyzer rule;
    /// `options.parallelism == 0` keeps DataFusion's default target partitions.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        let runtime_env = Arc::new(RuntimeEnv::default());
        // Catalog/schema resolution goes through `catalog_list`, so DataFusion's
        // built-in default catalog is disabled.
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }

        // NOTE(review): disables DataFusion's logical-vs-physical aggregate
        // schema check — presumably required by the custom plan rewrites
        // registered below; confirm before changing.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Extension analyzer rules run by `optimize_by_extension_rules`,
        // before the DataFusion analyzer/optimizer see the plan.
        let mut extension_rules = Vec::new();

        // Type conversion runs first so later rules see normalized types.
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Customize DataFusion's analyzer: custom rules are inserted at the
        // front so they run before the built-in ones.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));

        // Replace DataFusion's `CountWildcardRule` with the time-index-aware variant.
        Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name());
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        // The distributed planner analyzer is appended after all other
        // analyzer rules.
        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // Physical optimizer: custom rules go at the front (parallelize scans,
        // pass distribution requirements, then an early `EnforceSorting`) and
        // at the back (windowed sort, constant-term matching, dedup).
        let mut physical_optimizer = PhysicalOptimizer::new();
        physical_optimizer
            .rules
            .insert(0, Arc::new(ParallelizeScan));
        physical_optimizer
            .rules
            .insert(1, Arc::new(PassDistribution));
        // NOTE(review): `EnforceSorting` is also in DataFusion's default rule
        // set; this early copy presumably re-resolves sort requirements right
        // after `PassDistribution` — confirm before removing.
        physical_optimizer
            .rules
            .insert(2, Arc::new(EnforceSorting {}));
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Move `SanityCheckPlan` to the very end so it validates the plan
        // produced after all custom rules have run.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes every analyzer rule whose `name()` equals `name`.
    fn remove_analyzer_rule(rules: &mut Vec<Arc<dyn AnalyzerRule + Send + Sync>>, name: &str) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Removes every physical optimizer rule whose `name()` equals `name`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Runs the extension analyzer rules over `plan` in registration order,
    /// short-circuiting on the first rule that returns an error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Optimizes `plan` with the session's DataFusion logical optimizer.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Looks up a registered scalar function factory by name.
    /// Panics only if the registry lock is poisoned.
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Returns the names of all registered scalar functions (unordered).
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up a registered aggregate UDF by name.
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Returns the names of all registered aggregate functions (unordered).
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Registers a scalar function factory. A duplicate name silently
    /// replaces the previous entry and logs a warning.
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Registers an aggregate UDF.
    ///
    /// # Panics
    /// Panics if a function with the same name was already registered.
    /// NOTE(review): stricter than `register_scalar_function`, which only
    /// warns on duplicates — confirm the asymmetry is intentional.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Returns the catalog manager this engine was created with.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the shared function state (service handlers).
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the table mutation handler, if one was provided to `new`.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the procedure service handler, if one was provided to `new`.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed, read from the
    /// `QueryOptions` plugin; defaults to `false` when the plugin is absent.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns the current (owned) DataFusion session state.
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Wraps `table` in a DataFusion provider adapter and opens it as a
    /// [`DataFrame`] on this session.
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}
332
/// DataFusion [`QueryPlanner`] that wraps the default physical planner,
/// augmented with this crate's extension planners (see [`DfQueryPlanner::new`]).
struct DfQueryPlanner {
    // Default planner configured with the extension planners.
    physical_planner: DefaultPhysicalPlanner,
}
336
337impl fmt::Debug for DfQueryPlanner {
338 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
339 f.debug_struct("DfQueryPlanner").finish()
340 }
341}
342
343#[async_trait]
344impl QueryPlanner for DfQueryPlanner {
345 async fn create_physical_plan(
346 &self,
347 logical_plan: &DfLogicalPlan,
348 session_state: &SessionState,
349 ) -> DfResult<Arc<dyn ExecutionPlan>> {
350 self.physical_planner
351 .create_physical_plan(logical_plan, session_state)
352 .await
353 }
354}
355
356impl DfQueryPlanner {
357 fn new(
358 catalog_manager: CatalogManagerRef,
359 region_query_handler: Option<RegionQueryHandlerRef>,
360 ) -> Self {
361 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
362 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
363 if let Some(region_query_handler) = region_query_handler {
364 planners.push(Arc::new(DistExtensionPlanner::new(
365 catalog_manager,
366 region_query_handler,
367 )));
368 planners.push(Arc::new(MergeSortExtensionPlanner {}));
369 }
370 Self {
371 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
372 }
373 }
374}