Skip to main content

query/dist_plan/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! [ExtensionPlanner] implementation for distributed planner
16
17use std::sync::Arc;
18
19use ahash::HashMap;
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
23use common_telemetry::debug;
24use datafusion::common::Result;
25use datafusion::datasource::DefaultTableSource;
26use datafusion::execution::context::SessionState;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
30use datafusion_common::{DataFusionError, TableReference};
31use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
32use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
33use session::context::QueryContext;
34use snafu::{OptionExt, ResultExt};
35use store_api::storage::RegionId;
36pub use table::metadata::TableType;
37use table::table::adapter::DfTableProviderAdapter;
38use table::table_name::TableName;
39
40use crate::dist_plan::PredicateExtractor;
41use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
42use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
43use crate::dist_plan::region_pruner::ConstraintPruner;
44use crate::error::{CatalogSnafu, PartitionRuleManagerSnafu, TableNotFoundSnafu};
45use crate::region_query::RegionQueryHandlerRef;
46
/// Planner that converts a merge sort logical plan into a physical plan.
///
/// It is currently a fallback to sort and doesn't change the execution plan:
/// `MergeSort(MergeScan) -> Sort(MergeScan) - to physical plan -> ...`
/// It should be applied after `DistExtensionPlanner`.
///
/// (Later, when this merge sort is actually implemented)
///
/// We should ensure the number of partitions is not smaller than the number of regions at
/// present. Otherwise this would result in incorrect output.
pub struct MergeSortExtensionPlanner {}
57
58#[async_trait]
59impl ExtensionPlanner for MergeSortExtensionPlanner {
60    async fn plan_extension(
61        &self,
62        planner: &dyn PhysicalPlanner,
63        node: &dyn UserDefinedLogicalNode,
64        _logical_inputs: &[&LogicalPlan],
65        physical_inputs: &[Arc<dyn ExecutionPlan>],
66        session_state: &SessionState,
67    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
68        if let Some(merge_sort) = node.as_any().downcast_ref::<MergeSortLogicalPlan>() {
69            if let LogicalPlan::Extension(ext) = &merge_sort.input.as_ref()
70                && ext
71                    .node
72                    .as_any()
73                    .downcast_ref::<MergeScanLogicalPlan>()
74                    .is_some()
75            {
76                let merge_scan_exec = physical_inputs
77                    .first()
78                    .and_then(|p| p.as_any().downcast_ref::<MergeScanExec>())
79                    .ok_or(DataFusionError::Internal(format!(
80                        "Expect MergeSort's input is a MergeScanExec, found {:?}",
81                        physical_inputs
82                    )))?;
83
84                let partition_cnt = merge_scan_exec.partition_count();
85                let region_cnt = merge_scan_exec.region_count();
86                // if partition >= region, we know that every partition stream of merge scan is ordered
87                // and we only need to do a merge sort, otherwise fallback to quick sort
88                let can_merge_sort = partition_cnt >= region_cnt;
89                if can_merge_sort {
90                    // TODO(discord9): use `SortPreservingMergeExec here`
91                }
92                // for now merge sort only exist in logical plan, and have the same effect as `Sort`
93                // doesn't change the execution plan, this will change in the future
94                let ret = planner
95                    .create_physical_plan(&merge_sort.clone().into_sort(), session_state)
96                    .await?;
97                Ok(Some(ret))
98            } else {
99                Ok(None)
100            }
101        } else {
102            Ok(None)
103        }
104    }
105}
106
/// [`ExtensionPlanner`] that turns a `MergeScanLogicalPlan` into a `MergeScanExec`.
pub struct DistExtensionPlanner {
    /// Used to look up the table referenced by the plan.
    catalog_manager: CatalogManagerRef,
    /// Used to find the physical table route (region routes) of the table.
    partition_rule_manager: PartitionRuleManagerRef,
    /// Handed to the created `MergeScanExec`.
    region_query_handler: RegionQueryHandlerRef,
}
112
113impl DistExtensionPlanner {
114    pub fn new(
115        catalog_manager: CatalogManagerRef,
116        partition_rule_manager: PartitionRuleManagerRef,
117        region_query_handler: RegionQueryHandlerRef,
118    ) -> Self {
119        Self {
120            catalog_manager,
121            partition_rule_manager,
122            region_query_handler,
123        }
124    }
125}
126
127#[async_trait]
128impl ExtensionPlanner for DistExtensionPlanner {
129    async fn plan_extension(
130        &self,
131        planner: &dyn PhysicalPlanner,
132        node: &dyn UserDefinedLogicalNode,
133        _logical_inputs: &[&LogicalPlan],
134        _physical_inputs: &[Arc<dyn ExecutionPlan>],
135        session_state: &SessionState,
136    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
137        let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanLogicalPlan>() else {
138            return Ok(None);
139        };
140
141        let input_plan = merge_scan.input();
142        let fallback = |logical_plan| async move {
143            let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?;
144            planner
145                .create_physical_plan(&optimized_plan, session_state)
146                .await
147                .map(Some)
148        };
149
150        if merge_scan.is_placeholder() {
151            // ignore placeholder
152            return fallback(input_plan).await;
153        }
154
155        let optimized_plan = input_plan;
156        let Some(table_name) = Self::extract_full_table_name(input_plan)? else {
157            // no relation found in input plan, going to execute them locally
158            return fallback(optimized_plan).await;
159        };
160
161        let Ok(regions) = self.get_regions(&table_name, input_plan).await else {
162            // no peers found, going to execute them locally
163            return fallback(optimized_plan).await;
164        };
165
166        // TODO(ruihang): generate different execution plans for different variant merge operation
167        let schema = optimized_plan.schema().as_arrow();
168        let query_ctx = session_state
169            .config()
170            .get_extension()
171            .unwrap_or_else(QueryContext::arc);
172        let merge_scan_plan = MergeScanExec::new(
173            session_state,
174            table_name,
175            regions,
176            input_plan.clone(),
177            schema,
178            self.region_query_handler.clone(),
179            query_ctx,
180            session_state.config().target_partitions(),
181            merge_scan.partition_cols().clone(),
182            merge_scan.remote_dyn_filter_producer_id(),
183        )?;
184        Ok(Some(Arc::new(merge_scan_plan) as _))
185    }
186}
187
188impl DistExtensionPlanner {
189    /// Extract fully resolved table name from logical plan
190    fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {
191        let mut extractor = TableNameExtractor::default();
192        let _ = plan.visit(&mut extractor)?;
193        Ok(extractor.table_name)
194    }
195
196    async fn get_regions(
197        &self,
198        table_name: &TableName,
199        logical_plan: &LogicalPlan,
200    ) -> Result<Vec<RegionId>> {
201        let table = self
202            .catalog_manager
203            .table(
204                &table_name.catalog_name,
205                &table_name.schema_name,
206                &table_name.table_name,
207                None,
208            )
209            .await
210            .context(CatalogSnafu)?
211            .with_context(|| TableNotFoundSnafu {
212                table: table_name.to_string(),
213            })?;
214
215        let table_info = table.table_info();
216        let physical_table_route = self
217            .partition_rule_manager
218            .find_physical_table_route(table_info.table_id())
219            .await
220            .context(PartitionRuleManagerSnafu)?;
221        let all_regions = physical_table_route
222            .region_routes
223            .iter()
224            .map(|r| RegionId::new(table_info.table_id(), r.region.id.region_number()))
225            .collect::<Vec<_>>();
226        // Extract partition columns
227        let partition_columns: Vec<String> =
228            table_info.meta.partition_column_names().cloned().collect();
229        debug!(
230            "DistExtensionPlanner: loaded table partition metadata, table: {}, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}, all_regions: {:?}",
231            table_name,
232            table_info.table_id(),
233            table_info.meta.partition_key_indices,
234            partition_columns,
235            all_regions,
236        );
237        if partition_columns.is_empty() {
238            return Ok(all_regions);
239        }
240        let partition_column_types = partition_columns
241            .iter()
242            .map(|col_name| {
243                let data_type = table_info
244                    .meta
245                    .schema
246                    .column_schema_by_name(col_name)
247                    // Safety: names are retrieved above from the same table
248                    .unwrap()
249                    .data_type
250                    .clone();
251                (col_name.clone(), data_type)
252            })
253            .collect::<HashMap<_, _>>();
254
255        // Extract predicates from logical plan
256        let partition_expressions = match PredicateExtractor::extract_partition_expressions(
257            logical_plan,
258            &partition_columns,
259        ) {
260            Ok(expressions) => expressions,
261            Err(err) => {
262                common_telemetry::debug!(
263                    "Failed to extract partition expressions for table {} (id: {}), using all regions: {:?}",
264                    table_name,
265                    table.table_info().table_id(),
266                    err
267                );
268                return Ok(all_regions);
269            }
270        };
271
272        if partition_expressions.is_empty() {
273            return Ok(all_regions);
274        }
275
276        // Get partition information for the table if partition rule manager is available
277        let partitions = match create_partitions_from_region_routes(
278            table_info.table_id(),
279            &physical_table_route.region_routes,
280        ) {
281            Ok(partitions) => partitions,
282            Err(err) => {
283                common_telemetry::debug!(
284                    "Failed to get partition information for table {}, using all regions: {:?}",
285                    table_name,
286                    err
287                );
288                return Ok(all_regions);
289            }
290        };
291        if partitions.is_empty() {
292            return Ok(all_regions);
293        }
294
295        // Apply region pruning based on partition rules
296        let pruned_regions = match ConstraintPruner::prune_regions(
297            &partition_expressions,
298            &partitions,
299            partition_column_types,
300        ) {
301            Ok(regions) => regions,
302            Err(err) => {
303                common_telemetry::debug!(
304                    "Failed to prune regions for table {}, using all regions: {:?}",
305                    table_name,
306                    err
307                );
308                return Ok(all_regions);
309            }
310        };
311
312        common_telemetry::debug!(
313            "Region pruning for table {}: {} partition expressions applied, pruned from {} to {} regions",
314            table_name,
315            partition_expressions.len(),
316            all_regions.len(),
317            pruned_regions.len()
318        );
319
320        Ok(pruned_regions)
321    }
322
323    /// Input logical plan is analyzed. Thus only call logical optimizer to optimize it.
324    fn optimize_input_logical_plan(
325        &self,
326        session_state: &SessionState,
327        plan: &LogicalPlan,
328    ) -> Result<LogicalPlan> {
329        let state = session_state.clone();
330        state.optimizer().optimize(plan.clone(), &state, |_, _| {})
331    }
332}
333
/// Visitor to extract table name from logical plan (TableScan node)
///
/// The traversal stops at the first `TableScan` node it reaches.
#[derive(Default)]
struct TableNameExtractor {
    /// The extracted table name; stays `None` if the visitor never records one.
    pub table_name: Option<TableName>,
}
339
340impl TreeNodeVisitor<'_> for TableNameExtractor {
341    type Node = LogicalPlan;
342
343    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
344        match node {
345            LogicalPlan::TableScan(scan) => {
346                if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>()
347                    && let Some(provider) = source
348                        .table_provider
349                        .as_any()
350                        .downcast_ref::<DfTableProviderAdapter>()
351                {
352                    if provider.table().table_type() == TableType::Base {
353                        let info = provider.table().table_info();
354                        self.table_name = Some(TableName::new(
355                            info.catalog_name.clone(),
356                            info.schema_name.clone(),
357                            info.name.clone(),
358                        ));
359                    }
360                    return Ok(TreeNodeRecursion::Stop);
361                }
362                match &scan.table_name {
363                    TableReference::Full {
364                        catalog,
365                        schema,
366                        table,
367                    } => {
368                        self.table_name = Some(TableName::new(
369                            catalog.to_string(),
370                            schema.to_string(),
371                            table.to_string(),
372                        ));
373                        Ok(TreeNodeRecursion::Stop)
374                    }
375                    // TODO(ruihang): Maybe the following two cases should not be valid
376                    TableReference::Partial { schema, table } => {
377                        self.table_name = Some(TableName::new(
378                            DEFAULT_CATALOG_NAME.to_string(),
379                            schema.to_string(),
380                            table.to_string(),
381                        ));
382                        Ok(TreeNodeRecursion::Stop)
383                    }
384                    TableReference::Bare { table } => {
385                        self.table_name = Some(TableName::new(
386                            DEFAULT_CATALOG_NAME.to_string(),
387                            DEFAULT_SCHEMA_NAME.to_string(),
388                            table.to_string(),
389                        ));
390                        Ok(TreeNodeRecursion::Stop)
391                    }
392                }
393            }
394            _ => Ok(TreeNodeRecursion::Continue),
395        }
396    }
397}