// File: query/query_engine/state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::sync::{Arc, RwLock};
18
19use async_trait::async_trait;
20use catalog::CatalogManagerRef;
21use common_base::Plugins;
22use common_function::function_factory::ScalarFunctionFactory;
23use common_function::handlers::{
24    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
25};
26use common_function::state::FunctionState;
27use common_telemetry::warn;
28use datafusion::dataframe::DataFrame;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
31use datafusion::execution::runtime_env::RuntimeEnv;
32use datafusion::execution::SessionStateBuilder;
33use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
34use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
35use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
36use datafusion::physical_optimizer::PhysicalOptimizerRule;
37use datafusion::physical_plan::ExecutionPlan;
38use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
39use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
40use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
41use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
42use datafusion_optimizer::optimizer::Optimizer;
43use promql::extension_plan::PromExtensionPlanner;
44use table::table::adapter::DfTableProviderAdapter;
45use table::TableRef;
46
47use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer, MergeSortExtensionPlanner};
48use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
49use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
50use crate::optimizer::parallelize_scan::ParallelizeScan;
51use crate::optimizer::pass_distribution::PassDistribution;
52use crate::optimizer::remove_duplicate::RemoveDuplicate;
53use crate::optimizer::scan_hint::ScanHintRule;
54use crate::optimizer::string_normalization::StringNormalizationRule;
55use crate::optimizer::transcribe_atat::TranscribeAtatRule;
56use crate::optimizer::type_conversion::TypeConversionRule;
57use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
58use crate::optimizer::ExtensionAnalyzerRule;
59use crate::options::QueryOptions as QueryOptionsNew;
60use crate::query_engine::options::QueryOptions;
61use crate::query_engine::DefaultSerializer;
62use crate::range_select::planner::RangeSelectPlanner;
63use crate::region_query::RegionQueryHandlerRef;
64use crate::QueryEngineContext;
65
/// Query engine global state.
///
/// Cheaply cloneable: the heavyweight members are behind `Arc`s, and
/// `SessionContext` is itself a shared handle.
#[derive(Clone)]
pub struct QueryEngineState {
    // The DataFusion session this engine plans and optimizes queries with;
    // carries the configured analyzer/optimizer/physical-optimizer rules.
    df_context: SessionContext,
    // Resolves catalogs/schemas/tables during planning.
    catalog_manager: CatalogManagerRef,
    // Shared handler state (table mutation, procedure, flow) exposed to functions.
    function_state: Arc<FunctionState>,
    // Registered scalar function factories, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    // Registered aggregate UDFs, keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    // Extra analyzer rules applied via `optimize_by_extension_rules`,
    // outside DataFusion's built-in analyzer pipeline.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    // Plugin registry; consulted e.g. for `QueryOptions`.
    plugins: Plugins,
}
77
78impl fmt::Debug for QueryEngineState {
79    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80        f.debug_struct("QueryEngineState")
81            .field("state", &self.df_context.state())
82            .finish()
83    }
84}
85
impl QueryEngineState {
    /// Builds the engine state: configures a DataFusion session with
    /// GreptimeDB's custom analyzer, logical-optimizer and physical-optimizer
    /// rules, and wires in the catalog manager, handlers and plugins.
    ///
    /// `with_dist_planner` additionally enables the distributed planner
    /// analyzer rule; `region_query_handler` (when present) enables the
    /// distributed extension planners inside [`DfQueryPlanner`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        let runtime_env = Arc::new(RuntimeEnv::default());
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        // `parallelism == 0` means "use DataFusion's default target partitions".
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }

        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
        // and we can add that check back once we upgrade datafusion.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Apply extension rules
        let mut extension_rules = Vec::new();

        // The [`TypeConversionRule`] must be at first
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Apply the datafusion rules. Note: each `insert(0, ..)` puts the rule
        // at the FRONT, so the last one inserted runs first.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));

        // Use our custom rule instead to optimize the count(*) query
        Self::remove_analyzer_rule(&mut analyzer.rules, CountWildcardRule {}.name());
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        if with_dist_planner {
            // Runs after all built-in analyzer rules.
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // add physical optimizer
        let mut physical_optimizer = PhysicalOptimizer::new();
        // Change TableScan's partition at first
        physical_optimizer
            .rules
            .insert(0, Arc::new(ParallelizeScan));
        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
        physical_optimizer
            .rules
            .insert(1, Arc::new(PassDistribution));
        physical_optimizer
            .rules
            .insert(2, Arc::new(EnforceSorting {}));
        // Add rule for windowed sort
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        // Add rule to remove duplicate nodes generated by other rules. Run this in the last.
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            // Function registries start empty; populated later via
            // `register_scalar_function` / `register_aggr_function`.
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes the analyzer rule whose name matches `name`; no-op if absent.
    fn remove_analyzer_rule(rules: &mut Vec<Arc<dyn AnalyzerRule + Send + Sync>>, name: &str) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Removes the physical optimizer rule whose name matches `name`; no-op if absent.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Optimize the logical plan by the extension analyzer rules, applied in
    /// registration order; stops at the first rule that returns an error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Run the full logical plan optimize phase for the given plan.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Retrieve the scalar function by name, or `None` if not registered.
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve scalar function names (in arbitrary hash-map order).
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve the aggregate function by name, or `None` if not registered.
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve aggregate function names (in arbitrary hash-map order).
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Register a scalar function.
    /// Will override if the function with same name is already registered
    /// (a warning is logged in that case).
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Register an aggregate function.
    ///
    /// # Panics
    /// Will panic if the function with same name is already registered.
    ///
    /// Panicking consideration: currently the aggregated functions are all statically registered,
    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
    /// invariant is broken in the future, we should return an error instead of panicking.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Returns the catalog manager used for name resolution.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the shared [`FunctionState`].
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the [`TableMutationHandlerRef`] in state.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the [`ProcedureServiceHandlerRef`] in state.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed, read from the
    /// [`QueryOptions`] plugin; defaults to `false` when the plugin is absent.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns the current DataFusion [`SessionState`].
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Create a DataFrame for a table
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}
332
/// Query planner that wraps DataFusion's [`DefaultPhysicalPlanner`] together
/// with GreptimeDB's extension planners (PromQL, range select, and — when a
/// region query handler is supplied — distributed plan / merge sort).
struct DfQueryPlanner {
    // Pre-configured planner; see `DfQueryPlanner::new` for the planner list.
    physical_planner: DefaultPhysicalPlanner,
}
336
337impl fmt::Debug for DfQueryPlanner {
338    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
339        f.debug_struct("DfQueryPlanner").finish()
340    }
341}
342
343#[async_trait]
344impl QueryPlanner for DfQueryPlanner {
345    async fn create_physical_plan(
346        &self,
347        logical_plan: &DfLogicalPlan,
348        session_state: &SessionState,
349    ) -> DfResult<Arc<dyn ExecutionPlan>> {
350        self.physical_planner
351            .create_physical_plan(logical_plan, session_state)
352            .await
353    }
354}
355
356impl DfQueryPlanner {
357    fn new(
358        catalog_manager: CatalogManagerRef,
359        region_query_handler: Option<RegionQueryHandlerRef>,
360    ) -> Self {
361        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
362            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
363        if let Some(region_query_handler) = region_query_handler {
364            planners.push(Arc::new(DistExtensionPlanner::new(
365                catalog_manager,
366                region_query_handler,
367            )));
368            planners.push(Arc::new(MergeSortExtensionPlanner {}));
369        }
370        Self {
371            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
372        }
373    }
374}