// query/query_engine/state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::handlers::{
    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::state::FunctionState;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;

use crate::dist_plan::{
    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
};
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::transcribe_atat::TranscribeAtatRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::optimizer::ExtensionAnalyzerRule;
use crate::options::QueryOptions as QueryOptionsNew;
use crate::query_engine::options::QueryOptions;
use crate::query_engine::DefaultSerializer;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;
use crate::QueryEngineContext;

/// Query engine global state.
///
/// Cheaply cloneable (`#[derive(Clone)]`; the heavyweight parts are behind
/// `Arc`s) and shared by everything that plans or executes queries.
#[derive(Clone)]
pub struct QueryEngineState {
    // The DataFusion session holding config, analyzer/optimizer rules and
    // the query planner installed in `new`.
    df_context: SessionContext,
    // Resolves catalogs/schemas/tables during planning.
    catalog_manager: CatalogManagerRef,
    // Handlers (table mutation / procedure / flow) made available to functions.
    function_state: Arc<FunctionState>,
    // Registered scalar function factories, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    // Registered aggregate UDFs, keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    // Extra analyzer rules applied via `optimize_by_extension_rules`, in order.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    // Plugins consulted for engine options (e.g. cross-catalog query policy).
    plugins: Plugins,
}

79impl fmt::Debug for QueryEngineState {
80    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81        f.debug_struct("QueryEngineState")
82            .field("state", &self.df_context.state())
83            .finish()
84    }
85}
86
impl QueryEngineState {
    /// Builds the engine state: configures a DataFusion session with
    /// GreptimeDB's analyzer, logical-optimizer and physical-optimizer rules,
    /// and wires in the catalog, function handlers and plugins.
    ///
    /// `with_dist_planner` enables the distributed planner analyzer rule.
    /// `options.parallelism` (when positive) overrides DataFusion's target
    /// partitions; `options.allow_query_fallback` registers the
    /// [`DistPlannerOptions`] extension.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        let runtime_env = Arc::new(RuntimeEnv::default());
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        // A non-positive parallelism keeps DataFusion's default target partitions.
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
        // and we can add that check back once we upgrade datafusion.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Apply extension rules
        let mut extension_rules = Vec::new();

        // The [`TypeConversionRule`] must be at first
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Apply the datafusion rules. Rules inserted at index 0 run before
        // DataFusion's built-in analyzer rules.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // add physical optimizer
        // NOTE(review): the insertion indices below (5, 6, 7) appear tied to
        // the order of DataFusion's default physical optimizer rule list —
        // re-verify them whenever DataFusion is upgraded.
        let mut physical_optimizer = PhysicalOptimizer::new();
        // Change TableScan's partition right before enforcing distribution
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        // Enforce sorting AFTER custom rules that modify the plan structure
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        // Add rule for windowed sort
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        // Add rule to remove duplicate nodes generated by other rules. Run this last.
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other
        // rules. Remove any existing instance first so it is not run twice.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes every physical optimizer rule whose `name()` equals `name`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Optimize the logical plan by the extension analyzer rules,
    /// applying them in registration order and stopping at the first error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Run the full logical plan optimize phase for the given plan.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Retrieve the scalar function by name
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve scalar function names.
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve the aggregate function by name
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve aggregate function names.
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Register a scalar function.
    /// Will override if the function with same name is already registered
    /// (a warning is logged when that happens).
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Register an aggregate function.
    ///
    /// # Panics
    /// Will panic if the function with same name is already registered.
    ///
    /// Panicking consideration: currently the aggregated functions are all statically registered,
    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
    /// invariant is broken in the future, we should return an error instead of panicking.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Returns the catalog manager used for table resolution.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns a shared handle to the function state (handlers for table
    /// mutation, procedures and flows).
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the [`TableMutationHandlerRef`] in state.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the [`ProcedureServiceHandlerRef`] in state.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed, read from the
    /// [`QueryOptions`] plugin; defaults to `false` when the plugin is absent.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns the current DataFusion [`SessionState`].
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Create a DataFrame for a table
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}

/// Query planner wrapping DataFusion's [`DefaultPhysicalPlanner`] configured
/// with GreptimeDB's extension planners (see [`DfQueryPlanner::new`]).
struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}

343impl fmt::Debug for DfQueryPlanner {
344    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
345        f.debug_struct("DfQueryPlanner").finish()
346    }
347}
348
#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    /// Plans physical execution of `logical_plan`, delegating to the wrapped
    /// [`DefaultPhysicalPlanner`] and its registered extension planners.
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

362impl DfQueryPlanner {
363    fn new(
364        catalog_manager: CatalogManagerRef,
365        partition_rule_manager: Option<PartitionRuleManagerRef>,
366        region_query_handler: Option<RegionQueryHandlerRef>,
367    ) -> Self {
368        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
369            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
370        if let (Some(region_query_handler), Some(partition_rule_manager)) =
371            (region_query_handler, partition_rule_manager)
372        {
373            planners.push(Arc::new(DistExtensionPlanner::new(
374                catalog_manager,
375                partition_rule_manager,
376                region_query_handler,
377            )));
378            planners.push(Arc::new(MergeSortExtensionPlanner {}));
379        }
380        Self {
381            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
382        }
383    }
384}