1use std::sync::Arc;
18
19use ahash::HashMap;
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
23use common_telemetry::debug;
24use datafusion::common::Result;
25use datafusion::datasource::DefaultTableSource;
26use datafusion::execution::context::SessionState;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
30use datafusion_common::{DataFusionError, TableReference};
31use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
32use partition::manager::{PartitionRuleManagerRef, create_partitions_from_region_routes};
33use session::context::QueryContext;
34use snafu::{OptionExt, ResultExt};
35use store_api::storage::RegionId;
36pub use table::metadata::TableType;
37use table::table::adapter::DfTableProviderAdapter;
38use table::table_name::TableName;
39
40use crate::dist_plan::PredicateExtractor;
41use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
42use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
43use crate::dist_plan::region_pruner::ConstraintPruner;
44use crate::error::{CatalogSnafu, PartitionRuleManagerSnafu, TableNotFoundSnafu};
45use crate::region_query::RegionQueryHandlerRef;
46
/// Extension planner responsible for [`MergeSortLogicalPlan`] nodes.
///
/// The type carries no state; all behavior lives in its [`ExtensionPlanner`]
/// implementation.
pub struct MergeSortExtensionPlanner {}
57
58#[async_trait]
59impl ExtensionPlanner for MergeSortExtensionPlanner {
60 async fn plan_extension(
61 &self,
62 planner: &dyn PhysicalPlanner,
63 node: &dyn UserDefinedLogicalNode,
64 _logical_inputs: &[&LogicalPlan],
65 physical_inputs: &[Arc<dyn ExecutionPlan>],
66 session_state: &SessionState,
67 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
68 if let Some(merge_sort) = node.as_any().downcast_ref::<MergeSortLogicalPlan>() {
69 if let LogicalPlan::Extension(ext) = &merge_sort.input.as_ref()
70 && ext
71 .node
72 .as_any()
73 .downcast_ref::<MergeScanLogicalPlan>()
74 .is_some()
75 {
76 let merge_scan_exec = physical_inputs
77 .first()
78 .and_then(|p| p.as_any().downcast_ref::<MergeScanExec>())
79 .ok_or(DataFusionError::Internal(format!(
80 "Expect MergeSort's input is a MergeScanExec, found {:?}",
81 physical_inputs
82 )))?;
83
84 let partition_cnt = merge_scan_exec.partition_count();
85 let region_cnt = merge_scan_exec.region_count();
86 let can_merge_sort = partition_cnt >= region_cnt;
89 if can_merge_sort {
90 }
92 let ret = planner
95 .create_physical_plan(&merge_sort.clone().into_sort(), session_state)
96 .await?;
97 Ok(Some(ret))
98 } else {
99 Ok(None)
100 }
101 } else {
102 Ok(None)
103 }
104 }
105}
106
107pub struct DistExtensionPlanner {
108 catalog_manager: CatalogManagerRef,
109 partition_rule_manager: PartitionRuleManagerRef,
110 region_query_handler: RegionQueryHandlerRef,
111}
112
113impl DistExtensionPlanner {
114 pub fn new(
115 catalog_manager: CatalogManagerRef,
116 partition_rule_manager: PartitionRuleManagerRef,
117 region_query_handler: RegionQueryHandlerRef,
118 ) -> Self {
119 Self {
120 catalog_manager,
121 partition_rule_manager,
122 region_query_handler,
123 }
124 }
125}
126
127#[async_trait]
128impl ExtensionPlanner for DistExtensionPlanner {
129 async fn plan_extension(
130 &self,
131 planner: &dyn PhysicalPlanner,
132 node: &dyn UserDefinedLogicalNode,
133 _logical_inputs: &[&LogicalPlan],
134 _physical_inputs: &[Arc<dyn ExecutionPlan>],
135 session_state: &SessionState,
136 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
137 let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanLogicalPlan>() else {
138 return Ok(None);
139 };
140
141 let input_plan = merge_scan.input();
142 let fallback = |logical_plan| async move {
143 let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?;
144 planner
145 .create_physical_plan(&optimized_plan, session_state)
146 .await
147 .map(Some)
148 };
149
150 if merge_scan.is_placeholder() {
151 return fallback(input_plan).await;
153 }
154
155 let optimized_plan = input_plan;
156 let Some(table_name) = Self::extract_full_table_name(input_plan)? else {
157 return fallback(optimized_plan).await;
159 };
160
161 let Ok(regions) = self.get_regions(&table_name, input_plan).await else {
162 return fallback(optimized_plan).await;
164 };
165
166 let schema = optimized_plan.schema().as_arrow();
168 let query_ctx = session_state
169 .config()
170 .get_extension()
171 .unwrap_or_else(QueryContext::arc);
172 let merge_scan_plan = MergeScanExec::new(
173 session_state,
174 table_name,
175 regions,
176 input_plan.clone(),
177 schema,
178 self.region_query_handler.clone(),
179 query_ctx,
180 session_state.config().target_partitions(),
181 merge_scan.partition_cols().clone(),
182 merge_scan.remote_dyn_filter_producer_id(),
183 )?;
184 Ok(Some(Arc::new(merge_scan_plan) as _))
185 }
186}
187
188impl DistExtensionPlanner {
189 fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {
191 let mut extractor = TableNameExtractor::default();
192 let _ = plan.visit(&mut extractor)?;
193 Ok(extractor.table_name)
194 }
195
196 async fn get_regions(
197 &self,
198 table_name: &TableName,
199 logical_plan: &LogicalPlan,
200 ) -> Result<Vec<RegionId>> {
201 let table = self
202 .catalog_manager
203 .table(
204 &table_name.catalog_name,
205 &table_name.schema_name,
206 &table_name.table_name,
207 None,
208 )
209 .await
210 .context(CatalogSnafu)?
211 .with_context(|| TableNotFoundSnafu {
212 table: table_name.to_string(),
213 })?;
214
215 let table_info = table.table_info();
216 let physical_table_route = self
217 .partition_rule_manager
218 .find_physical_table_route(table_info.table_id())
219 .await
220 .context(PartitionRuleManagerSnafu)?;
221 let all_regions = physical_table_route
222 .region_routes
223 .iter()
224 .map(|r| RegionId::new(table_info.table_id(), r.region.id.region_number()))
225 .collect::<Vec<_>>();
226 let partition_columns: Vec<String> =
228 table_info.meta.partition_column_names().cloned().collect();
229 debug!(
230 "DistExtensionPlanner: loaded table partition metadata, table: {}, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}, all_regions: {:?}",
231 table_name,
232 table_info.table_id(),
233 table_info.meta.partition_key_indices,
234 partition_columns,
235 all_regions,
236 );
237 if partition_columns.is_empty() {
238 return Ok(all_regions);
239 }
240 let partition_column_types = partition_columns
241 .iter()
242 .map(|col_name| {
243 let data_type = table_info
244 .meta
245 .schema
246 .column_schema_by_name(col_name)
247 .unwrap()
249 .data_type
250 .clone();
251 (col_name.clone(), data_type)
252 })
253 .collect::<HashMap<_, _>>();
254
255 let partition_expressions = match PredicateExtractor::extract_partition_expressions(
257 logical_plan,
258 &partition_columns,
259 ) {
260 Ok(expressions) => expressions,
261 Err(err) => {
262 common_telemetry::debug!(
263 "Failed to extract partition expressions for table {} (id: {}), using all regions: {:?}",
264 table_name,
265 table.table_info().table_id(),
266 err
267 );
268 return Ok(all_regions);
269 }
270 };
271
272 if partition_expressions.is_empty() {
273 return Ok(all_regions);
274 }
275
276 let partitions = match create_partitions_from_region_routes(
278 table_info.table_id(),
279 &physical_table_route.region_routes,
280 ) {
281 Ok(partitions) => partitions,
282 Err(err) => {
283 common_telemetry::debug!(
284 "Failed to get partition information for table {}, using all regions: {:?}",
285 table_name,
286 err
287 );
288 return Ok(all_regions);
289 }
290 };
291 if partitions.is_empty() {
292 return Ok(all_regions);
293 }
294
295 let pruned_regions = match ConstraintPruner::prune_regions(
297 &partition_expressions,
298 &partitions,
299 partition_column_types,
300 ) {
301 Ok(regions) => regions,
302 Err(err) => {
303 common_telemetry::debug!(
304 "Failed to prune regions for table {}, using all regions: {:?}",
305 table_name,
306 err
307 );
308 return Ok(all_regions);
309 }
310 };
311
312 common_telemetry::debug!(
313 "Region pruning for table {}: {} partition expressions applied, pruned from {} to {} regions",
314 table_name,
315 partition_expressions.len(),
316 all_regions.len(),
317 pruned_regions.len()
318 );
319
320 Ok(pruned_regions)
321 }
322
323 fn optimize_input_logical_plan(
325 &self,
326 session_state: &SessionState,
327 plan: &LogicalPlan,
328 ) -> Result<LogicalPlan> {
329 let state = session_state.clone();
330 state.optimizer().optimize(plan.clone(), &state, |_, _| {})
331 }
332}
333
334#[derive(Default)]
336struct TableNameExtractor {
337 pub table_name: Option<TableName>,
338}
339
340impl TreeNodeVisitor<'_> for TableNameExtractor {
341 type Node = LogicalPlan;
342
343 fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
344 match node {
345 LogicalPlan::TableScan(scan) => {
346 if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>()
347 && let Some(provider) = source
348 .table_provider
349 .as_any()
350 .downcast_ref::<DfTableProviderAdapter>()
351 {
352 if provider.table().table_type() == TableType::Base {
353 let info = provider.table().table_info();
354 self.table_name = Some(TableName::new(
355 info.catalog_name.clone(),
356 info.schema_name.clone(),
357 info.name.clone(),
358 ));
359 }
360 return Ok(TreeNodeRecursion::Stop);
361 }
362 match &scan.table_name {
363 TableReference::Full {
364 catalog,
365 schema,
366 table,
367 } => {
368 self.table_name = Some(TableName::new(
369 catalog.to_string(),
370 schema.to_string(),
371 table.to_string(),
372 ));
373 Ok(TreeNodeRecursion::Stop)
374 }
375 TableReference::Partial { schema, table } => {
377 self.table_name = Some(TableName::new(
378 DEFAULT_CATALOG_NAME.to_string(),
379 schema.to_string(),
380 table.to_string(),
381 ));
382 Ok(TreeNodeRecursion::Stop)
383 }
384 TableReference::Bare { table } => {
385 self.table_name = Some(TableName::new(
386 DEFAULT_CATALOG_NAME.to_string(),
387 DEFAULT_SCHEMA_NAME.to_string(),
388 table.to_string(),
389 ));
390 Ok(TreeNodeRecursion::Stop)
391 }
392 }
393 }
394 _ => Ok(TreeNodeRecursion::Continue),
395 }
396 }
397}