Skip to main content

query/dist_plan/
merge_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::execution::{SessionState, TaskContext};
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::filter_pushdown::{
30    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
31};
32use datafusion::physical_plan::metrics::{
33    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
34};
35use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
36use datafusion::physical_plan::{
37    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
38    SendableRecordBatchStream,
39};
40use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
44use futures_util::StreamExt;
45use greptime_proto::v1::region::RegionRequestHeader;
46use meter_core::data::ReadItem;
47use meter_macros::read_meter;
48use session::context::QueryContextRef;
49use store_api::storage::RegionId;
50use table::table_name::TableName;
51use tokio::time::Instant;
52use tracing::{Instrument, Span};
53
54use crate::dist_plan::analyzer::AliasMapping;
55use crate::dist_plan::analyzer::utils::patch_batch_timezone;
56use crate::dist_plan::dyn_filter_bridge::{
57    CapturedDynFilter, capture_remote_dyn_filters_for_pushdown,
58    query_context_with_initial_dyn_filter_regs, register_dyn_filters_for_region,
59};
60use crate::dist_plan::{RemoteDynFilterProducerId, RemoteDynFilterRegistryLease};
61use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
62use crate::options::{FlowQueryExtensions, remote_dyn_filter_pushdown_enabled_from_extensions};
63use crate::query_engine::QueryEngineState;
64use crate::region_query::RegionQueryHandlerRef;
65
66fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
67    context.session_config().get_extension()
68}
69
70fn remote_dyn_filter_enabled(query_ctx: &QueryContextRef) -> Result<bool> {
71    remote_dyn_filter_pushdown_enabled_from_extensions(&query_ctx.extensions())
72        .map_err(|err| DataFusionError::External(Box::new(err)))
73}
74
75fn acquire_remote_dyn_filter_registry_lease(
76    context: &TaskContext,
77    query_ctx: &QueryContextRef,
78    captured_dyn_filters: &[CapturedDynFilter],
79) -> Option<RemoteDynFilterRegistryLease> {
80    if captured_dyn_filters.is_empty() {
81        return None;
82    }
83
84    let query_id = query_ctx.remote_query_id_value()?;
85    let query_engine_state = query_engine_state_from_task_context(context)?;
86    Some(
87        query_engine_state
88            .dyn_filter_registry_manager()
89            .acquire_lease(query_id),
90    )
91}
92
93fn query_context_for_remote_dyn_filter_region(
94    query_ctx: &QueryContextRef,
95    region_id: RegionId,
96    remote_dyn_filter_registry_lease: Option<&RemoteDynFilterRegistryLease>,
97    captured_dyn_filters: &[CapturedDynFilter],
98) -> session::context::QueryContext {
99    if let Some(remote_dyn_filter_registry_lease) = remote_dyn_filter_registry_lease {
100        register_dyn_filters_for_region(
101            remote_dyn_filter_registry_lease.registry(),
102            region_id,
103            captured_dyn_filters,
104        );
105    }
106
107    query_context_with_initial_dyn_filter_regs(query_ctx, region_id, captured_dyn_filters)
108}
109
110#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
111pub struct MergeScanLogicalPlan {
112    /// In logical plan phase it only contains one input
113    input: LogicalPlan,
114    /// If this plan is a placeholder
115    is_placeholder: bool,
116    partition_cols: AliasMapping,
117    /// Assigned after dist-plan rewriting so rewriters only deal with plan shape.
118    remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
119}
120
121impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
122    fn name(&self) -> &str {
123        Self::name()
124    }
125
126    // Prevent further optimization.
127    // The input can be retrieved by `self.input()`
128    fn inputs(&self) -> Vec<&LogicalPlan> {
129        vec![]
130    }
131
132    fn schema(&self) -> &datafusion_common::DFSchemaRef {
133        self.input.schema()
134    }
135
136    // Prevent further optimization
137    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
138        vec![]
139    }
140
141    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
142        write!(
143            f,
144            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
145            self.is_placeholder, self.input
146        )
147    }
148
149    fn with_exprs_and_inputs(
150        &self,
151        _exprs: Vec<datafusion::prelude::Expr>,
152        _inputs: Vec<LogicalPlan>,
153    ) -> Result<Self> {
154        Ok(self.clone())
155    }
156}
157
158impl MergeScanLogicalPlan {
159    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
160        Self {
161            input,
162            is_placeholder,
163            partition_cols,
164            remote_dyn_filter_producer_id: None,
165        }
166    }
167
168    pub(crate) fn with_remote_dyn_filter_producer_id(
169        mut self,
170        remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
171    ) -> Self {
172        self.remote_dyn_filter_producer_id = Some(remote_dyn_filter_producer_id);
173        self
174    }
175
176    pub fn name() -> &'static str {
177        "MergeScan"
178    }
179
180    /// Create a [LogicalPlan::Extension] node from this merge scan plan
181    pub fn into_logical_plan(self) -> LogicalPlan {
182        LogicalPlan::Extension(Extension {
183            node: Arc::new(self),
184        })
185    }
186
187    pub fn is_placeholder(&self) -> bool {
188        self.is_placeholder
189    }
190
191    pub fn input(&self) -> &LogicalPlan {
192        &self.input
193    }
194
195    pub fn partition_cols(&self) -> &AliasMapping {
196        &self.partition_cols
197    }
198
199    pub fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
200        self.remote_dyn_filter_producer_id
201    }
202}
203
204#[derive(Clone)]
205pub struct MergeScanExec {
206    table: TableName,
207    regions: Vec<RegionId>,
208    plan: LogicalPlan,
209    arrow_schema: ArrowSchemaRef,
210    region_query_handler: RegionQueryHandlerRef,
211    metric: ExecutionPlanMetricsSet,
212    properties: Arc<PlanProperties>,
213    /// Metrics from sub stages
214    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
215    /// Metrics for each partition
216    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
217    query_ctx: QueryContextRef,
218    /// Optional because RDF must fail open: missing ids skip RDF but keep normal query execution.
219    remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
220    captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedDynFilter>>>,
221    target_partition: usize,
222    partition_cols: AliasMapping,
223}
224
225impl std::fmt::Debug for MergeScanExec {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.debug_struct("MergeScanExec")
228            .field("table", &self.table)
229            .field("regions", &self.regions)
230            .field("plan", &self.plan)
231            .finish()
232    }
233}
234
235impl MergeScanExec {
236    #[allow(clippy::too_many_arguments)]
237    pub fn new(
238        session_state: &SessionState,
239        table: TableName,
240        regions: Vec<RegionId>,
241        plan: LogicalPlan,
242        arrow_schema: &ArrowSchema,
243        region_query_handler: RegionQueryHandlerRef,
244        query_ctx: QueryContextRef,
245        target_partition: usize,
246        partition_cols: AliasMapping,
247        remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
248    ) -> Result<Self> {
249        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
250        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
251        // Reconsider if it's possible to remove it.
252        let arrow_schema = Arc::new(arrow_schema.clone());
253
254        // States the output ordering of the plan.
255        //
256        // When the input plan is a sort, we can use the sort ordering as the output ordering
257        // if the target partition is greater than the number of regions, which means we won't
258        // break the ordering on merging (of MergeScan).
259        //
260        // Otherwise, we need to use the default ordering.
261        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
262            && target_partition >= regions.len()
263        {
264            let lex_ordering = sort
265                .expr
266                .iter()
267                .map(|sort_expr| {
268                    let physical_expr = session_state
269                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
270                    Ok(PhysicalSortExpr::new(
271                        physical_expr,
272                        SortOptions {
273                            descending: !sort_expr.asc,
274                            nulls_first: sort_expr.nulls_first,
275                        },
276                    ))
277                })
278                .collect::<Result<Vec<_>>>()?;
279            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
280        } else {
281            EquivalenceProperties::new(arrow_schema.clone())
282        };
283
284        let partition_exprs = partition_cols
285            .iter()
286            .filter_map(|col| {
287                if let Some(first_alias) = col.1.first() {
288                    session_state
289                        .create_physical_expr(
290                            Expr::Column(ColumnExpr::new_unqualified(
291                                first_alias.name().to_string(),
292                            )),
293                            plan.schema(),
294                        )
295                        .ok()
296                } else {
297                    None
298                }
299            })
300            .collect();
301        let partitioning = Partitioning::Hash(partition_exprs, target_partition);
302
303        let properties = Arc::new(PlanProperties::new(
304            eq_properties,
305            partitioning,
306            EmissionType::Incremental,
307            Boundedness::Bounded,
308        ));
309        Ok(Self {
310            table,
311            regions,
312            plan,
313            arrow_schema,
314            region_query_handler,
315            metric: ExecutionPlanMetricsSet::new(),
316            sub_stage_metrics: Arc::default(),
317            partition_metrics: Arc::default(),
318            properties,
319            query_ctx,
320            remote_dyn_filter_producer_id,
321            captured_remote_dyn_filters: Arc::default(),
322            target_partition,
323            partition_cols,
324        })
325    }
326
327    pub fn to_stream(
328        &self,
329        context: Arc<TaskContext>,
330        partition: usize,
331    ) -> Result<SendableRecordBatchStream> {
332        // prepare states to move
333        let regions = self.regions.clone();
334        let region_query_handler = self.region_query_handler.clone();
335        let metric = MergeScanMetric::new(&self.metric);
336        let arrow_schema = self.arrow_schema.clone();
337        let query_ctx = self.query_ctx.clone();
338        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
339        let partition_metrics_moved = self.partition_metrics.clone();
340        let plan = self.plan.clone();
341        let target_partition = self.target_partition;
342        let remote_dyn_filter_enabled = remote_dyn_filter_enabled(&self.query_ctx)?;
343        let captured_remote_dyn_filters = if remote_dyn_filter_enabled {
344            self.captured_remote_dyn_filters()
345        } else {
346            Vec::new()
347        };
348        let dbname = context.task_id().unwrap_or_default();
349        let tracing_context = TracingContext::from_json(context.session_id().as_str());
350        let current_channel = self.query_ctx.channel();
351        let read_preference = self.query_ctx.read_preference();
352        let explain_verbose = self.query_ctx.explain_verbose();
353        let remote_dyn_filter_registry_lease = acquire_remote_dyn_filter_registry_lease(
354            context.as_ref(),
355            &query_ctx,
356            &captured_remote_dyn_filters,
357        );
358
359        let stream = Box::pin(stream!({
360            let remote_dyn_filter_registry_lease = remote_dyn_filter_registry_lease;
361            // only report metrics once for each MergeScan
362            if partition == 0 {
363                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
364            }
365
366            let _finish_timer = metric.finish_time().timer();
367            let mut ready_timer = metric.ready_time().timer();
368            let mut first_consume_timer = Some(metric.first_consume_time().timer());
369
370            // Per-partition timings, scoped to this partition's stream for `EXPLAIN VERBOSE`.
371            let partition_start = Instant::now();
372            let mut partition_ready_time: Option<Duration> = None;
373            let mut partition_first_consume_time: Option<Duration> = None;
374
375            for region_id in regions
376                .iter()
377                .skip(partition)
378                .step_by(target_partition)
379                .copied()
380            {
381                let region_span = tracing_context.attach(tracing::info_span!(
382                    parent: &Span::current(),
383                    "merge_scan_region",
384                    region_id = %region_id,
385                    partition = partition
386                ));
387                let region_query_ctx = query_context_for_remote_dyn_filter_region(
388                    &query_ctx,
389                    region_id,
390                    remote_dyn_filter_registry_lease.as_ref(),
391                    &captured_remote_dyn_filters,
392                );
393                let request = QueryRequest {
394                    header: Some(RegionRequestHeader {
395                        tracing_context: tracing_context.to_w3c(),
396                        dbname: dbname.clone(),
397                        query_context: Some((&region_query_ctx).into()),
398                    }),
399                    region_id,
400                    plan: plan.clone(),
401                };
402                let region_start = Instant::now();
403                let do_get_start = Instant::now();
404
405                if explain_verbose {
406                    common_telemetry::info!(
407                        "Merge scan one region, partition: {}, region_id: {}",
408                        partition,
409                        region_id
410                    );
411                }
412
413                let mut stream = region_query_handler
414                    .do_get(read_preference, request)
415                    .instrument(region_span.clone())
416                    .await
417                    .map_err(|e| {
418                        MERGE_SCAN_ERRORS_TOTAL.inc();
419                        DataFusionError::External(Box::new(e))
420                    })?;
421                let do_get_cost = do_get_start.elapsed();
422
423                if let Some(remote_dyn_filter_registry_lease) =
424                    remote_dyn_filter_registry_lease.as_ref()
425                {
426                    remote_dyn_filter_registry_lease
427                        .ensure_fanout_task(region_query_handler.clone());
428                }
429
430                ready_timer.stop();
431                if partition_ready_time.is_none() {
432                    partition_ready_time = Some(partition_start.elapsed());
433                }
434
435                let mut poll_duration = Duration::ZERO;
436                let mut poll_timer = Instant::now();
437                while let Some(batch) = stream.next().instrument(region_span.clone()).await {
438                    let poll_elapsed = poll_timer.elapsed();
439                    poll_duration += poll_elapsed;
440
441                    let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
442                    let batch = patch_batch_timezone(
443                        arrow_schema.clone(),
444                        batch.into_df_record_batch().columns().to_vec(),
445                    )?;
446                    metric.record_output_batch_rows(batch.num_rows());
447                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
448                        first_consume_timer.stop();
449                        partition_first_consume_time = Some(partition_start.elapsed());
450                    }
451
452                    if let Some(metrics) = stream.metrics() {
453                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
454                        sub_stage_metrics.insert(region_id, metrics);
455                    }
456
457                    yield Ok(batch);
458                    // reset poll timer
459                    poll_timer = Instant::now();
460                }
461                // Also stop on an exhausted stream that yielded no batch. The `take()`
462                // guard ensures it only records once, on the first such region.
463                if let Some(mut first_consume_timer) = first_consume_timer.take() {
464                    first_consume_timer.stop();
465                    partition_first_consume_time = Some(partition_start.elapsed());
466                }
467                let total_cost = region_start.elapsed();
468
469                // Record region metrics and push to global partition_metrics
470                let region_metrics = RegionMetrics {
471                    region_id,
472                    poll_duration,
473                    do_get_cost,
474                    total_cost,
475                };
476
477                // Push RegionMetrics to global partition_metrics immediately after scanning this region
478                {
479                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
480                    let partition_metrics = partition_metrics_guard
481                        .entry(partition)
482                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
483                    partition_metrics.add_region_metrics(region_metrics);
484                }
485
486                if explain_verbose {
487                    common_telemetry::info!(
488                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
489                        partition,
490                        region_id,
491                        poll_duration,
492                        metric.first_consume_time(),
493                        do_get_cost
494                    );
495                }
496
497                // process metrics after all data is drained.
498                if let Some(metrics) = stream.metrics() {
499                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
500                    let value = read_meter!(
501                        c,
502                        s,
503                        ReadItem {
504                            cpu_time: metrics.elapsed_compute as u64,
505                            table_scan: metrics.memory_usage as u64
506                        },
507                        current_channel as u8
508                    );
509                    metric.record_greptime_exec_cost(value as usize);
510
511                    // record metrics from sub sgates
512                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
513                    sub_stage_metrics.insert(region_id, metrics);
514                }
515
516                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
517            }
518
519            // Stop the global timers for partitions with no region, otherwise they keep
520            // running until drop and inflate the shared metrics. No-op otherwise.
521            ready_timer.stop();
522            if let Some(mut first_consume_timer) = first_consume_timer.take() {
523                first_consume_timer.stop();
524            }
525
526            // Finish partition metrics and log results
527            let partition_finish_time = partition_start.elapsed();
528            {
529                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
530                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
531                    partition_metrics.set_timings(
532                        partition_ready_time.unwrap_or_default(),
533                        partition_first_consume_time.unwrap_or_default(),
534                        partition_finish_time,
535                    );
536                    partition_metrics.finish();
537                }
538            }
539        }));
540
541        Ok(Box::pin(RecordBatchStreamAdapter::new(
542            self.arrow_schema.clone(),
543            stream,
544        )))
545    }
546
547    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
548        let Distribution::HashPartitioned(hash_exprs) = distribution else {
549            // not applicable
550            return None;
551        };
552
553        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
554            && curr_dist == &hash_exprs
555        {
556            // No need to change the distribution
557            return None;
558        }
559
560        let all_partition_col_aliases: HashSet<_> = self
561            .partition_cols
562            .values()
563            .flat_map(|aliases| aliases.iter().map(|c| c.name()))
564            .collect();
565        let overlaps: Vec<_> = hash_exprs
566            .iter()
567            .filter(|expr| {
568                expr.as_any()
569                    .downcast_ref::<Column>()
570                    .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
571            })
572            .cloned()
573            .collect();
574
575        if overlaps.is_empty() {
576            return None;
577        }
578
579        Some(Self {
580            table: self.table.clone(),
581            regions: self.regions.clone(),
582            plan: self.plan.clone(),
583            arrow_schema: self.arrow_schema.clone(),
584            region_query_handler: self.region_query_handler.clone(),
585            metric: self.metric.clone(),
586            properties: Arc::new(PlanProperties::new(
587                self.properties.eq_properties.clone(),
588                Partitioning::Hash(overlaps, self.target_partition),
589                self.properties.emission_type,
590                self.properties.boundedness,
591            )),
592            sub_stage_metrics: self.sub_stage_metrics.clone(),
593            partition_metrics: self.partition_metrics.clone(),
594            query_ctx: self.query_ctx.clone(),
595            remote_dyn_filter_producer_id: self.remote_dyn_filter_producer_id,
596            captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
597            target_partition: self.target_partition,
598            partition_cols: self.partition_cols.clone(),
599        })
600    }
601
602    fn captured_remote_dyn_filters(&self) -> Vec<CapturedDynFilter> {
603        self.captured_remote_dyn_filters.lock().unwrap().clone()
604    }
605
606    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
607        self.sub_stage_metrics
608            .lock()
609            .unwrap()
610            .values()
611            .cloned()
612            .collect()
613    }
614
615    pub fn regions(&self) -> &[RegionId] {
616        &self.regions
617    }
618
619    pub fn is_flow_sink_scan(&self) -> bool {
620        let Some(sink_table_id) =
621            FlowQueryExtensions::parse_flow_extensions(&self.query_ctx.extensions())
622                .ok()
623                .flatten()
624                .and_then(|extensions| extensions.sink_table_id)
625        else {
626            return false;
627        };
628
629        !self.regions.is_empty()
630            && self
631                .regions
632                .iter()
633                .all(|region_id| region_id.table_id() == sink_table_id)
634    }
635
636    pub fn partition_count(&self) -> usize {
637        self.target_partition
638    }
639
640    pub fn region_count(&self) -> usize {
641        self.regions.len()
642    }
643
644    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
645        self.partition_metrics
646            .lock()
647            .unwrap()
648            .values()
649            .cloned()
650            .collect()
651    }
652}
653
/// Accessors only needed by tests.
#[cfg(test)]
impl MergeScanExec {
    /// The remote dynamic filter producer id this node was built with, if any.
    fn remote_dyn_filter_producer_id(&self) -> Option<RemoteDynFilterProducerId> {
        let Self {
            remote_dyn_filter_producer_id,
            ..
        } = self;
        *remote_dyn_filter_producer_id
    }
}
660
661/// Metrics for a region of a partition.
662#[derive(Debug, Clone)]
663struct RegionMetrics {
664    region_id: RegionId,
665    poll_duration: Duration,
666    do_get_cost: Duration,
667    /// Total cost to scan the region.
668    total_cost: Duration,
669}
670
671/// Metrics for a partition of a MergeScanExec.
672#[derive(Debug, Clone)]
673struct PartitionMetrics {
674    partition: usize,
675    region_metrics: Vec<RegionMetrics>,
676    total_poll_duration: Duration,
677    total_do_get_cost: Duration,
678    total_regions: usize,
679    /// Time until this partition's scan is ready to emit data.
680    ready_time: Duration,
681    /// Time until this partition's first stream poll resolves (a batch or exhausted).
682    first_consume_time: Duration,
683    /// Time until this partition's scan finishes execution.
684    finish_time: Duration,
685    explain_verbose: bool,
686    finished: bool,
687}
688
689impl PartitionMetrics {
690    fn new(partition: usize, explain_verbose: bool) -> Self {
691        Self {
692            partition,
693            region_metrics: Vec::new(),
694            total_poll_duration: Duration::ZERO,
695            total_do_get_cost: Duration::ZERO,
696            total_regions: 0,
697            ready_time: Duration::ZERO,
698            first_consume_time: Duration::ZERO,
699            finish_time: Duration::ZERO,
700            explain_verbose,
701            finished: false,
702        }
703    }
704
705    fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
706        self.total_poll_duration += region_metrics.poll_duration;
707        self.total_do_get_cost += region_metrics.do_get_cost;
708        self.total_regions += 1;
709        self.region_metrics.push(region_metrics);
710    }
711
712    /// Set the per-partition timings captured during streaming.
713    fn set_timings(
714        &mut self,
715        ready_time: Duration,
716        first_consume_time: Duration,
717        finish_time: Duration,
718    ) {
719        self.ready_time = ready_time;
720        self.first_consume_time = first_consume_time;
721        self.finish_time = finish_time;
722    }
723
724    /// Finish the partition metrics and log the results.
725    fn finish(&mut self) {
726        if self.finished {
727            return;
728        }
729        self.finished = true;
730        self.log_metrics();
731    }
732
733    /// Log partition metrics based on explain_verbose level.
734    fn log_metrics(&self) {
735        if self.explain_verbose {
736            common_telemetry::info!(
737                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
738                self.partition,
739                self.total_regions,
740                self.total_poll_duration,
741                self.total_do_get_cost,
742                self.ready_time,
743                self.first_consume_time,
744                self.finish_time
745            );
746        } else {
747            common_telemetry::debug!(
748                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}, ready_time: {:?}, first_consume_time: {:?}, finish_time: {:?}",
749                self.partition,
750                self.total_regions,
751                self.total_poll_duration,
752                self.total_do_get_cost,
753                self.ready_time,
754                self.first_consume_time,
755                self.finish_time
756            );
757        }
758    }
759}
760
761impl Drop for PartitionMetrics {
762    fn drop(&mut self) {
763        if !self.finished {
764            self.log_metrics();
765        }
766    }
767}
768
769impl ExecutionPlan for MergeScanExec {
770    fn as_any(&self) -> &dyn Any {
771        self
772    }
773
774    fn schema(&self) -> ArrowSchemaRef {
775        self.arrow_schema.clone()
776    }
777
778    fn properties(&self) -> &Arc<PlanProperties> {
779        &self.properties
780    }
781
782    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
783        vec![]
784    }
785
786    // DataFusion will swap children unconditionally.
787    // But since this node is leaf node, it's safe to just return self.
788    fn with_new_children(
789        self: Arc<Self>,
790        _children: Vec<Arc<dyn ExecutionPlan>>,
791    ) -> Result<Arc<dyn ExecutionPlan>> {
792        Ok(self.clone())
793    }
794
795    fn handle_child_pushdown_result(
796        &self,
797        _phase: FilterPushdownPhase,
798        child_pushdown_result: ChildPushdownResult,
799        _config: &datafusion::config::ConfigOptions,
800    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
801        let parent_filters = child_pushdown_result
802            .parent_filters
803            .into_iter()
804            .map(|filter| filter.filter)
805            .collect::<Vec<_>>();
806
807        if !remote_dyn_filter_enabled(&self.query_ctx)? {
808            // Reject remote pushdown instead of pretending success: this keeps
809            // DataFusion/local dynamic filter semantics intact while disabling
810            // only FE -> DN remote dynamic filter propagation.
811            self.captured_remote_dyn_filters.lock().unwrap().clear();
812            let new_self = Arc::new(self.clone());
813
814            return Ok(FilterPushdownPropagation {
815                filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
816                updated_node: Some(new_self),
817            });
818        }
819
820        let Some(remote_dyn_filter_producer_id) = self.remote_dyn_filter_producer_id else {
821            // Missing RDF identity disables only RDF, not normal execution.
822            common_telemetry::warn!(
823                "MergeScan remote dynamic filter producer id is not assigned; skipping remote dynamic filter pushdown"
824            );
825            self.captured_remote_dyn_filters.lock().unwrap().clear();
826            let new_self = Arc::new(self.clone());
827
828            return Ok(FilterPushdownPropagation {
829                filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
830                updated_node: Some(new_self),
831            });
832        };
833        let remote_dyn_filter_pushdown =
834            capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);
835        *self.captured_remote_dyn_filters.lock().unwrap() =
836            remote_dyn_filter_pushdown.captured_dyn_filters;
837        let new_self = Arc::new(self.clone());
838
839        Ok(FilterPushdownPropagation {
840            filters: remote_dyn_filter_pushdown
841                .pushed_down
842                .into_iter()
843                .map(|pushdown_ready| {
844                    if pushdown_ready {
845                        PushedDown::Yes
846                    } else {
847                        PushedDown::No
848                    }
849                })
850                .collect(),
851            updated_node: Some(new_self),
852        })
853    }
854
855    fn execute(
856        &self,
857        partition: usize,
858        context: Arc<TaskContext>,
859    ) -> Result<SendableRecordBatchStream> {
860        self.to_stream(context, partition)
861    }
862
863    fn metrics(&self) -> Option<MetricsSet> {
864        Some(self.metric.clone_inner())
865    }
866
867    fn name(&self) -> &str {
868        "MergeScanExec"
869    }
870}
871
872impl DisplayAs for MergeScanExec {
873    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
874        write!(f, "MergeScanExec: peers=[")?;
875        for region_id in self.regions.iter() {
876            write!(f, "{}, ", region_id)?;
877        }
878        write!(f, "]")?;
879
880        if matches!(t, DisplayFormatType::Verbose) {
881            let partition_metrics = self.partition_metrics();
882            if !partition_metrics.is_empty() {
883                write!(f, ", metrics={{")?;
884                for (i, pm) in partition_metrics.iter().enumerate() {
885                    if i > 0 {
886                        write!(f, ", ")?;
887                    }
888                    write!(
889                        f,
890                        "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"ready_time\":\"{:?}\",\"first_consume_time\":\"{:?}\",\"finish_time\":\"{:?}\",\"region_metrics\":[",
891                        pm.partition,
892                        pm.total_regions,
893                        pm.total_poll_duration,
894                        pm.total_do_get_cost,
895                        pm.ready_time,
896                        pm.first_consume_time,
897                        pm.finish_time
898                    )?;
899                    for (j, rm) in pm.region_metrics.iter().enumerate() {
900                        if j > 0 {
901                            write!(f, ",")?;
902                        }
903                        write!(
904                            f,
905                            "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
906                            rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
907                        )?;
908                    }
909                    write!(f, "]}}")?;
910                }
911                write!(f, "}}")?;
912            }
913        }
914
915        Ok(())
916    }
917}
918
/// Handles to the DataFusion metrics recorded by a merge scan. All of them are
/// registered on the plan's [`ExecutionPlanMetricsSet`] by [`MergeScanMetric::new`].
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanoseconds elapsed until the scan operator is ready to emit data.
    ready_time: Time,
    /// Nanoseconds elapsed until the first record batch emitted from the scan operator is consumed.
    first_consume_time: Time,
    /// Nanoseconds elapsed until the scan operator finished execution.
    finish_time: Time,
    /// Count of rows fetched from remote.
    output_rows: Count,

    /// Gauge for greptime plan execution cost metrics for output.
    greptime_exec_cost: Gauge,
}
933
934impl MergeScanMetric {
935    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
936        Self {
937            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
938            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
939            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
940            output_rows: MetricBuilder::new(metric).output_rows(1),
941            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
942        }
943    }
944
945    pub fn ready_time(&self) -> &Time {
946        &self.ready_time
947    }
948
949    pub fn first_consume_time(&self) -> &Time {
950        &self.first_consume_time
951    }
952
953    pub fn finish_time(&self) -> &Time {
954        &self.finish_time
955    }
956
957    pub fn record_output_batch_rows(&self, num_rows: usize) {
958        self.output_rows.add(num_rows);
959    }
960
961    pub fn record_greptime_exec_cost(&self, metrics: usize) {
962        self.greptime_exec_cost.add(metrics);
963    }
964}
965
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
    use datafusion::config::ConfigOptions;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::physical_plan::filter_pushdown::ChildFilterPushdownResult;
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlanBuilder, lit};
    use datafusion_physical_expr::Distribution;
    use datafusion_physical_expr::expressions::{
        Column, DynamicFilterPhysicalExpr, lit as physical_lit,
    };
    use session::ReadPreference;
    use session::context::QueryContext;
    use session::query_id::QueryId;
    use table::table_name::TableName;
    use uuid::Uuid;

    use super::*;
    use crate::dist_plan::{DynFilterRegistryManager, Subscriber};
    use crate::region_query::RegionQueryHandler;

    fn test_query_id(value: u128) -> QueryId {
        QueryId::from(Uuid::from_u128(value))
    }

    /// Builds a dynamic filter over a single `host` column, initialised to `true`.
    fn test_dyn_filter() -> Arc<dyn datafusion_physical_expr::PhysicalExpr> {
        Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 0)) as Arc<_>],
            physical_lit(true) as _,
        )) as Arc<dyn datafusion_physical_expr::PhysicalExpr>
    }

    /// Builds a single-partition `MergeScanExec` for filter-pushdown tests. It scans
    /// one region with a plan that projects the literal `col1`, and uses the given
    /// remote dynamic filter producer id.
    fn build_pushdown_test_exec(
        remote_dyn_filter_producer_id: Option<RemoteDynFilterProducerId>,
    ) -> MergeScanExec {
        let plan = LogicalPlanBuilder::empty(true)
            .project(vec![lit(1i32).alias("col1")])
            .unwrap()
            .build()
            .unwrap();

        let schema = plan.schema().as_arrow().clone();
        let table = TableName::new("catalog", "schema", "table");
        let regions = vec![RegionId::new(1024, 1)];
        let query_ctx = QueryContext::arc();
        let session_state = SessionStateBuilder::new().build();
        let handler = Arc::new(TestRegionQueryHandler);
        MergeScanExec::new(
            &session_state,
            table,
            regions,
            plan,
            &schema,
            handler,
            query_ctx,
            1,
            AliasMapping::new(),
            remote_dyn_filter_producer_id,
        )
        .unwrap()
    }

    /// Wraps a single parent filter that the (non-existent) child reported as pushed down.
    fn single_parent_filter_result(
        filter: Arc<dyn datafusion_physical_expr::PhysicalExpr>,
    ) -> ChildPushdownResult {
        ChildPushdownResult {
            parent_filters: vec![ChildFilterPushdownResult {
                filter,
                child_results: vec![PushedDown::Yes],
            }],
            self_filters: Vec::new(),
        }
    }

    #[test]
    fn remote_dyn_filter_region_query_context_registers_before_do_get() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_ctx = QueryContext::arc();
        let query_id = query_ctx
            .remote_query_id_value()
            .expect("query context must have remote query id");
        let lease = registry_manager.acquire_lease(query_id);
        let region_id = RegionId::new(1024, 7);
        let dyn_filter = test_dyn_filter();
        let captured = capture_remote_dyn_filters_for_pushdown(
            RemoteDynFilterProducerId::new(42),
            vec![dyn_filter],
        );
        assert_eq!(captured.captured_dyn_filters.len(), 1);

        let region_query_ctx = query_context_for_remote_dyn_filter_region(
            &query_ctx,
            region_id,
            Some(&lease),
            &captured.captured_dyn_filters,
        );

        let entries = lease.registry().entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].subscribers(), vec![Subscriber::new(region_id)]);
        assert!(
            !entries[0].fanout_started_for_test(),
            "fanout must start only after do_get succeeds"
        );
        assert!(
            region_query_ctx
                .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
                .is_some(),
            "initial RDF registrations must be present in the do_get query context"
        );
    }

    #[test]
    fn remote_dyn_filter_registry_cleanup_waits_for_last_query_scoped_stream_drop() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);

        let first = registry_manager.acquire_lease(query_id);
        let second = registry_manager.acquire_lease(query_id);

        drop(first);
        assert_eq!(registry_manager.registry_count(), 1);

        drop(second);
        assert_eq!(registry_manager.registry_count(), 0);
    }

    // NOTE(review): this body is currently identical to
    // `remote_dyn_filter_registry_cleanup_waits_for_last_query_scoped_stream_drop`;
    // consider making it exercise a scenario the other test does not cover.
    #[test]
    fn remote_dyn_filter_registry_cleanup_shares_query_scope_across_independent_leases() {
        let registry_manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);

        let first_exec_like_lease = registry_manager.acquire_lease(query_id);
        let second_exec_like_lease = registry_manager.acquire_lease(query_id);

        drop(first_exec_like_lease);
        assert_eq!(registry_manager.registry_count(), 1);

        drop(second_exec_like_lease);
        assert_eq!(registry_manager.registry_count(), 0);
    }

    struct TestRegionQueryHandler;

    #[async_trait]
    impl RegionQueryHandler for TestRegionQueryHandler {
        async fn do_get(
            &self,
            _read_preference: ReadPreference,
            _request: common_query::request::QueryRequest,
        ) -> crate::error::Result<common_recordbatch::SendableRecordBatchStream> {
            unimplemented!("test only")
        }

        async fn handle_remote_dyn_filter_update(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _update: api::v1::region::RemoteDynFilterUpdate,
        ) -> crate::error::Result<()> {
            unimplemented!("test only")
        }

        async fn handle_remote_dyn_filter_unregister(
            &self,
            _region_id: RegionId,
            _query_id: String,
            _unregister: api::v1::region::RemoteDynFilterUnregister,
        ) -> crate::error::Result<()> {
            unimplemented!("test only")
        }
    }

    #[test]
    fn try_with_new_distribution_preserves_remote_dyn_filter_producer_id() {
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);

        // Build a plan whose schema contains "col1"
        let plan = LogicalPlanBuilder::empty(true)
            .project(vec![lit(1i32).alias("col1")])
            .unwrap()
            .build()
            .unwrap();

        let schema = plan.schema().as_arrow().clone();
        let table = TableName::new("catalog", "schema", "table");
        let regions = vec![RegionId::new(1024, 1)];
        let query_ctx = QueryContext::arc();

        // Non-empty partition_cols so try_with_new_distribution can detect an overlap
        let mut partition_cols = AliasMapping::new();
        partition_cols.insert(
            "col1".to_string(),
            BTreeSet::from([ColumnExpr::new(Some(TableReference::bare("table")), "col1")]),
        );

        let session_state = SessionStateBuilder::new().build();

        let handler = Arc::new(TestRegionQueryHandler);
        let target_partition = 2;

        let exec = MergeScanExec::new(
            &session_state,
            table,
            regions,
            plan,
            &schema,
            handler,
            query_ctx,
            target_partition,
            partition_cols,
            Some(remote_dyn_filter_producer_id),
        )
        .unwrap();

        assert_eq!(
            exec.remote_dyn_filter_producer_id(),
            Some(remote_dyn_filter_producer_id)
        );

        // A distribution that differs from the current partitioning but shares a
        // column name present in partition_cols, so try_with_new_distribution
        // produces a clone instead of returning None.
        let new_dist = Distribution::HashPartitioned(vec![
            Arc::new(Column::new("col1", 0)),
            Arc::new(Column::new("col2", 1)),
        ]);

        let cloned = exec
            .try_with_new_distribution(new_dist)
            .expect("expected a cloned exec with overlapping partition col");

        assert_eq!(
            cloned.remote_dyn_filter_producer_id(),
            Some(remote_dyn_filter_producer_id),
            "try_with_new_distribution must preserve remote dynamic filter producer id"
        );
    }

    #[test]
    fn remote_dyn_filter_preflight_removes_parent_filter_after_dn_runtime_is_ready() {
        let exec = build_pushdown_test_exec(Some(RemoteDynFilterProducerId::new(42)));

        let propagation = exec
            .handle_child_pushdown_result(
                FilterPushdownPhase::Post,
                single_parent_filter_result(test_dyn_filter()),
                &ConfigOptions::new(),
            )
            .unwrap();

        assert_eq!(exec.captured_remote_dyn_filters().len(), 1);
        assert!(matches!(propagation.filters.as_slice(), [PushedDown::Yes]));
    }

    #[test]
    fn remote_dyn_filter_pushdown_is_rejected_without_producer_id() {
        // Without a producer id the node must not capture anything and must leave
        // every parent filter to DataFusion.
        let exec = build_pushdown_test_exec(None);

        let propagation = exec
            .handle_child_pushdown_result(
                FilterPushdownPhase::Post,
                single_parent_filter_result(test_dyn_filter()),
                &ConfigOptions::new(),
            )
            .unwrap();

        assert_eq!(exec.captured_remote_dyn_filters().len(), 0);
        assert!(matches!(propagation.filters.as_slice(), [PushedDown::No]));
    }
}