1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::execution::{SessionState, TaskContext};
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::filter_pushdown::{
30 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
31};
32use datafusion::physical_plan::metrics::{
33 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
34};
35use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
36use datafusion::physical_plan::{
37 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
38 SendableRecordBatchStream,
39};
40use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
44use futures_util::StreamExt;
45use greptime_proto::v1::region::RegionRequestHeader;
46use meter_core::data::ReadItem;
47use meter_macros::read_meter;
48use session::context::QueryContextRef;
49use store_api::storage::RegionId;
50use table::table_name::TableName;
51use tokio::time::Instant;
52use tracing::{Instrument, Span};
53
54use crate::dist_plan::analyzer::AliasMapping;
55use crate::dist_plan::analyzer::utils::patch_batch_timezone;
56use crate::dist_plan::dyn_filter_bridge::{
57 CapturedDynFilter, capture_remote_dyn_filters_for_pushdown,
58 query_context_with_initial_dyn_filter_regs, register_dyn_filters_for_region,
59};
60use crate::dist_plan::{RemoteDynFilterProducerId, RemoteDynFilterRegistryLease};
61use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
62use crate::options::{FlowQueryExtensions, remote_dyn_filter_pushdown_enabled_from_extensions};
63use crate::query_engine::QueryEngineState;
64use crate::region_query::RegionQueryHandlerRef;
65
66fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
67 context.session_config().get_extension()
68}
69
70fn remote_dyn_filter_enabled(query_ctx: &QueryContextRef) -> Result<bool> {
71 remote_dyn_filter_pushdown_enabled_from_extensions(&query_ctx.extensions())
72 .map_err(|err| DataFusionError::External(Box::new(err)))
73}
74
75fn acquire_remote_dyn_filter_registry_lease(
76 context: &TaskContext,
77 query_ctx: &QueryContextRef,
78 captured_dyn_filters: &[CapturedDynFilter],
79) -> Option<RemoteDynFilterRegistryLease> {
80 if captured_dyn_filters.is_empty() {
81 return None;
82 }
83
84 let query_id = query_ctx.remote_query_id_value()?;
85 let query_engine_state = query_engine_state_from_task_context(context)?;
86 Some(
87 query_engine_state
88 .dyn_filter_registry_manager()
89 .acquire_lease(query_id),
90 )
91}
92
93fn query_context_for_remote_dyn_filter_region(
94 query_ctx: &QueryContextRef,
95 region_id: RegionId,
96 remote_dyn_filter_registry_lease: Option<&RemoteDynFilterRegistryLease>,
97 captured_dyn_filters: &[CapturedDynFilter],
98) -> session::context::QueryContext {
99 if let Some(remote_dyn_filter_registry_lease) = remote_dyn_filter_registry_lease {
100 register_dyn_filters_for_region(
101 remote_dyn_filter_registry_lease.registry(),
102 region_id,
103 captured_dyn_filters,
104 );
105 }
106
107 query_context_with_initial_dyn_filter_regs(query_ctx, region_id, captured_dyn_filters)
108}
109
/// Logical plan extension node named `MergeScan` that wraps a `remote_input` plan.
///
/// The node exposes no inputs or expressions to DataFusion and reports the wrapped plan's schema
/// as its own. NOTE(review): presumably planned into [`MergeScanExec`], which sends a logical plan
/// to each region — confirm against the physical planner.
///
/// The derived `PartialOrd`/`Hash` impls depend on the field order below.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// Wrapped plan. Its schema is this node's schema, and it is printed as `remote_input` in
    /// EXPLAIN output.
    input: LogicalPlan,
    /// Flag printed in EXPLAIN output. NOTE(review): exact meaning is defined by the producer of
    /// this node — confirm.
    is_placeholder: bool,
    /// Partition columns and their aliases.
    partition_cols: AliasMapping,
    /// Producer id used for remote dynamic filters.
    /// `None` unless set via `with_remote_dyn_filter_producer_id`.
    remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
}
120
121impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
122 fn name(&self) -> &str {
123 Self::name()
124 }
125
126 fn inputs(&self) -> Vec<&LogicalPlan> {
129 vec![]
130 }
131
132 fn schema(&self) -> &datafusion_common::DFSchemaRef {
133 self.input.schema()
134 }
135
136 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
138 vec![]
139 }
140
141 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
142 write!(
143 f,
144 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
145 self.is_placeholder, self.input
146 )
147 }
148
149 fn with_exprs_and_inputs(
150 &self,
151 _exprs: Vec<datafusion::prelude::Expr>,
152 _inputs: Vec<LogicalPlan>,
153 ) -> Result<Self> {
154 Ok(self.clone())
155 }
156}
157
158impl MergeScanLogicalPlan {
159 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
160 Self {
161 input,
162 is_placeholder,
163 partition_cols,
164 remote_dyn_filter_producer_id: None,
165 }
166 }
167
168 pub(crate) fn with_remote_dyn_filter_producer_id(
169 mut self,
170 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
171 ) -> Self {
172 self.remote_dyn_filter_producer_id = Some(remote_dyn_filter_producer_id);
173 self
174 }
175
176 pub fn name() -> &'static str {
177 "MergeScan"
178 }
179
180 pub fn into_logical_plan(self) -> LogicalPlan {
182 LogicalPlan::Extension(Extension {
183 node: Arc::new(self),
184 })
185 }
186
187 pub fn is_placeholder(&self) -> bool {
188 self.is_placeholder
189 }
190
191 pub fn input(&self) -> &LogicalPlan {
192 &self.input
193 }
194
195 pub fn partition_cols(&self) -> &AliasMapping {
196 &self.partition_cols
197 }
198
199 pub fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
200 self.remote_dyn_filter_producer_id
201 }
202}
203
/// Physical operator that sends `plan` to each region and streams back the merged results.
///
/// Output partition `p` queries regions `p, p + target_partition, ...` sequentially (see
/// `to_stream`).
///
/// Cloning shares the `Arc`-wrapped metric maps and the captured dynamic filters between clones.
#[derive(Clone)]
pub struct MergeScanExec {
    /// Table name; used in `Debug` output.
    table: TableName,
    /// Regions queried by this operator, distributed round-robin over output partitions.
    regions: Vec<RegionId>,
    /// Logical plan sent in every region `QueryRequest`.
    plan: LogicalPlan,
    /// Output schema; returned batches are re-wrapped with it via `patch_batch_timezone`.
    arrow_schema: ArrowSchemaRef,
    /// Handler used for `do_get` and for remote dynamic filter fan-out tasks.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics set that `MergeScanMetric` registers into.
    metric: ExecutionPlanMetricsSet,
    /// Plan properties (equivalences, partitioning, emission type, boundedness).
    properties: Arc<PlanProperties>,
    /// Most recent `RecordBatchMetrics` reported by each region's stream.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Timing metrics keyed by output partition index.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Query context forwarded (possibly extended) to every region request.
    query_ctx: QueryContextRef,
    /// Producer id required to capture remote dynamic filters during pushdown.
    remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
    /// Dynamic filters captured in `handle_child_pushdown_result`; shared across clones.
    captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedDynFilter>>>,
    /// Number of output partitions.
    target_partition: usize,
    /// Partition columns and their aliases, used to declare hash partitioning.
    partition_cols: AliasMapping,
}
224
225impl std::fmt::Debug for MergeScanExec {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.debug_struct("MergeScanExec")
228 .field("table", &self.table)
229 .field("regions", &self.regions)
230 .field("plan", &self.plan)
231 .finish()
232 }
233}
234
235impl MergeScanExec {
236 #[allow(clippy::too_many_arguments)]
237 pub fn new(
238 session_state: &SessionState,
239 table: TableName,
240 regions: Vec<RegionId>,
241 plan: LogicalPlan,
242 arrow_schema: &ArrowSchema,
243 region_query_handler: RegionQueryHandlerRef,
244 query_ctx: QueryContextRef,
245 target_partition: usize,
246 partition_cols: AliasMapping,
247 remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
248 ) -> Result<Self> {
249 let arrow_schema = Arc::new(arrow_schema.clone());
253
254 let eq_properties = if let LogicalPlan::Sort(sort) = &plan
262 && target_partition >= regions.len()
263 {
264 let lex_ordering = sort
265 .expr
266 .iter()
267 .map(|sort_expr| {
268 let physical_expr = session_state
269 .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
270 Ok(PhysicalSortExpr::new(
271 physical_expr,
272 SortOptions {
273 descending: !sort_expr.asc,
274 nulls_first: sort_expr.nulls_first,
275 },
276 ))
277 })
278 .collect::<Result<Vec<_>>>()?;
279 EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
280 } else {
281 EquivalenceProperties::new(arrow_schema.clone())
282 };
283
284 let partition_exprs = partition_cols
285 .iter()
286 .filter_map(|col| {
287 if let Some(first_alias) = col.1.first() {
288 session_state
289 .create_physical_expr(
290 Expr::Column(ColumnExpr::new_unqualified(
291 first_alias.name().to_string(),
292 )),
293 plan.schema(),
294 )
295 .ok()
296 } else {
297 None
298 }
299 })
300 .collect();
301 let partitioning = Partitioning::Hash(partition_exprs, target_partition);
302
303 let properties = Arc::new(PlanProperties::new(
304 eq_properties,
305 partitioning,
306 EmissionType::Incremental,
307 Boundedness::Bounded,
308 ));
309 Ok(Self {
310 table,
311 regions,
312 plan,
313 arrow_schema,
314 region_query_handler,
315 metric: ExecutionPlanMetricsSet::new(),
316 sub_stage_metrics: Arc::default(),
317 partition_metrics: Arc::default(),
318 properties,
319 query_ctx,
320 remote_dyn_filter_producer_id,
321 captured_remote_dyn_filters: Arc::default(),
322 target_partition,
323 partition_cols,
324 })
325 }
326
327 pub fn to_stream(
328 &self,
329 context: Arc<TaskContext>,
330 partition: usize,
331 ) -> Result<SendableRecordBatchStream> {
332 let regions = self.regions.clone();
334 let region_query_handler = self.region_query_handler.clone();
335 let metric = MergeScanMetric::new(&self.metric);
336 let arrow_schema = self.arrow_schema.clone();
337 let query_ctx = self.query_ctx.clone();
338 let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
339 let partition_metrics_moved = self.partition_metrics.clone();
340 let plan = self.plan.clone();
341 let target_partition = self.target_partition;
342 let remote_dyn_filter_enabled = remote_dyn_filter_enabled(&self.query_ctx)?;
343 let captured_remote_dyn_filters = if remote_dyn_filter_enabled {
344 self.captured_remote_dyn_filters()
345 } else {
346 Vec::new()
347 };
348 let dbname = context.task_id().unwrap_or_default();
349 let tracing_context = TracingContext::from_json(context.session_id().as_str());
350 let current_channel = self.query_ctx.channel();
351 let read_preference = self.query_ctx.read_preference();
352 let explain_verbose = self.query_ctx.explain_verbose();
353 let remote_dyn_filter_registry_lease = acquire_remote_dyn_filter_registry_lease(
354 context.as_ref(),
355 &query_ctx,
356 &captured_remote_dyn_filters,
357 );
358
359 let stream = Box::pin(stream!({
360 let remote_dyn_filter_registry_lease = remote_dyn_filter_registry_lease;
361 if partition == 0 {
363 MERGE_SCAN_REGIONS.observe(regions.len() as f64);
364 }
365
366 let _finish_timer = metric.finish_time().timer();
367 let mut ready_timer = metric.ready_time().timer();
368 let mut first_consume_timer = Some(metric.first_consume_time().timer());
369
370 let partition_start = Instant::now();
372 let mut partition_ready_time: Option<Duration> = None;
373 let mut partition_first_consume_time: Option<Duration> = None;
374
375 for region_id in regions
376 .iter()
377 .skip(partition)
378 .step_by(target_partition)
379 .copied()
380 {
381 let region_span = tracing_context.attach(tracing::info_span!(
382 parent: &Span::current(),
383 "merge_scan_region",
384 region_id = %region_id,
385 partition = partition
386 ));
387 let region_query_ctx = query_context_for_remote_dyn_filter_region(
388 &query_ctx,
389 region_id,
390 remote_dyn_filter_registry_lease.as_ref(),
391 &captured_remote_dyn_filters,
392 );
393 let request = QueryRequest {
394 header: Some(RegionRequestHeader {
395 tracing_context: tracing_context.to_w3c(),
396 dbname: dbname.clone(),
397 query_context: Some((®ion_query_ctx).into()),
398 }),
399 region_id,
400 plan: plan.clone(),
401 };
402 let region_start = Instant::now();
403 let do_get_start = Instant::now();
404
405 if explain_verbose {
406 common_telemetry::info!(
407 "Merge scan one region, partition: {}, region_id: {}",
408 partition,
409 region_id
410 );
411 }
412
413 let mut stream = region_query_handler
414 .do_get(read_preference, request)
415 .instrument(region_span.clone())
416 .await
417 .map_err(|e| {
418 MERGE_SCAN_ERRORS_TOTAL.inc();
419 DataFusionError::External(Box::new(e))
420 })?;
421 let do_get_cost = do_get_start.elapsed();
422
423 if let Some(remote_dyn_filter_registry_lease) =
424 remote_dyn_filter_registry_lease.as_ref()
425 {
426 remote_dyn_filter_registry_lease
427 .ensure_fanout_task(region_query_handler.clone());
428 }
429
430 ready_timer.stop();
431 if partition_ready_time.is_none() {
432 partition_ready_time = Some(partition_start.elapsed());
433 }
434
435 let mut poll_duration = Duration::ZERO;
436 let mut poll_timer = Instant::now();
437 while let Some(batch) = stream.next().instrument(region_span.clone()).await {
438 let poll_elapsed = poll_timer.elapsed();
439 poll_duration += poll_elapsed;
440
441 let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
442 let batch = patch_batch_timezone(
443 arrow_schema.clone(),
444 batch.into_df_record_batch().columns().to_vec(),
445 )?;
446 metric.record_output_batch_rows(batch.num_rows());
447 if let Some(mut first_consume_timer) = first_consume_timer.take() {
448 first_consume_timer.stop();
449 partition_first_consume_time = Some(partition_start.elapsed());
450 }
451
452 if let Some(metrics) = stream.metrics() {
453 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
454 sub_stage_metrics.insert(region_id, metrics);
455 }
456
457 yield Ok(batch);
458 poll_timer = Instant::now();
460 }
461 if let Some(mut first_consume_timer) = first_consume_timer.take() {
464 first_consume_timer.stop();
465 partition_first_consume_time = Some(partition_start.elapsed());
466 }
467 let total_cost = region_start.elapsed();
468
469 let region_metrics = RegionMetrics {
471 region_id,
472 poll_duration,
473 do_get_cost,
474 total_cost,
475 };
476
477 {
479 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
480 let partition_metrics = partition_metrics_guard
481 .entry(partition)
482 .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
483 partition_metrics.add_region_metrics(region_metrics);
484 }
485
486 if explain_verbose {
487 common_telemetry::info!(
488 "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
489 partition,
490 region_id,
491 poll_duration,
492 metric.first_consume_time(),
493 do_get_cost
494 );
495 }
496
497 if let Some(metrics) = stream.metrics() {
499 let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
500 let value = read_meter!(
501 c,
502 s,
503 ReadItem {
504 cpu_time: metrics.elapsed_compute as u64,
505 table_scan: metrics.memory_usage as u64
506 },
507 current_channel as u8
508 );
509 metric.record_greptime_exec_cost(value as usize);
510
511 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
513 sub_stage_metrics.insert(region_id, metrics);
514 }
515
516 MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
517 }
518
519 ready_timer.stop();
522 if let Some(mut first_consume_timer) = first_consume_timer.take() {
523 first_consume_timer.stop();
524 }
525
526 let partition_finish_time = partition_start.elapsed();
528 {
529 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
530 if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
531 partition_metrics.set_timings(
532 partition_ready_time.unwrap_or_default(),
533 partition_first_consume_time.unwrap_or_default(),
534 partition_finish_time,
535 );
536 partition_metrics.finish();
537 }
538 }
539 }));
540
541 Ok(Box::pin(RecordBatchStreamAdapter::new(
542 self.arrow_schema.clone(),
543 stream,
544 )))
545 }
546
547 pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
548 let Distribution::HashPartitioned(hash_exprs) = distribution else {
549 return None;
551 };
552
553 if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
554 && curr_dist == &hash_exprs
555 {
556 return None;
558 }
559
560 let all_partition_col_aliases: HashSet<_> = self
561 .partition_cols
562 .values()
563 .flat_map(|aliases| aliases.iter().map(|c| c.name()))
564 .collect();
565 let overlaps: Vec<_> = hash_exprs
566 .iter()
567 .filter(|expr| {
568 expr.as_any()
569 .downcast_ref::<Column>()
570 .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
571 })
572 .cloned()
573 .collect();
574
575 if overlaps.is_empty() {
576 return None;
577 }
578
579 Some(Self {
580 table: self.table.clone(),
581 regions: self.regions.clone(),
582 plan: self.plan.clone(),
583 arrow_schema: self.arrow_schema.clone(),
584 region_query_handler: self.region_query_handler.clone(),
585 metric: self.metric.clone(),
586 properties: Arc::new(PlanProperties::new(
587 self.properties.eq_properties.clone(),
588 Partitioning::Hash(overlaps, self.target_partition),
589 self.properties.emission_type,
590 self.properties.boundedness,
591 )),
592 sub_stage_metrics: self.sub_stage_metrics.clone(),
593 partition_metrics: self.partition_metrics.clone(),
594 query_ctx: self.query_ctx.clone(),
595 remote_dyn_filter_producer_id: self.remote_dyn_filter_producer_id,
596 captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
597 target_partition: self.target_partition,
598 partition_cols: self.partition_cols.clone(),
599 })
600 }
601
602 fn captured_remote_dyn_filters(&self) -> Vec<CapturedDynFilter> {
603 self.captured_remote_dyn_filters.lock().unwrap().clone()
604 }
605
606 pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
607 self.sub_stage_metrics
608 .lock()
609 .unwrap()
610 .values()
611 .cloned()
612 .collect()
613 }
614
615 pub fn regions(&self) -> &[RegionId] {
616 &self.regions
617 }
618
619 pub fn is_flow_sink_scan(&self) -> bool {
620 let Some(sink_table_id) =
621 FlowQueryExtensions::parse_flow_extensions(&self.query_ctx.extensions())
622 .ok()
623 .flatten()
624 .and_then(|extensions| extensions.sink_table_id)
625 else {
626 return false;
627 };
628
629 !self.regions.is_empty()
630 && self
631 .regions
632 .iter()
633 .all(|region_id| region_id.table_id() == sink_table_id)
634 }
635
636 pub fn partition_count(&self) -> usize {
637 self.target_partition
638 }
639
640 pub fn region_count(&self) -> usize {
641 self.regions.len()
642 }
643
644 fn partition_metrics(&self) -> Vec<PartitionMetrics> {
645 self.partition_metrics
646 .lock()
647 .unwrap()
648 .values()
649 .cloned()
650 .collect()
651 }
652}
653
#[cfg(test)]
impl MergeScanExec {
    /// Test-only accessor for the assigned remote dynamic filter producer id.
    fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
        let Self {
            remote_dyn_filter_producer_id,
            ..
        } = self;
        *remote_dyn_filter_producer_id
    }
}
660
/// Timings collected for one region inside `MergeScanExec::to_stream`.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// Region these timings belong to.
    region_id: RegionId,
    /// Accumulated time spent awaiting `next()` on the region stream. The timer is reset after
    /// each yielded batch, so downstream processing time is excluded.
    poll_duration: Duration,
    /// Time taken by the `do_get` call for this region.
    do_get_cost: Duration,
    /// Time from just before `do_get` until the region stream is exhausted.
    total_cost: Duration,
}
670
/// Aggregated timings for one output partition of `MergeScanExec`.
///
/// Metrics are logged once: by `finish()` when the partition stream completes, or by the `Drop`
/// impl if `finished` is still `false`.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Output partition index.
    partition: usize,
    /// Per-region timings, in the order the regions were processed.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over `region_metrics`.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over `region_metrics`.
    total_do_get_cost: Duration,
    /// Number of regions recorded.
    total_regions: usize,
    /// Elapsed time from partition start until the first `do_get` returned.
    ready_time: Duration,
    /// Elapsed time from partition start until the first batch (or the end of the first region
    /// if it produced no batch).
    first_consume_time: Duration,
    /// Elapsed time from partition start until the partition stream completed.
    finish_time: Duration,
    /// Log at `info` level instead of `debug` when set.
    explain_verbose: bool,
    /// Set once the metrics have been logged by `finish()`.
    finished: bool,
}
688
689impl PartitionMetrics {
690 fn new(partition: usize, explain_verbose: bool) -> Self {
691 Self {
692 partition,
693 region_metrics: Vec::new(),
694 total_poll_duration: Duration::ZERO,
695 total_do_get_cost: Duration::ZERO,
696 total_regions: 0,
697 ready_time: Duration::ZERO,
698 first_consume_time: Duration::ZERO,
699 finish_time: Duration::ZERO,
700 explain_verbose,
701 finished: false,
702 }
703 }
704
705 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
706 self.total_poll_duration += region_metrics.poll_duration;
707 self.total_do_get_cost += region_metrics.do_get_cost;
708 self.total_regions += 1;
709 self.region_metrics.push(region_metrics);
710 }
711
712 fn set_timings(
714 &mut self,
715 ready_time: Duration,
716 first_consume_time: Duration,
717 finish_time: Duration,
718 ) {
719 self.ready_time = ready_time;
720 self.first_consume_time = first_consume_time;
721 self.finish_time = finish_time;
722 }
723
724 fn finish(&mut self) {
726 if self.finished {
727 return;
728 }
729 self.finished = true;
730 self.log_metrics();
731 }
732
733 fn log_metrics(&self) {
735 if self.explain_verbose {
736 common_telemetry::info!(
737 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
738 self.partition,
739 self.total_regions,
740 self.total_poll_duration,
741 self.total_do_get_cost,
742 self.ready_time,
743 self.first_consume_time,
744 self.finish_time
745 );
746 } else {
747 common_telemetry::debug!(
748 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
749 self.partition,
750 self.total_regions,
751 self.total_poll_duration,
752 self.total_do_get_cost,
753 self.ready_time,
754 self.first_consume_time,
755 self.finish_time
756 );
757 }
758 }
759}
760
761impl Drop for PartitionMetrics {
762 fn drop(&mut self) {
763 if !self.finished {
764 self.log_metrics();
765 }
766 }
767}
768
769impl ExecutionPlan for MergeScanExec {
770 fn as_any(&self) -> &dyn Any {
771 self
772 }
773
774 fn schema(&self) -> ArrowSchemaRef {
775 self.arrow_schema.clone()
776 }
777
778 fn properties(&self) -> &Arc<PlanProperties> {
779 &self.properties
780 }
781
782 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
783 vec![]
784 }
785
786 fn with_new_children(
789 self: Arc<Self>,
790 _children: Vec<Arc<dyn ExecutionPlan>>,
791 ) -> Result<Arc<dyn ExecutionPlan>> {
792 Ok(self.clone())
793 }
794
795 fn handle_child_pushdown_result(
796 &self,
797 _phase: FilterPushdownPhase,
798 child_pushdown_result: ChildPushdownResult,
799 _config: &datafusion::config::ConfigOptions,
800 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
801 let parent_filters = child_pushdown_result
802 .parent_filters
803 .into_iter()
804 .map(|filter| filter.filter)
805 .collect::<Vec<_>>();
806
807 if !remote_dyn_filter_enabled(&self.query_ctx)? {
808 self.captured_remote_dyn_filters.lock().unwrap().clear();
812 let new_self = Arc::new(self.clone());
813
814 return Ok(FilterPushdownPropagation {
815 filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
816 updated_node: Some(new_self),
817 });
818 }
819
820 let Some(remote_dyn_filter_producer_id) = self.remote_dyn_filter_producer_id else {
821 common_telemetry::warn!(
823 "MergeScan remote dynamic filter producer id is not assigned; skipping remote dynamic filter pushdown"
824 );
825 self.captured_remote_dyn_filters.lock().unwrap().clear();
826 let new_self = Arc::new(self.clone());
827
828 return Ok(FilterPushdownPropagation {
829 filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
830 updated_node: Some(new_self),
831 });
832 };
833 let remote_dyn_filter_pushdown =
834 capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);
835 *self.captured_remote_dyn_filters.lock().unwrap() =
836 remote_dyn_filter_pushdown.captured_dyn_filters;
837 let new_self = Arc::new(self.clone());
838
839 Ok(FilterPushdownPropagation {
840 filters: remote_dyn_filter_pushdown
841 .pushed_down
842 .into_iter()
843 .map(|pushdown_ready| {
844 if pushdown_ready {
845 PushedDown::Yes
846 } else {
847 PushedDown::No
848 }
849 })
850 .collect(),
851 updated_node: Some(new_self),
852 })
853 }
854
855 fn execute(
856 &self,
857 partition: usize,
858 context: Arc<TaskContext>,
859 ) -> Result<SendableRecordBatchStream> {
860 self.to_stream(context, partition)
861 }
862
863 fn metrics(&self) -> Option<MetricsSet> {
864 Some(self.metric.clone_inner())
865 }
866
867 fn name(&self) -> &str {
868 "MergeScanExec"
869 }
870}
871
872impl DisplayAs for MergeScanExec {
873 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
874 write!(f, "MergeScanExec: peers=[")?;
875 for region_id in self.regions.iter() {
876 write!(f, "{}, ", region_id)?;
877 }
878 write!(f, "]")?;
879
880 if matches!(t, DisplayFormatType::Verbose) {
881 let partition_metrics = self.partition_metrics();
882 if !partition_metrics.is_empty() {
883 write!(f, ", metrics={{")?;
884 for (i, pm) in partition_metrics.iter().enumerate() {
885 if i > 0 {
886 write!(f, ", ")?;
887 }
888 write!(
889 f,
890 "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"ready_time\":\"{:?}\",\"first_consume_time\":\"{:?}\",\"finish_time\":\"{:?}\",\"region_metrics\":[",
891 pm.partition,
892 pm.total_regions,
893 pm.total_poll_duration,
894 pm.total_do_get_cost,
895 pm.ready_time,
896 pm.first_consume_time,
897 pm.finish_time
898 )?;
899 for (j, rm) in pm.region_metrics.iter().enumerate() {
900 if j > 0 {
901 write!(f, ",")?;
902 }
903 write!(
904 f,
905 "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
906 rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
907 )?;
908 }
909 write!(f, "]}}")?;
910 }
911 write!(f, "}}")?;
912 }
913 }
914
915 Ok(())
916 }
917}
918
/// DataFusion metrics registered by each `MergeScanExec::to_stream` call.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Timer started when the stream body starts and stopped once a region's `do_get` returns.
    ready_time: Time,
    /// Timer stopped at the first received batch, or at the end of the first region if it
    /// produced none.
    first_consume_time: Time,
    /// Timer held by a guard for the lifetime of the stream body.
    finish_time: Time,
    /// Number of rows in the batches yielded by the stream.
    output_rows: Count,

    /// Read cost computed by `read_meter!` from the region streams' metrics.
    greptime_exec_cost: Gauge,
}
933
934impl MergeScanMetric {
935 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
936 Self {
937 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
938 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
939 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
940 output_rows: MetricBuilder::new(metric).output_rows(1),
941 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
942 }
943 }
944
945 pub fn ready_time(&self) -> &Time {
946 &self.ready_time
947 }
948
949 pub fn first_consume_time(&self) -> &Time {
950 &self.first_consume_time
951 }
952
953 pub fn finish_time(&self) -> &Time {
954 &self.finish_time
955 }
956
957 pub fn record_output_batch_rows(&self, num_rows: usize) {
958 self.output_rows.add(num_rows);
959 }
960
961 pub fn record_greptime_exec_cost(&self, metrics: usize) {
962 self.greptime_exec_cost.add(metrics);
963 }
964}
965
/// Unit tests for the remote dynamic filter plumbing of `MergeScanExec`.
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
    use datafusion::config::ConfigOptions;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::physical_plan::filter_pushdown::ChildFilterPushdownResult;
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlanBuilder, lit};
    use datafusion_physical_expr::Distribution;
    use datafusion_physical_expr::expressions::{
        Column, DynamicFilterPhysicalExpr, lit as physical_lit,
    };
    use session::ReadPreference;
    use session::context::QueryContext;
    use session::query_id::QueryId;
    use table::table_name::TableName;
    use uuid::Uuid;

    use super::*;
    use crate::dist_plan::{DynFilterRegistryManager, Subscriber};
    use crate::region_query::RegionQueryHandler;

    /// Builds a deterministic `QueryId` from `value`.
    fn test_query_id(value: u128) -> QueryId {
        QueryId::from(Uuid::from_u128(value))
    }

    /// Preparing the per-region query context must register the region as a subscriber in the
    /// lease's registry without starting fan-out, and must attach the initial registrations
    /// extension to the returned context.
    #[test]
    fn remote_dyn_filter_region_query_context_registers_before_do_get() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_ctx = QueryContext::arc();
        let query_id = query_ctx
            .remote_query_id_value()
            .expect("query context must have remote query id");
        let lease = registry_manager.acquire_lease(query_id);
        let region_id = RegionId::new(1024, 7);
        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 0)) as Arc<_>],
            physical_lit(true) as _,
        )) as Arc<dyn datafusion_physical_expr::PhysicalExpr>;
        let captured = capture_remote_dyn_filters_for_pushdown(
            RemoteDynFilterProducerId::new(42),
            vec![dyn_filter],
        );
        assert_eq!(captured.captured_dyn_filters.len(), 1);

        let region_query_ctx = query_context_for_remote_dyn_filter_region(
            &query_ctx,
            region_id,
            Some(&lease),
            &captured.captured_dyn_filters,
        );

        let entries = lease.registry().entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].subscribers(), vec![Subscriber::new(region_id)]);
        assert!(
            !entries[0].fanout_started_for_test(),
            "fanout must start only after do_get succeeds"
        );
        assert!(
            region_query_ctx
                .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
                .is_some(),
            "initial RDF registrations must be present in the do_get query context"
        );
    }

    /// The registry for a query id must survive until the last lease for that id is dropped.
    #[test]
    fn remote_dyn_filter_registry_cleanup_waits_for_last_query_scoped_stream_drop() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);

        let first = registry_manager.acquire_lease(query_id);
        let second = registry_manager.acquire_lease(query_id);

        drop(first);
        assert_eq!(registry_manager.registry_count(), 1);

        drop(second);
        assert_eq!(registry_manager.registry_count(), 0);
    }

    /// Leases acquired independently (as separate exec nodes would) share one registry per
    /// query id.
    #[test]
    fn remote_dyn_filter_registry_cleanup_shares_query_scope_across_independent_leases() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);

        let first_exec_like_lease = registry_manager.acquire_lease(query_id);
        let second_exec_like_lease = registry_manager.acquire_lease(query_id);

        drop(first_exec_like_lease);
        assert_eq!(registry_manager.registry_count(), 1);

        drop(second_exec_like_lease);
        assert_eq!(registry_manager.registry_count(), 0);
    }

    /// Region query handler stub; every method panics if called.
    struct TestRegionQueryHandler;

    #[async_trait]
    impl RegionQueryHandler for TestRegionQueryHandler {
        async fn do_get(
            &self,
            _read_preference: ReadPreference,
            _request: common_query::request::QueryRequest,
        ) -> crate::error::Result<common_recordbatch::SendableRecordBatchStream> {
            unimplemented!("test only")
        }

        async fn handle_remote_dyn_filter_update(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _update: api::v1::region::RemoteDynFilterUpdate,
        ) -> crate::error::Result<()> {
            unimplemented!("test only")
        }

        async fn handle_remote_dyn_filter_unregister(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _unregister: api::v1::region::RemoteDynFilterUnregister,
        ) -> crate::error::Result<()> {
            unimplemented!("test only")
        }
    }

    /// Re-declaring the distribution on an overlapping partition column must keep the producer
    /// id of the original node.
    #[test]
    fn try_with_new_distribution_preserves_remote_dyn_filter_producer_id() {
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);

        // Single-column plan `col1`.
        let plan = LogicalPlanBuilder::empty(true)
            .project(vec![lit(1i32).alias("col1")])
            .unwrap()
            .build()
            .unwrap();

        let schema = plan.schema().as_arrow().clone();
        let table = TableName::new("catalog", "schema", "table");
        let regions = vec![RegionId::new(1024, 1)];
        let query_ctx = QueryContext::arc();

        // `col1` is the only partition column, aliased by itself.
        let mut partition_cols = AliasMapping::new();
        partition_cols.insert(
            "col1".to_string(),
            BTreeSet::from([ColumnExpr::new(Some(TableReference::bare("table")), "col1")]),
        );

        let session_state = SessionStateBuilder::new().build();

        let handler = Arc::new(TestRegionQueryHandler);
        let target_partition = 2;

        let exec = MergeScanExec::new(
            &session_state,
            table,
            regions,
            plan,
            &schema,
            handler,
            query_ctx,
            target_partition,
            partition_cols,
            Some(remote_dyn_filter_producer_id),
        )
        .unwrap();

        assert_eq!(
            exec.remote_dyn_filter_producer_id(),
            Some(remote_dyn_filter_producer_id)
        );

        // `col1` overlaps with the partition columns; `col2` does not.
        let new_dist = Distribution::HashPartitioned(vec![
            Arc::new(Column::new("col1", 0)),
            Arc::new(Column::new("col2", 1)),
        ]);

        let cloned = exec
            .try_with_new_distribution(new_dist)
            .expect("expected a cloned exec with overlapping partition col");

        assert_eq!(
            cloned.remote_dyn_filter_producer_id(),
            Some(remote_dyn_filter_producer_id),
            "try_with_new_distribution must preserve remote dynamic filter producer id"
        );
    }

    /// With a producer id assigned and pushdown enabled by the default query context, a
    /// dynamic parent filter is captured and reported as `PushedDown::Yes`.
    #[test]
    fn remote_dyn_filter_preflight_removes_parent_filter_after_dn_runtime_is_ready() {
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
        let plan = LogicalPlanBuilder::empty(true)
            .project(vec![lit(1i32).alias("col1")])
            .unwrap()
            .build()
            .unwrap();

        let schema = plan.schema().as_arrow().clone();
        let table = TableName::new("catalog", "schema", "table");
        let regions = vec![RegionId::new(1024, 1)];
        let query_ctx = QueryContext::arc();
        let session_state = SessionStateBuilder::new().build();
        let handler = Arc::new(TestRegionQueryHandler);
        let exec = MergeScanExec::new(
            &session_state,
            table,
            regions,
            plan,
            &schema,
            handler,
            query_ctx,
            1,
            AliasMapping::new(),
            Some(remote_dyn_filter_producer_id),
        )
        .unwrap();
        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 0)) as Arc<_>],
            physical_lit(true) as _,
        )) as Arc<dyn datafusion_physical_expr::PhysicalExpr>;

        let propagation = exec
            .handle_child_pushdown_result(
                FilterPushdownPhase::Post,
                ChildPushdownResult {
                    parent_filters: vec![ChildFilterPushdownResult {
                        filter: dyn_filter,
                        child_results: vec![PushedDown::Yes],
                    }],
                    self_filters: Vec::new(),
                },
                &ConfigOptions::new(),
            )
            .unwrap();

        // The captured list is shared with `exec`, so the capture is visible through it.
        assert_eq!(exec.captured_remote_dyn_filters().len(), 1);
        assert!(matches!(propagation.filters.as_slice(), [PushedDown::Yes]));
    }
}