// query/query_engine/state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::sync::{Arc, RwLock};
18
19use async_trait::async_trait;
20use catalog::CatalogManagerRef;
21use common_base::Plugins;
22use common_function::function::FunctionRef;
23use common_function::handlers::{
24    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
25};
26use common_function::scalars::aggregate::AggregateFunctionMetaRef;
27use common_function::state::FunctionState;
28use common_telemetry::warn;
29use datafusion::dataframe::DataFrame;
30use datafusion::error::Result as DfResult;
31use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
32use datafusion::execution::runtime_env::RuntimeEnv;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
35use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
36use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
37use datafusion::physical_optimizer::PhysicalOptimizerRule;
38use datafusion::physical_plan::ExecutionPlan;
39use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
40use datafusion_expr::LogicalPlan as DfLogicalPlan;
41use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
42use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
43use datafusion_optimizer::optimizer::Optimizer;
44use promql::extension_plan::PromExtensionPlanner;
45use table::table::adapter::DfTableProviderAdapter;
46use table::TableRef;
47
48use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer, MergeSortExtensionPlanner};
49use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
50use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
51use crate::optimizer::parallelize_scan::ParallelizeScan;
52use crate::optimizer::pass_distribution::PassDistribution;
53use crate::optimizer::remove_duplicate::RemoveDuplicate;
54use crate::optimizer::scan_hint::ScanHintRule;
55use crate::optimizer::string_normalization::StringNormalizationRule;
56use crate::optimizer::transcribe_atat::TranscribeAtatRule;
57use crate::optimizer::type_conversion::TypeConversionRule;
58use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
59use crate::optimizer::ExtensionAnalyzerRule;
60use crate::options::QueryOptions as QueryOptionsNew;
61use crate::query_engine::options::QueryOptions;
62use crate::query_engine::DefaultSerializer;
63use crate::range_select::planner::RangeSelectPlanner;
64use crate::region_query::RegionQueryHandlerRef;
65use crate::QueryEngineContext;
66
67/// Query engine global state
68#[derive(Clone)]
69pub struct QueryEngineState {
70    df_context: SessionContext,
71    catalog_manager: CatalogManagerRef,
72    function_state: Arc<FunctionState>,
73    udf_functions: Arc<RwLock<HashMap<String, FunctionRef>>>,
74    aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
75    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
76    plugins: Plugins,
77}
78
79impl fmt::Debug for QueryEngineState {
80    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81        f.debug_struct("QueryEngineState")
82            .field("state", &self.df_context.state())
83            .finish()
84    }
85}
86
87impl QueryEngineState {
88    #[allow(clippy::too_many_arguments)]
89    pub fn new(
90        catalog_list: CatalogManagerRef,
91        region_query_handler: Option<RegionQueryHandlerRef>,
92        table_mutation_handler: Option<TableMutationHandlerRef>,
93        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
94        flow_service_handler: Option<FlowServiceHandlerRef>,
95        with_dist_planner: bool,
96        plugins: Plugins,
97        options: QueryOptionsNew,
98    ) -> Self {
99        let runtime_env = Arc::new(RuntimeEnv::default());
100        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
101        if options.parallelism > 0 {
102            session_config = session_config.with_target_partitions(options.parallelism);
103        }
104
105        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
106        // and we can add that check back once we upgrade datafusion.
107        session_config
108            .options_mut()
109            .execution
110            .skip_physical_aggregate_schema_check = true;
111
112        // Apply extension rules
113        let mut extension_rules = Vec::new();
114
115        // The [`TypeConversionRule`] must be at first
116        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
117
118        // Apply the datafusion rules
119        let mut analyzer = Analyzer::new();
120        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
121        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
122
123        // Use our custom rule instead to optimize the count(*) query
124        Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name());
125        analyzer
126            .rules
127            .insert(0, Arc::new(CountWildcardToTimeIndexRule));
128
129        if with_dist_planner {
130            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
131        }
132
133        let mut optimizer = Optimizer::new();
134        optimizer.rules.push(Arc::new(ScanHintRule));
135
136        // add physical optimizer
137        let mut physical_optimizer = PhysicalOptimizer::new();
138        // Change TableScan's partition at first
139        physical_optimizer
140            .rules
141            .insert(0, Arc::new(ParallelizeScan));
142        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
143        physical_optimizer
144            .rules
145            .insert(1, Arc::new(PassDistribution));
146        physical_optimizer
147            .rules
148            .insert(2, Arc::new(EnforceSorting {}));
149        // Add rule for windowed sort
150        physical_optimizer
151            .rules
152            .push(Arc::new(WindowedSortPhysicalRule));
153        physical_optimizer
154            .rules
155            .push(Arc::new(MatchesConstantTermOptimizer));
156        // Add rule to remove duplicate nodes generated by other rules. Run this in the last.
157        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
158        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
159        Self::remove_physical_optimizer_rule(
160            &mut physical_optimizer.rules,
161            SanityCheckPlan {}.name(),
162        );
163        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
164
165        let session_state = SessionStateBuilder::new()
166            .with_config(session_config)
167            .with_runtime_env(runtime_env)
168            .with_default_features()
169            .with_analyzer_rules(analyzer.rules)
170            .with_serializer_registry(Arc::new(DefaultSerializer))
171            .with_query_planner(Arc::new(DfQueryPlanner::new(
172                catalog_list.clone(),
173                region_query_handler,
174            )))
175            .with_optimizer_rules(optimizer.rules)
176            .with_physical_optimizer_rules(physical_optimizer.rules)
177            .build();
178
179        let df_context = SessionContext::new_with_state(session_state);
180
181        Self {
182            df_context,
183            catalog_manager: catalog_list,
184            function_state: Arc::new(FunctionState {
185                table_mutation_handler,
186                procedure_service_handler,
187                flow_service_handler,
188            }),
189            aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
190            extension_rules,
191            plugins,
192            udf_functions: Arc::new(RwLock::new(HashMap::new())),
193        }
194    }
195
196    fn remove_analyzer_rule(rules: &mut Vec<Arc<dyn AnalyzerRule + Send + Sync>>, name: &str) {
197        rules.retain(|rule| rule.name() != name);
198    }
199
200    fn remove_physical_optimizer_rule(
201        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
202        name: &str,
203    ) {
204        rules.retain(|rule| rule.name() != name);
205    }
206
207    /// Optimize the logical plan by the extension anayzer rules.
208    pub fn optimize_by_extension_rules(
209        &self,
210        plan: DfLogicalPlan,
211        context: &QueryEngineContext,
212    ) -> DfResult<DfLogicalPlan> {
213        self.extension_rules
214            .iter()
215            .try_fold(plan, |acc_plan, rule| {
216                rule.analyze(acc_plan, context, self.session_state().config_options())
217            })
218    }
219
220    /// Run the full logical plan optimize phase for the given plan.
221    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
222        self.session_state().optimize(&plan)
223    }
224
225    /// Register an udf function.
226    /// Will override if the function with same name is already registered.
227    pub fn register_function(&self, func: FunctionRef) {
228        let name = func.name().to_string();
229        let x = self
230            .udf_functions
231            .write()
232            .unwrap()
233            .insert(name.clone(), func);
234
235        if x.is_some() {
236            warn!("Already registered udf function '{name}'");
237        }
238    }
239
240    /// Retrieve the udf function by name
241    pub fn udf_function(&self, function_name: &str) -> Option<FunctionRef> {
242        self.udf_functions
243            .read()
244            .unwrap()
245            .get(function_name)
246            .cloned()
247    }
248
249    /// Retrieve udf function names.
250    pub fn udf_names(&self) -> Vec<String> {
251        self.udf_functions.read().unwrap().keys().cloned().collect()
252    }
253
254    /// Retrieve the aggregate function by name
255    pub fn aggregate_function(&self, function_name: &str) -> Option<AggregateFunctionMetaRef> {
256        self.aggregate_functions
257            .read()
258            .unwrap()
259            .get(function_name)
260            .cloned()
261    }
262
263    /// Retrieve aggregate function names.
264    pub fn udaf_names(&self) -> Vec<String> {
265        self.aggregate_functions
266            .read()
267            .unwrap()
268            .keys()
269            .cloned()
270            .collect()
271    }
272
273    /// Register an aggregate function.
274    ///
275    /// # Panics
276    /// Will panic if the function with same name is already registered.
277    ///
278    /// Panicking consideration: currently the aggregated functions are all statically registered,
279    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
280    /// invariant is broken in the future, we should return an error instead of panicking.
281    pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
282        let name = func.name();
283        let x = self
284            .aggregate_functions
285            .write()
286            .unwrap()
287            .insert(name.clone(), func);
288        assert!(
289            x.is_none(),
290            "Already registered aggregate function '{name}'"
291        );
292    }
293
294    pub fn catalog_manager(&self) -> &CatalogManagerRef {
295        &self.catalog_manager
296    }
297
298    pub fn function_state(&self) -> Arc<FunctionState> {
299        self.function_state.clone()
300    }
301
302    /// Returns the [`TableMutationHandlerRef`] in state.
303    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
304        self.function_state.table_mutation_handler.as_ref()
305    }
306
307    /// Returns the [`ProcedureServiceHandlerRef`] in state.
308    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
309        self.function_state.procedure_service_handler.as_ref()
310    }
311
312    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
313        self.plugins
314            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
315            .unwrap_or(false)
316    }
317
318    pub fn session_state(&self) -> SessionState {
319        self.df_context.state()
320    }
321
322    /// Create a DataFrame for a table
323    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
324        self.df_context
325            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
326    }
327}
328
329struct DfQueryPlanner {
330    physical_planner: DefaultPhysicalPlanner,
331}
332
333impl fmt::Debug for DfQueryPlanner {
334    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
335        f.debug_struct("DfQueryPlanner").finish()
336    }
337}
338
339#[async_trait]
340impl QueryPlanner for DfQueryPlanner {
341    async fn create_physical_plan(
342        &self,
343        logical_plan: &DfLogicalPlan,
344        session_state: &SessionState,
345    ) -> DfResult<Arc<dyn ExecutionPlan>> {
346        self.physical_planner
347            .create_physical_plan(logical_plan, session_state)
348            .await
349    }
350}
351
352impl DfQueryPlanner {
353    fn new(
354        catalog_manager: CatalogManagerRef,
355        region_query_handler: Option<RegionQueryHandlerRef>,
356    ) -> Self {
357        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
358            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
359        if let Some(region_query_handler) = region_query_handler {
360            planners.push(Arc::new(DistExtensionPlanner::new(
361                catalog_manager,
362                region_query_handler,
363            )));
364            planners.push(Arc::new(MergeSortExtensionPlanner {}));
365        }
366        Self {
367            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
368        }
369    }
370}