1use std::sync::Arc;
18
19use ahash::HashMap;
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
23use datafusion::common::Result;
24use datafusion::datasource::DefaultTableSource;
25use datafusion::execution::context::SessionState;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
28use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
29use datafusion_common::{DataFusionError, TableReference};
30use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
31use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
32use session::context::QueryContext;
33use snafu::{OptionExt, ResultExt};
34use store_api::storage::RegionId;
35pub use table::metadata::TableType;
36use table::table::adapter::DfTableProviderAdapter;
37use table::table_name::TableName;
38
39use crate::dist_plan::PredicateExtractor;
40use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
41use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
42use crate::dist_plan::region_pruner::ConstraintPruner;
43use crate::error::{CatalogSnafu, PartitionRuleManagerSnafu, TableNotFoundSnafu};
44use crate::region_query::RegionQueryHandlerRef;
45
/// Extension planner that turns a [`MergeSortLogicalPlan`] into a physical plan.
///
/// It only acts on a `MergeSort` whose input is a [`MergeScanLogicalPlan`]. No dedicated
/// merge-sort execution plan exists yet, so such a node is planned as an ordinary sort via
/// `MergeSortLogicalPlan::into_sort`.
pub struct MergeSortExtensionPlanner {}
56
57#[async_trait]
58impl ExtensionPlanner for MergeSortExtensionPlanner {
59 async fn plan_extension(
60 &self,
61 planner: &dyn PhysicalPlanner,
62 node: &dyn UserDefinedLogicalNode,
63 _logical_inputs: &[&LogicalPlan],
64 physical_inputs: &[Arc<dyn ExecutionPlan>],
65 session_state: &SessionState,
66 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
67 if let Some(merge_sort) = node.as_any().downcast_ref::<MergeSortLogicalPlan>() {
68 if let LogicalPlan::Extension(ext) = &merge_sort.input.as_ref()
69 && ext
70 .node
71 .as_any()
72 .downcast_ref::<MergeScanLogicalPlan>()
73 .is_some()
74 {
75 let merge_scan_exec = physical_inputs
76 .first()
77 .and_then(|p| p.as_any().downcast_ref::<MergeScanExec>())
78 .ok_or(DataFusionError::Internal(format!(
79 "Expect MergeSort's input is a MergeScanExec, found {:?}",
80 physical_inputs
81 )))?;
82
83 let partition_cnt = merge_scan_exec.partition_count();
84 let region_cnt = merge_scan_exec.region_count();
85 let can_merge_sort = partition_cnt >= region_cnt;
88 if can_merge_sort {
89 }
91 let ret = planner
94 .create_physical_plan(&merge_sort.clone().into_sort(), session_state)
95 .await?;
96 Ok(Some(ret))
97 } else {
98 Ok(None)
99 }
100 } else {
101 Ok(None)
102 }
103 }
104}
105
106pub struct DistExtensionPlanner {
107 catalog_manager: CatalogManagerRef,
108 partition_rule_manager: PartitionRuleManagerRef,
109 region_query_handler: RegionQueryHandlerRef,
110}
111
112impl DistExtensionPlanner {
113 pub fn new(
114 catalog_manager: CatalogManagerRef,
115 partition_rule_manager: PartitionRuleManagerRef,
116 region_query_handler: RegionQueryHandlerRef,
117 ) -> Self {
118 Self {
119 catalog_manager,
120 partition_rule_manager,
121 region_query_handler,
122 }
123 }
124}
125
126#[async_trait]
127impl ExtensionPlanner for DistExtensionPlanner {
128 async fn plan_extension(
129 &self,
130 planner: &dyn PhysicalPlanner,
131 node: &dyn UserDefinedLogicalNode,
132 _logical_inputs: &[&LogicalPlan],
133 _physical_inputs: &[Arc<dyn ExecutionPlan>],
134 session_state: &SessionState,
135 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
136 let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanLogicalPlan>() else {
137 return Ok(None);
138 };
139
140 let input_plan = merge_scan.input();
141 let fallback = |logical_plan| async move {
142 let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?;
143 planner
144 .create_physical_plan(&optimized_plan, session_state)
145 .await
146 .map(Some)
147 };
148
149 if merge_scan.is_placeholder() {
150 return fallback(input_plan).await;
152 }
153
154 let optimized_plan = input_plan;
155 let Some(table_name) = Self::extract_full_table_name(input_plan)? else {
156 return fallback(optimized_plan).await;
158 };
159
160 let Ok(regions) = self.get_regions(&table_name, input_plan).await else {
161 return fallback(optimized_plan).await;
163 };
164
165 let schema = optimized_plan.schema().as_arrow();
167 let query_ctx = session_state
168 .config()
169 .get_extension()
170 .unwrap_or_else(QueryContext::arc);
171 let merge_scan_plan = MergeScanExec::new(
172 session_state,
173 table_name,
174 regions,
175 input_plan.clone(),
176 schema,
177 self.region_query_handler.clone(),
178 query_ctx,
179 session_state.config().target_partitions(),
180 merge_scan.partition_cols().clone(),
181 )?;
182 Ok(Some(Arc::new(merge_scan_plan) as _))
183 }
184}
185
186impl DistExtensionPlanner {
187 fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {
189 let mut extractor = TableNameExtractor::default();
190 let _ = plan.visit(&mut extractor)?;
191 Ok(extractor.table_name)
192 }
193
194 async fn get_regions(
195 &self,
196 table_name: &TableName,
197 logical_plan: &LogicalPlan,
198 ) -> Result<Vec<RegionId>> {
199 let table = self
200 .catalog_manager
201 .table(
202 &table_name.catalog_name,
203 &table_name.schema_name,
204 &table_name.table_name,
205 None,
206 )
207 .await
208 .context(CatalogSnafu)?
209 .with_context(|| TableNotFoundSnafu {
210 table: table_name.to_string(),
211 })?;
212
213 let table_info = table.table_info();
214 let physical_table_route = self
215 .partition_rule_manager
216 .find_physical_table_route(table_info.table_id())
217 .await
218 .context(PartitionRuleManagerSnafu)?;
219 let all_regions = physical_table_route
220 .region_routes
221 .iter()
222 .map(|r| RegionId::new(table_info.table_id(), r.region.id.region_number()))
223 .collect::<Vec<_>>();
224 let partition_columns: Vec<String> =
226 table_info.meta.partition_column_names().cloned().collect();
227 if partition_columns.is_empty() {
228 return Ok(all_regions);
229 }
230 let partition_column_types = partition_columns
231 .iter()
232 .map(|col_name| {
233 let data_type = table_info
234 .meta
235 .schema
236 .column_schema_by_name(col_name)
237 .unwrap()
239 .data_type
240 .clone();
241 (col_name.clone(), data_type)
242 })
243 .collect::<HashMap<_, _>>();
244
245 let partition_expressions = match PredicateExtractor::extract_partition_expressions(
247 logical_plan,
248 &partition_columns,
249 ) {
250 Ok(expressions) => expressions,
251 Err(err) => {
252 common_telemetry::debug!(
253 "Failed to extract partition expressions for table {} (id: {}), using all regions: {:?}",
254 table_name,
255 table.table_info().table_id(),
256 err
257 );
258 return Ok(all_regions);
259 }
260 };
261
262 if partition_expressions.is_empty() {
263 return Ok(all_regions);
264 }
265
266 let partitions = match create_partitions_from_region_routes(
268 table_info.table_id(),
269 &physical_table_route.region_routes,
270 ) {
271 Ok(partitions) => partitions,
272 Err(err) => {
273 common_telemetry::debug!(
274 "Failed to get partition information for table {}, using all regions: {:?}",
275 table_name,
276 err
277 );
278 return Ok(all_regions);
279 }
280 };
281 if partitions.is_empty() {
282 return Ok(all_regions);
283 }
284
285 let pruned_regions = match ConstraintPruner::prune_regions(
287 &partition_expressions,
288 &partitions,
289 partition_column_types,
290 ) {
291 Ok(regions) => regions,
292 Err(err) => {
293 common_telemetry::debug!(
294 "Failed to prune regions for table {}, using all regions: {:?}",
295 table_name,
296 err
297 );
298 return Ok(all_regions);
299 }
300 };
301
302 common_telemetry::debug!(
303 "Region pruning for table {}: {} partition expressions applied, pruned from {} to {} regions",
304 table_name,
305 partition_expressions.len(),
306 all_regions.len(),
307 pruned_regions.len()
308 );
309
310 Ok(pruned_regions)
311 }
312
313 fn optimize_input_logical_plan(
315 &self,
316 session_state: &SessionState,
317 plan: &LogicalPlan,
318 ) -> Result<LogicalPlan> {
319 let state = session_state.clone();
320 state.optimizer().optimize(plan.clone(), &state, |_, _| {})
321 }
322}
323
324#[derive(Default)]
326struct TableNameExtractor {
327 pub table_name: Option<TableName>,
328}
329
330impl TreeNodeVisitor<'_> for TableNameExtractor {
331 type Node = LogicalPlan;
332
333 fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
334 match node {
335 LogicalPlan::TableScan(scan) => {
336 if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>()
337 && let Some(provider) = source
338 .table_provider
339 .as_any()
340 .downcast_ref::<DfTableProviderAdapter>()
341 {
342 if provider.table().table_type() == TableType::Base {
343 let info = provider.table().table_info();
344 self.table_name = Some(TableName::new(
345 info.catalog_name.clone(),
346 info.schema_name.clone(),
347 info.name.clone(),
348 ));
349 }
350 return Ok(TreeNodeRecursion::Stop);
351 }
352 match &scan.table_name {
353 TableReference::Full {
354 catalog,
355 schema,
356 table,
357 } => {
358 self.table_name = Some(TableName::new(
359 catalog.to_string(),
360 schema.to_string(),
361 table.to_string(),
362 ));
363 Ok(TreeNodeRecursion::Stop)
364 }
365 TableReference::Partial { schema, table } => {
367 self.table_name = Some(TableName::new(
368 DEFAULT_CATALOG_NAME.to_string(),
369 schema.to_string(),
370 table.to_string(),
371 ));
372 Ok(TreeNodeRecursion::Stop)
373 }
374 TableReference::Bare { table } => {
375 self.table_name = Some(TableName::new(
376 DEFAULT_CATALOG_NAME.to_string(),
377 DEFAULT_SCHEMA_NAME.to_string(),
378 table.to_string(),
379 ));
380 Ok(TreeNodeRecursion::Stop)
381 }
382 }
383 }
384 _ => Ok(TreeNodeRecursion::Continue),
385 }
386 }
387}