1use std::collections::HashMap;
16use std::fmt;
17use std::sync::{Arc, RwLock};
18
19use async_trait::async_trait;
20use catalog::CatalogManagerRef;
21use common_base::Plugins;
22use common_function::function::FunctionRef;
23use common_function::handlers::{
24 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
25};
26use common_function::scalars::aggregate::AggregateFunctionMetaRef;
27use common_function::state::FunctionState;
28use common_telemetry::warn;
29use datafusion::dataframe::DataFrame;
30use datafusion::error::Result as DfResult;
31use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
32use datafusion::execution::runtime_env::RuntimeEnv;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
35use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
36use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
37use datafusion::physical_optimizer::PhysicalOptimizerRule;
38use datafusion::physical_plan::ExecutionPlan;
39use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
40use datafusion_expr::LogicalPlan as DfLogicalPlan;
41use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
42use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
43use datafusion_optimizer::optimizer::Optimizer;
44use promql::extension_plan::PromExtensionPlanner;
45use table::table::adapter::DfTableProviderAdapter;
46use table::TableRef;
47
48use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer, MergeSortExtensionPlanner};
49use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
50use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
51use crate::optimizer::parallelize_scan::ParallelizeScan;
52use crate::optimizer::pass_distribution::PassDistribution;
53use crate::optimizer::remove_duplicate::RemoveDuplicate;
54use crate::optimizer::scan_hint::ScanHintRule;
55use crate::optimizer::string_normalization::StringNormalizationRule;
56use crate::optimizer::transcribe_atat::TranscribeAtatRule;
57use crate::optimizer::type_conversion::TypeConversionRule;
58use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
59use crate::optimizer::ExtensionAnalyzerRule;
60use crate::options::QueryOptions as QueryOptionsNew;
61use crate::query_engine::options::QueryOptions;
62use crate::query_engine::DefaultSerializer;
63use crate::range_select::planner::RangeSelectPlanner;
64use crate::region_query::RegionQueryHandlerRef;
65use crate::QueryEngineContext;
66
/// Shared state of the query engine: the DataFusion session plus the function
/// registries, handlers and plugins assembled by [`QueryEngineState::new`].
///
/// `Clone` is derived; the function state and both function registries are held
/// behind `Arc`, so clones of this struct share them.
#[derive(Clone)]
pub struct QueryEngineState {
    /// DataFusion session context built in `new` from the customized session state.
    df_context: SessionContext,
    /// Catalog manager passed to `new`; exposed through `catalog_manager()`.
    catalog_manager: CatalogManagerRef,
    /// Bundle of the optional table-mutation, procedure-service and flow-service handlers.
    function_state: Arc<FunctionState>,
    /// Registered scalar functions, keyed by function name.
    udf_functions: Arc<RwLock<HashMap<String, FunctionRef>>>,
    /// Registered aggregate functions, keyed by function name.
    aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
    /// Analyzer rules applied, in order, by `optimize_by_extension_rules`.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    /// Plugin registry; queried for `QueryOptions` by `disallow_cross_catalog_query`.
    plugins: Plugins,
}
78
79impl fmt::Debug for QueryEngineState {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 f.debug_struct("QueryEngineState")
82 .field("state", &self.df_context.state())
83 .finish()
84 }
85}
86
87impl QueryEngineState {
88 #[allow(clippy::too_many_arguments)]
89 pub fn new(
90 catalog_list: CatalogManagerRef,
91 region_query_handler: Option<RegionQueryHandlerRef>,
92 table_mutation_handler: Option<TableMutationHandlerRef>,
93 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
94 flow_service_handler: Option<FlowServiceHandlerRef>,
95 with_dist_planner: bool,
96 plugins: Plugins,
97 options: QueryOptionsNew,
98 ) -> Self {
99 let runtime_env = Arc::new(RuntimeEnv::default());
100 let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
101 if options.parallelism > 0 {
102 session_config = session_config.with_target_partitions(options.parallelism);
103 }
104
105 session_config
108 .options_mut()
109 .execution
110 .skip_physical_aggregate_schema_check = true;
111
112 let mut extension_rules = Vec::new();
114
115 extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
117
118 let mut analyzer = Analyzer::new();
120 analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
121 analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
122
123 Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name());
125 analyzer
126 .rules
127 .insert(0, Arc::new(CountWildcardToTimeIndexRule));
128
129 if with_dist_planner {
130 analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
131 }
132
133 let mut optimizer = Optimizer::new();
134 optimizer.rules.push(Arc::new(ScanHintRule));
135
136 let mut physical_optimizer = PhysicalOptimizer::new();
138 physical_optimizer
140 .rules
141 .insert(0, Arc::new(ParallelizeScan));
142 physical_optimizer
144 .rules
145 .insert(1, Arc::new(PassDistribution));
146 physical_optimizer
147 .rules
148 .insert(2, Arc::new(EnforceSorting {}));
149 physical_optimizer
151 .rules
152 .push(Arc::new(WindowedSortPhysicalRule));
153 physical_optimizer
154 .rules
155 .push(Arc::new(MatchesConstantTermOptimizer));
156 physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
158 Self::remove_physical_optimizer_rule(
160 &mut physical_optimizer.rules,
161 SanityCheckPlan {}.name(),
162 );
163 physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
164
165 let session_state = SessionStateBuilder::new()
166 .with_config(session_config)
167 .with_runtime_env(runtime_env)
168 .with_default_features()
169 .with_analyzer_rules(analyzer.rules)
170 .with_serializer_registry(Arc::new(DefaultSerializer))
171 .with_query_planner(Arc::new(DfQueryPlanner::new(
172 catalog_list.clone(),
173 region_query_handler,
174 )))
175 .with_optimizer_rules(optimizer.rules)
176 .with_physical_optimizer_rules(physical_optimizer.rules)
177 .build();
178
179 let df_context = SessionContext::new_with_state(session_state);
180
181 Self {
182 df_context,
183 catalog_manager: catalog_list,
184 function_state: Arc::new(FunctionState {
185 table_mutation_handler,
186 procedure_service_handler,
187 flow_service_handler,
188 }),
189 aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
190 extension_rules,
191 plugins,
192 udf_functions: Arc::new(RwLock::new(HashMap::new())),
193 }
194 }
195
196 fn remove_analyzer_rule(rules: &mut Vec<Arc<dyn AnalyzerRule + Send + Sync>>, name: &str) {
197 rules.retain(|rule| rule.name() != name);
198 }
199
200 fn remove_physical_optimizer_rule(
201 rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
202 name: &str,
203 ) {
204 rules.retain(|rule| rule.name() != name);
205 }
206
207 pub fn optimize_by_extension_rules(
209 &self,
210 plan: DfLogicalPlan,
211 context: &QueryEngineContext,
212 ) -> DfResult<DfLogicalPlan> {
213 self.extension_rules
214 .iter()
215 .try_fold(plan, |acc_plan, rule| {
216 rule.analyze(acc_plan, context, self.session_state().config_options())
217 })
218 }
219
220 pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
222 self.session_state().optimize(&plan)
223 }
224
225 pub fn register_function(&self, func: FunctionRef) {
228 let name = func.name().to_string();
229 let x = self
230 .udf_functions
231 .write()
232 .unwrap()
233 .insert(name.clone(), func);
234
235 if x.is_some() {
236 warn!("Already registered udf function '{name}'");
237 }
238 }
239
240 pub fn udf_function(&self, function_name: &str) -> Option<FunctionRef> {
242 self.udf_functions
243 .read()
244 .unwrap()
245 .get(function_name)
246 .cloned()
247 }
248
249 pub fn udf_names(&self) -> Vec<String> {
251 self.udf_functions.read().unwrap().keys().cloned().collect()
252 }
253
254 pub fn aggregate_function(&self, function_name: &str) -> Option<AggregateFunctionMetaRef> {
256 self.aggregate_functions
257 .read()
258 .unwrap()
259 .get(function_name)
260 .cloned()
261 }
262
263 pub fn udaf_names(&self) -> Vec<String> {
265 self.aggregate_functions
266 .read()
267 .unwrap()
268 .keys()
269 .cloned()
270 .collect()
271 }
272
273 pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
282 let name = func.name();
283 let x = self
284 .aggregate_functions
285 .write()
286 .unwrap()
287 .insert(name.clone(), func);
288 assert!(
289 x.is_none(),
290 "Already registered aggregate function '{name}'"
291 );
292 }
293
294 pub fn catalog_manager(&self) -> &CatalogManagerRef {
295 &self.catalog_manager
296 }
297
298 pub fn function_state(&self) -> Arc<FunctionState> {
299 self.function_state.clone()
300 }
301
302 pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
304 self.function_state.table_mutation_handler.as_ref()
305 }
306
307 pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
309 self.function_state.procedure_service_handler.as_ref()
310 }
311
312 pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
313 self.plugins
314 .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
315 .unwrap_or(false)
316 }
317
318 pub fn session_state(&self) -> SessionState {
319 self.df_context.state()
320 }
321
322 pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
324 self.df_context
325 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
326 }
327}
328
/// `QueryPlanner` implementation that forwards physical planning to a
/// `DefaultPhysicalPlanner` configured with this crate's extension planners.
struct DfQueryPlanner {
    /// Planner that turns logical plans into execution plans.
    physical_planner: DefaultPhysicalPlanner,
}
332
333impl fmt::Debug for DfQueryPlanner {
334 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
335 f.debug_struct("DfQueryPlanner").finish()
336 }
337}
338
339#[async_trait]
340impl QueryPlanner for DfQueryPlanner {
341 async fn create_physical_plan(
342 &self,
343 logical_plan: &DfLogicalPlan,
344 session_state: &SessionState,
345 ) -> DfResult<Arc<dyn ExecutionPlan>> {
346 self.physical_planner
347 .create_physical_plan(logical_plan, session_state)
348 .await
349 }
350}
351
352impl DfQueryPlanner {
353 fn new(
354 catalog_manager: CatalogManagerRef,
355 region_query_handler: Option<RegionQueryHandlerRef>,
356 ) -> Self {
357 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
358 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
359 if let Some(region_query_handler) = region_query_handler {
360 planners.push(Arc::new(DistExtensionPlanner::new(
361 catalog_manager,
362 region_query_handler,
363 )));
364 planners.push(Arc::new(MergeSortExtensionPlanner {}));
365 }
366 Self {
367 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
368 }
369 }
370}