query/dist_plan/
merge_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::execution::{SessionState, TaskContext};
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::metrics::{
30    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
31};
32use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion::physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
35    SendableRecordBatchStream,
36};
37use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
38use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
39use datafusion_physical_expr::expressions::Column;
40use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
41use futures_util::StreamExt;
42use greptime_proto::v1::region::RegionRequestHeader;
43use meter_core::data::ReadItem;
44use meter_macros::read_meter;
45use session::context::QueryContextRef;
46use store_api::storage::RegionId;
47use table::table_name::TableName;
48use tokio::time::Instant;
49use tracing::{Instrument, Span};
50
51use crate::dist_plan::analyzer::AliasMapping;
52use crate::dist_plan::analyzer::utils::patch_batch_timezone;
53use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
54use crate::region_query::RegionQueryHandlerRef;
55
56#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
57pub struct MergeScanLogicalPlan {
58    /// In logical plan phase it only contains one input
59    input: LogicalPlan,
60    /// If this plan is a placeholder
61    is_placeholder: bool,
62    partition_cols: AliasMapping,
63}
64
65impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
66    fn name(&self) -> &str {
67        Self::name()
68    }
69
70    // Prevent further optimization.
71    // The input can be retrieved by `self.input()`
72    fn inputs(&self) -> Vec<&LogicalPlan> {
73        vec![]
74    }
75
76    fn schema(&self) -> &datafusion_common::DFSchemaRef {
77        self.input.schema()
78    }
79
80    // Prevent further optimization
81    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
82        vec![]
83    }
84
85    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
86        write!(
87            f,
88            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
89            self.is_placeholder, self.input
90        )
91    }
92
93    fn with_exprs_and_inputs(
94        &self,
95        _exprs: Vec<datafusion::prelude::Expr>,
96        _inputs: Vec<LogicalPlan>,
97    ) -> Result<Self> {
98        Ok(self.clone())
99    }
100}
101
102impl MergeScanLogicalPlan {
103    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
104        Self {
105            input,
106            is_placeholder,
107            partition_cols,
108        }
109    }
110
111    pub fn name() -> &'static str {
112        "MergeScan"
113    }
114
115    /// Create a [LogicalPlan::Extension] node from this merge scan plan
116    pub fn into_logical_plan(self) -> LogicalPlan {
117        LogicalPlan::Extension(Extension {
118            node: Arc::new(self),
119        })
120    }
121
122    pub fn is_placeholder(&self) -> bool {
123        self.is_placeholder
124    }
125
126    pub fn input(&self) -> &LogicalPlan {
127        &self.input
128    }
129
130    pub fn partition_cols(&self) -> &AliasMapping {
131        &self.partition_cols
132    }
133}
134
135pub struct MergeScanExec {
136    table: TableName,
137    regions: Vec<RegionId>,
138    plan: LogicalPlan,
139    arrow_schema: ArrowSchemaRef,
140    region_query_handler: RegionQueryHandlerRef,
141    metric: ExecutionPlanMetricsSet,
142    properties: PlanProperties,
143    /// Metrics from sub stages
144    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
145    /// Metrics for each partition
146    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
147    query_ctx: QueryContextRef,
148    target_partition: usize,
149    partition_cols: AliasMapping,
150}
151
152impl std::fmt::Debug for MergeScanExec {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("MergeScanExec")
155            .field("table", &self.table)
156            .field("regions", &self.regions)
157            .field("plan", &self.plan)
158            .finish()
159    }
160}
161
162impl MergeScanExec {
163    #[allow(clippy::too_many_arguments)]
164    pub fn new(
165        session_state: &SessionState,
166        table: TableName,
167        regions: Vec<RegionId>,
168        plan: LogicalPlan,
169        arrow_schema: &ArrowSchema,
170        region_query_handler: RegionQueryHandlerRef,
171        query_ctx: QueryContextRef,
172        target_partition: usize,
173        partition_cols: AliasMapping,
174    ) -> Result<Self> {
175        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
176        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
177        // Reconsider if it's possible to remove it.
178        let arrow_schema = Arc::new(arrow_schema.clone());
179
180        // States the output ordering of the plan.
181        //
182        // When the input plan is a sort, we can use the sort ordering as the output ordering
183        // if the target partition is greater than the number of regions, which means we won't
184        // break the ordering on merging (of MergeScan).
185        //
186        // Otherwise, we need to use the default ordering.
187        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
188            && target_partition >= regions.len()
189        {
190            let lex_ordering = sort
191                .expr
192                .iter()
193                .map(|sort_expr| {
194                    let physical_expr = session_state
195                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
196                    Ok(PhysicalSortExpr::new(
197                        physical_expr,
198                        SortOptions {
199                            descending: !sort_expr.asc,
200                            nulls_first: sort_expr.nulls_first,
201                        },
202                    ))
203                })
204                .collect::<Result<Vec<_>>>()?;
205            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
206        } else {
207            EquivalenceProperties::new(arrow_schema.clone())
208        };
209
210        let partition_exprs = partition_cols
211            .iter()
212            .filter_map(|col| {
213                if let Some(first_alias) = col.1.first() {
214                    session_state
215                        .create_physical_expr(
216                            Expr::Column(ColumnExpr::new_unqualified(
217                                first_alias.name().to_string(),
218                            )),
219                            plan.schema(),
220                        )
221                        .ok()
222                } else {
223                    None
224                }
225            })
226            .collect();
227        let partitioning = Partitioning::Hash(partition_exprs, target_partition);
228
229        let properties = PlanProperties::new(
230            eq_properties,
231            partitioning,
232            EmissionType::Incremental,
233            Boundedness::Bounded,
234        );
235        Ok(Self {
236            table,
237            regions,
238            plan,
239            arrow_schema,
240            region_query_handler,
241            metric: ExecutionPlanMetricsSet::new(),
242            sub_stage_metrics: Arc::default(),
243            partition_metrics: Arc::default(),
244            properties,
245            query_ctx,
246            target_partition,
247            partition_cols,
248        })
249    }
250
251    pub fn to_stream(
252        &self,
253        context: Arc<TaskContext>,
254        partition: usize,
255    ) -> Result<SendableRecordBatchStream> {
256        // prepare states to move
257        let regions = self.regions.clone();
258        let region_query_handler = self.region_query_handler.clone();
259        let metric = MergeScanMetric::new(&self.metric);
260        let arrow_schema = self.arrow_schema.clone();
261        let query_ctx = self.query_ctx.clone();
262        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
263        let partition_metrics_moved = self.partition_metrics.clone();
264        let plan = self.plan.clone();
265        let target_partition = self.target_partition;
266        let dbname = context.task_id().unwrap_or_default();
267        let tracing_context = TracingContext::from_json(context.session_id().as_str());
268        let current_channel = self.query_ctx.channel();
269        let read_preference = self.query_ctx.read_preference();
270        let explain_verbose = self.query_ctx.explain_verbose();
271
272        let stream = Box::pin(stream!({
273            // only report metrics once for each MergeScan
274            if partition == 0 {
275                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
276            }
277
278            let _finish_timer = metric.finish_time().timer();
279            let mut ready_timer = metric.ready_time().timer();
280            let mut first_consume_timer = Some(metric.first_consume_time().timer());
281
282            for region_id in regions
283                .iter()
284                .skip(partition)
285                .step_by(target_partition)
286                .copied()
287            {
288                let region_span = tracing_context.attach(tracing::info_span!(
289                    parent: &Span::current(),
290                    "merge_scan_region",
291                    region_id = %region_id,
292                    partition = partition
293                ));
294                let request = QueryRequest {
295                    header: Some(RegionRequestHeader {
296                        tracing_context: tracing_context.to_w3c(),
297                        dbname: dbname.clone(),
298                        query_context: Some(query_ctx.as_ref().into()),
299                    }),
300                    region_id,
301                    plan: plan.clone(),
302                };
303                let region_start = Instant::now();
304                let do_get_start = Instant::now();
305
306                if explain_verbose {
307                    common_telemetry::info!(
308                        "Merge scan one region, partition: {}, region_id: {}",
309                        partition,
310                        region_id
311                    );
312                }
313
314                let mut stream = region_query_handler
315                    .do_get(read_preference, request)
316                    .instrument(region_span.clone())
317                    .await
318                    .map_err(|e| {
319                        MERGE_SCAN_ERRORS_TOTAL.inc();
320                        DataFusionError::External(Box::new(e))
321                    })?;
322                let do_get_cost = do_get_start.elapsed();
323
324                ready_timer.stop();
325
326                let mut poll_duration = Duration::ZERO;
327                let mut poll_timer = Instant::now();
328                while let Some(batch) = stream.next().instrument(region_span.clone()).await {
329                    let poll_elapsed = poll_timer.elapsed();
330                    poll_duration += poll_elapsed;
331
332                    let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
333                    let batch = patch_batch_timezone(
334                        arrow_schema.clone(),
335                        batch.into_df_record_batch().columns().to_vec(),
336                    )?;
337                    metric.record_output_batch_rows(batch.num_rows());
338                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
339                        first_consume_timer.stop();
340                    }
341
342                    if let Some(metrics) = stream.metrics() {
343                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
344                        sub_stage_metrics.insert(region_id, metrics);
345                    }
346
347                    yield Ok(batch);
348                    // reset poll timer
349                    poll_timer = Instant::now();
350                }
351                let total_cost = region_start.elapsed();
352
353                // Record region metrics and push to global partition_metrics
354                let region_metrics = RegionMetrics {
355                    region_id,
356                    poll_duration,
357                    do_get_cost,
358                    total_cost,
359                };
360
361                // Push RegionMetrics to global partition_metrics immediately after scanning this region
362                {
363                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
364                    let partition_metrics = partition_metrics_guard
365                        .entry(partition)
366                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
367                    partition_metrics.add_region_metrics(region_metrics);
368                }
369
370                if explain_verbose {
371                    common_telemetry::info!(
372                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
373                        partition,
374                        region_id,
375                        poll_duration,
376                        metric.first_consume_time(),
377                        do_get_cost
378                    );
379                }
380
381                // process metrics after all data is drained.
382                if let Some(metrics) = stream.metrics() {
383                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
384                    let value = read_meter!(
385                        c,
386                        s,
387                        ReadItem {
388                            cpu_time: metrics.elapsed_compute as u64,
389                            table_scan: metrics.memory_usage as u64
390                        },
391                        current_channel as u8
392                    );
393                    metric.record_greptime_exec_cost(value as usize);
394
395                    // record metrics from sub sgates
396                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
397                    sub_stage_metrics.insert(region_id, metrics);
398                }
399
400                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
401            }
402
403            // Finish partition metrics and log results
404            {
405                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
406                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
407                    partition_metrics.finish();
408                }
409            }
410        }));
411
412        Ok(Box::pin(RecordBatchStreamAdapter::new(
413            self.arrow_schema.clone(),
414            stream,
415        )))
416    }
417
418    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
419        let Distribution::HashPartitioned(hash_exprs) = distribution else {
420            // not applicable
421            return None;
422        };
423
424        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
425            && curr_dist == &hash_exprs
426        {
427            // No need to change the distribution
428            return None;
429        }
430
431        let all_partition_col_aliases: HashSet<_> = self
432            .partition_cols
433            .values()
434            .flat_map(|aliases| aliases.iter().map(|c| c.name()))
435            .collect();
436        let mut overlaps = vec![];
437        for expr in &hash_exprs {
438            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
439                && all_partition_col_aliases.contains(col_expr.name())
440            {
441                overlaps.push(expr.clone());
442            }
443        }
444
445        if overlaps.is_empty() {
446            return None;
447        }
448
449        Some(Self {
450            table: self.table.clone(),
451            regions: self.regions.clone(),
452            plan: self.plan.clone(),
453            arrow_schema: self.arrow_schema.clone(),
454            region_query_handler: self.region_query_handler.clone(),
455            metric: self.metric.clone(),
456            properties: PlanProperties::new(
457                self.properties.eq_properties.clone(),
458                Partitioning::Hash(overlaps, self.target_partition),
459                self.properties.emission_type,
460                self.properties.boundedness,
461            ),
462            sub_stage_metrics: self.sub_stage_metrics.clone(),
463            partition_metrics: self.partition_metrics.clone(),
464            query_ctx: self.query_ctx.clone(),
465            target_partition: self.target_partition,
466            partition_cols: self.partition_cols.clone(),
467        })
468    }
469
470    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
471        self.sub_stage_metrics
472            .lock()
473            .unwrap()
474            .values()
475            .cloned()
476            .collect()
477    }
478
479    pub fn partition_count(&self) -> usize {
480        self.target_partition
481    }
482
483    pub fn region_count(&self) -> usize {
484        self.regions.len()
485    }
486
487    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
488        self.partition_metrics
489            .lock()
490            .unwrap()
491            .values()
492            .cloned()
493            .collect()
494    }
495}
496
497/// Metrics for a region of a partition.
498#[derive(Debug, Clone)]
499struct RegionMetrics {
500    region_id: RegionId,
501    poll_duration: Duration,
502    do_get_cost: Duration,
503    /// Total cost to scan the region.
504    total_cost: Duration,
505}
506
507/// Metrics for a partition of a MergeScanExec.
508#[derive(Debug, Clone)]
509struct PartitionMetrics {
510    partition: usize,
511    region_metrics: Vec<RegionMetrics>,
512    total_poll_duration: Duration,
513    total_do_get_cost: Duration,
514    total_regions: usize,
515    explain_verbose: bool,
516    finished: bool,
517}
518
519impl PartitionMetrics {
520    fn new(partition: usize, explain_verbose: bool) -> Self {
521        Self {
522            partition,
523            region_metrics: Vec::new(),
524            total_poll_duration: Duration::ZERO,
525            total_do_get_cost: Duration::ZERO,
526            total_regions: 0,
527            explain_verbose,
528            finished: false,
529        }
530    }
531
532    fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
533        self.total_poll_duration += region_metrics.poll_duration;
534        self.total_do_get_cost += region_metrics.do_get_cost;
535        self.total_regions += 1;
536        self.region_metrics.push(region_metrics);
537    }
538
539    /// Finish the partition metrics and log the results.
540    fn finish(&mut self) {
541        if self.finished {
542            return;
543        }
544        self.finished = true;
545        self.log_metrics();
546    }
547
548    /// Log partition metrics based on explain_verbose level.
549    fn log_metrics(&self) {
550        if self.explain_verbose {
551            common_telemetry::info!(
552                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
553                self.partition,
554                self.total_regions,
555                self.total_poll_duration,
556                self.total_do_get_cost
557            );
558        } else {
559            common_telemetry::debug!(
560                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
561                self.partition,
562                self.total_regions,
563                self.total_poll_duration,
564                self.total_do_get_cost
565            );
566        }
567    }
568}
569
570impl Drop for PartitionMetrics {
571    fn drop(&mut self) {
572        if !self.finished {
573            self.log_metrics();
574        }
575    }
576}
577
578impl ExecutionPlan for MergeScanExec {
579    fn as_any(&self) -> &dyn Any {
580        self
581    }
582
583    fn schema(&self) -> ArrowSchemaRef {
584        self.arrow_schema.clone()
585    }
586
587    fn properties(&self) -> &PlanProperties {
588        &self.properties
589    }
590
591    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
592        vec![]
593    }
594
595    // DataFusion will swap children unconditionally.
596    // But since this node is leaf node, it's safe to just return self.
597    fn with_new_children(
598        self: Arc<Self>,
599        _children: Vec<Arc<dyn ExecutionPlan>>,
600    ) -> Result<Arc<dyn ExecutionPlan>> {
601        Ok(self.clone())
602    }
603
604    fn execute(
605        &self,
606        partition: usize,
607        context: Arc<TaskContext>,
608    ) -> Result<SendableRecordBatchStream> {
609        self.to_stream(context, partition)
610    }
611
612    fn metrics(&self) -> Option<MetricsSet> {
613        Some(self.metric.clone_inner())
614    }
615
616    fn name(&self) -> &str {
617        "MergeScanExec"
618    }
619}
620
621impl DisplayAs for MergeScanExec {
622    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
623        write!(f, "MergeScanExec: peers=[")?;
624        for region_id in self.regions.iter() {
625            write!(f, "{}, ", region_id)?;
626        }
627        write!(f, "]")?;
628
629        if matches!(t, DisplayFormatType::Verbose) {
630            let partition_metrics = self.partition_metrics();
631            if !partition_metrics.is_empty() {
632                write!(f, ", metrics={{")?;
633                for (i, pm) in partition_metrics.iter().enumerate() {
634                    if i > 0 {
635                        write!(f, ", ")?;
636                    }
637                    write!(
638                        f,
639                        "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
640                        pm.partition,
641                        pm.total_regions,
642                        pm.total_poll_duration,
643                        pm.total_do_get_cost
644                    )?;
645                    for (j, rm) in pm.region_metrics.iter().enumerate() {
646                        if j > 0 {
647                            write!(f, ",")?;
648                        }
649                        write!(
650                            f,
651                            "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
652                            rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
653                        )?;
654                    }
655                    write!(f, "]}}")?;
656                }
657                write!(f, "}}")?;
658            }
659        }
660
661        Ok(())
662    }
663}
664
665#[derive(Debug, Clone)]
666struct MergeScanMetric {
667    /// Nanosecond elapsed till the scan operator is ready to emit data
668    ready_time: Time,
669    /// Nanosecond elapsed till the first record batch emitted from the scan operator gets consumed
670    first_consume_time: Time,
671    /// Nanosecond elapsed till the scan operator finished execution
672    finish_time: Time,
673    /// Count of rows fetched from remote
674    output_rows: Count,
675
676    /// Gauge for greptime plan execution cost metrics for output
677    greptime_exec_cost: Gauge,
678}
679
680impl MergeScanMetric {
681    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
682        Self {
683            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
684            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
685            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
686            output_rows: MetricBuilder::new(metric).output_rows(1),
687            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
688        }
689    }
690
691    pub fn ready_time(&self) -> &Time {
692        &self.ready_time
693    }
694
695    pub fn first_consume_time(&self) -> &Time {
696        &self.first_consume_time
697    }
698
699    pub fn finish_time(&self) -> &Time {
700        &self.finish_time
701    }
702
703    pub fn record_output_batch_rows(&self, num_rows: usize) {
704        self.output_rows.add(num_rows);
705    }
706
707    pub fn record_greptime_exec_cost(&self, metrics: usize) {
708        self.greptime_exec_cost.add(metrics);
709    }
710}