query/dist_plan/
merge_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::arrow::record_batch::RecordBatch;
28use datafusion::execution::{SessionState, TaskContext};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::metrics::{
31    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
32};
33use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
36    SendableRecordBatchStream,
37};
38use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
39use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
40use datafusion_physical_expr::expressions::Column;
41use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
42use futures_util::StreamExt;
43use greptime_proto::v1::region::RegionRequestHeader;
44use meter_core::data::ReadItem;
45use meter_macros::read_meter;
46use session::context::QueryContextRef;
47use store_api::storage::RegionId;
48use table::table_name::TableName;
49use tokio::time::Instant;
50
51use crate::dist_plan::analyzer::AliasMapping;
52use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
53use crate::region_query::RegionQueryHandlerRef;
54
/// Logical plan node named `"MergeScan"` that wraps another [`LogicalPlan`].
///
/// The wrapped plan is printed as `remote_input` in EXPLAIN output and is hidden from
/// DataFusion's optimizer (the node reports no inputs and no expressions).
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// In logical plan phase it only contains one input
    input: LogicalPlan,
    /// If this plan is a placeholder
    is_placeholder: bool,
    /// Alias mapping of the partition columns.
    // NOTE(review): presumably maps each partition column to the aliases it is known by in
    // `input` -- confirm against `AliasMapping` in `dist_plan::analyzer`.
    partition_cols: AliasMapping,
}
63
64impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
65    fn name(&self) -> &str {
66        Self::name()
67    }
68
69    // Prevent further optimization.
70    // The input can be retrieved by `self.input()`
71    fn inputs(&self) -> Vec<&LogicalPlan> {
72        vec![]
73    }
74
75    fn schema(&self) -> &datafusion_common::DFSchemaRef {
76        self.input.schema()
77    }
78
79    // Prevent further optimization
80    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
81        vec![]
82    }
83
84    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
85        write!(
86            f,
87            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
88            self.is_placeholder, self.input
89        )
90    }
91
92    fn with_exprs_and_inputs(
93        &self,
94        _exprs: Vec<datafusion::prelude::Expr>,
95        _inputs: Vec<LogicalPlan>,
96    ) -> Result<Self> {
97        Ok(self.clone())
98    }
99}
100
101impl MergeScanLogicalPlan {
102    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
103        Self {
104            input,
105            is_placeholder,
106            partition_cols,
107        }
108    }
109
110    pub fn name() -> &'static str {
111        "MergeScan"
112    }
113
114    /// Create a [LogicalPlan::Extension] node from this merge scan plan
115    pub fn into_logical_plan(self) -> LogicalPlan {
116        LogicalPlan::Extension(Extension {
117            node: Arc::new(self),
118        })
119    }
120
121    pub fn is_placeholder(&self) -> bool {
122        self.is_placeholder
123    }
124
125    pub fn input(&self) -> &LogicalPlan {
126        &self.input
127    }
128
129    pub fn partition_cols(&self) -> &AliasMapping {
130        &self.partition_cols
131    }
132}
133
/// Physical plan node that sends `plan` to a set of regions through the region query
/// handler and yields the returned record batches as its own output.
pub struct MergeScanExec {
    /// Table this scan belongs to (only used for `Debug` output in this file).
    table: TableName,
    /// Regions to query; they are distributed over the output partitions in `to_stream`.
    regions: Vec<RegionId>,
    /// Logical plan shipped to every region inside the query request.
    plan: LogicalPlan,
    /// Output schema; every received batch is rebuilt against it.
    arrow_schema: ArrowSchemaRef,
    /// Handler used to issue the per-region `do_get` calls.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics set backing [`MergeScanMetric`].
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (ordering, hash partitioning, emission type, boundedness).
    properties: PlanProperties,
    /// Metrics from sub stages
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Metrics for each partition
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Query context forwarded to the regions; also supplies channel, read preference
    /// and the `explain_verbose` flag.
    query_ctx: QueryContextRef,
    /// Number of output partitions.
    target_partition: usize,
    /// Alias mapping of the partition columns, used to derive the hash partitioning.
    partition_cols: AliasMapping,
}
150
151impl std::fmt::Debug for MergeScanExec {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("MergeScanExec")
154            .field("table", &self.table)
155            .field("regions", &self.regions)
156            .field("plan", &self.plan)
157            .finish()
158    }
159}
160
161impl MergeScanExec {
162    #[allow(clippy::too_many_arguments)]
163    pub fn new(
164        session_state: &SessionState,
165        table: TableName,
166        regions: Vec<RegionId>,
167        plan: LogicalPlan,
168        arrow_schema: &ArrowSchema,
169        region_query_handler: RegionQueryHandlerRef,
170        query_ctx: QueryContextRef,
171        target_partition: usize,
172        partition_cols: AliasMapping,
173    ) -> Result<Self> {
174        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
175        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
176        // Reconsider if it's possible to remove it.
177        let arrow_schema = Arc::new(arrow_schema.clone());
178
179        // States the output ordering of the plan.
180        //
181        // When the input plan is a sort, we can use the sort ordering as the output ordering
182        // if the target partition is greater than the number of regions, which means we won't
183        // break the ordering on merging (of MergeScan).
184        //
185        // Otherwise, we need to use the default ordering.
186        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
187            && target_partition >= regions.len()
188        {
189            let lex_ordering = sort
190                .expr
191                .iter()
192                .map(|sort_expr| {
193                    let physical_expr = session_state
194                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
195                    Ok(PhysicalSortExpr::new(
196                        physical_expr,
197                        SortOptions {
198                            descending: !sort_expr.asc,
199                            nulls_first: sort_expr.nulls_first,
200                        },
201                    ))
202                })
203                .collect::<Result<Vec<_>>>()?;
204            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
205        } else {
206            EquivalenceProperties::new(arrow_schema.clone())
207        };
208
209        let partition_exprs = partition_cols
210            .iter()
211            .filter_map(|col| {
212                if let Some(first_alias) = col.1.first() {
213                    session_state
214                        .create_physical_expr(
215                            Expr::Column(ColumnExpr::new_unqualified(
216                                first_alias.name().to_string(),
217                            )),
218                            plan.schema(),
219                        )
220                        .ok()
221                } else {
222                    None
223                }
224            })
225            .collect();
226        let partitioning = Partitioning::Hash(partition_exprs, target_partition);
227
228        let properties = PlanProperties::new(
229            eq_properties,
230            partitioning,
231            EmissionType::Incremental,
232            Boundedness::Bounded,
233        );
234        Ok(Self {
235            table,
236            regions,
237            plan,
238            arrow_schema,
239            region_query_handler,
240            metric: ExecutionPlanMetricsSet::new(),
241            sub_stage_metrics: Arc::default(),
242            partition_metrics: Arc::default(),
243            properties,
244            query_ctx,
245            target_partition,
246            partition_cols,
247        })
248    }
249
    /// Builds the record batch stream for output partition `partition`.
    ///
    /// Regions are assigned to partitions round-robin: this partition scans the regions at
    /// indices `partition`, `partition + target_partition`, `partition + 2 * target_partition`,
    /// and so on. The assigned regions are queried one after another; every received batch
    /// is rebuilt against `self.arrow_schema` before it is yielded.
    ///
    /// Besides producing data, the stream records:
    /// * operator timings and row counts in [`MergeScanMetric`],
    /// * per-region metrics reported by the remote stream in `sub_stage_metrics`,
    /// * per-region and per-partition timing in `partition_metrics`.
    ///
    /// # Errors
    ///
    /// A failing `do_get` call or a failing batch is wrapped into
    /// `DataFusionError::External` and propagated with `?`; only the `do_get` failure
    /// increments `MERGE_SCAN_ERRORS_TOTAL`.
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // prepare states to move
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let arrow_schema = self.arrow_schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let partition_metrics_moved = self.partition_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        // NOTE(review): the task id of the `TaskContext` is read as the database name and the
        // session id as the JSON-encoded tracing context -- presumably the caller packs them
        // that way; confirm where the `TaskContext` is built.
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();
        let explain_verbose = self.query_ctx.explain_verbose();

        let stream = Box::pin(stream!({
            // only report metrics once for each MergeScan
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            // `_finish_timer` is never stopped explicitly; it lives until this block ends.
            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            // Wrapped in `Option` so it is stopped exactly once, on the first batch.
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

            // Round-robin assignment of regions to this partition.
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                // `region_start` covers the whole region scan, `do_get_start` only the
                // `do_get` call below.
                let region_start = Instant::now();
                let do_get_start = Instant::now();

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan one region, partition: {}, region_id: {}",
                        partition,
                        region_id
                    );
                }

                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        DataFusionError::External(Box::new(e))
                    })?;
                let do_get_cost = do_get_start.elapsed();

                // Called once per region of this partition.
                ready_timer.stop();

                // `poll_duration` accumulates only the time spent waiting on the remote
                // stream; the timer is restarted after each `yield`, so time spent in the
                // downstream consumer is excluded.
                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
                    // Rebuild the batch with the columns as-is but under `arrow_schema`.
                    let batch = RecordBatch::try_new(
                        arrow_schema.clone(),
                        batch.into_df_record_batch().columns().to_vec(),
                    )?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }

                    // Keep the latest metrics snapshot for this region; the lock guard is
                    // released at the end of this block, before the `yield` below.
                    if let Some(metrics) = stream.metrics() {
                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                        sub_stage_metrics.insert(region_id, metrics);
                    }

                    yield Ok(batch);
                    // reset poll timer
                    poll_timer = Instant::now();
                }
                let total_cost = region_start.elapsed();

                // Record region metrics and push to global partition_metrics
                let region_metrics = RegionMetrics {
                    region_id,
                    poll_duration,
                    do_get_cost,
                    total_cost,
                };

                // Push RegionMetrics to global partition_metrics immediately after scanning this region
                {
                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                    let partition_metrics = partition_metrics_guard
                        .entry(partition)
                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
                    partition_metrics.add_region_metrics(region_metrics);
                }

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                        partition,
                        region_id,
                        poll_duration,
                        metric.first_consume_time(),
                        do_get_cost
                    );
                }

                // process metrics after all data is drained.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    // record metrics from sub stages
                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }

            // Finish partition metrics and log results
            // (no entry exists when this partition had no region assigned to it).
            {
                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
                    partition_metrics.finish();
                }
            }
        }));

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.arrow_schema.clone(),
            stream,
        )))
    }
409
410    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
411        let Distribution::HashPartitioned(hash_exprs) = distribution else {
412            // not applicable
413            return None;
414        };
415
416        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
417            && curr_dist == &hash_exprs
418        {
419            // No need to change the distribution
420            return None;
421        }
422
423        let all_partition_col_aliases: HashSet<_> = self
424            .partition_cols
425            .values()
426            .flat_map(|aliases| aliases.iter().map(|c| c.name()))
427            .collect();
428        let mut overlaps = vec![];
429        for expr in &hash_exprs {
430            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
431                && all_partition_col_aliases.contains(col_expr.name())
432            {
433                overlaps.push(expr.clone());
434            }
435        }
436
437        if overlaps.is_empty() {
438            return None;
439        }
440
441        Some(Self {
442            table: self.table.clone(),
443            regions: self.regions.clone(),
444            plan: self.plan.clone(),
445            arrow_schema: self.arrow_schema.clone(),
446            region_query_handler: self.region_query_handler.clone(),
447            metric: self.metric.clone(),
448            properties: PlanProperties::new(
449                self.properties.eq_properties.clone(),
450                Partitioning::Hash(overlaps, self.target_partition),
451                self.properties.emission_type,
452                self.properties.boundedness,
453            ),
454            sub_stage_metrics: self.sub_stage_metrics.clone(),
455            partition_metrics: self.partition_metrics.clone(),
456            query_ctx: self.query_ctx.clone(),
457            target_partition: self.target_partition,
458            partition_cols: self.partition_cols.clone(),
459        })
460    }
461
462    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
463        self.sub_stage_metrics
464            .lock()
465            .unwrap()
466            .values()
467            .cloned()
468            .collect()
469    }
470
    /// Number of output partitions of this node (the `target_partition` it was built with).
    pub fn partition_count(&self) -> usize {
        self.target_partition
    }
474
    /// Number of regions this node scans across all partitions.
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
478
479    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
480        self.partition_metrics
481            .lock()
482            .unwrap()
483            .values()
484            .cloned()
485            .collect()
486    }
487}
488
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// Region these metrics belong to.
    region_id: RegionId,
    /// Accumulated time spent waiting for batches from the region's stream.
    poll_duration: Duration,
    /// Time taken by the `do_get` call that opened the region's stream.
    do_get_cost: Duration,
    /// Total cost to scan the region.
    total_cost: Duration,
}
498
/// Metrics for a partition of a MergeScanExec.
///
/// The summary is logged once: either by `finish`, or on drop if `finish` was never called.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Index of the partition.
    partition: usize,
    /// Metrics of every region scanned by this partition, in scan order.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over all recorded regions.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over all recorded regions.
    total_do_get_cost: Duration,
    /// Number of regions recorded so far.
    total_regions: usize,
    /// Selects the log level of the summary: `info` when set, `debug` otherwise.
    explain_verbose: bool,
    /// Set once the summary has been logged through `finish`.
    finished: bool,
}
510
511impl PartitionMetrics {
512    fn new(partition: usize, explain_verbose: bool) -> Self {
513        Self {
514            partition,
515            region_metrics: Vec::new(),
516            total_poll_duration: Duration::ZERO,
517            total_do_get_cost: Duration::ZERO,
518            total_regions: 0,
519            explain_verbose,
520            finished: false,
521        }
522    }
523
524    fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
525        self.total_poll_duration += region_metrics.poll_duration;
526        self.total_do_get_cost += region_metrics.do_get_cost;
527        self.total_regions += 1;
528        self.region_metrics.push(region_metrics);
529    }
530
531    /// Finish the partition metrics and log the results.
532    fn finish(&mut self) {
533        if self.finished {
534            return;
535        }
536        self.finished = true;
537        self.log_metrics();
538    }
539
540    /// Log partition metrics based on explain_verbose level.
541    fn log_metrics(&self) {
542        if self.explain_verbose {
543            common_telemetry::info!(
544                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
545                self.partition,
546                self.total_regions,
547                self.total_poll_duration,
548                self.total_do_get_cost
549            );
550        } else {
551            common_telemetry::debug!(
552                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
553                self.partition,
554                self.total_regions,
555                self.total_poll_duration,
556                self.total_do_get_cost
557            );
558        }
559    }
560}
561
562impl Drop for PartitionMetrics {
563    fn drop(&mut self) {
564        if !self.finished {
565            self.log_metrics();
566        }
567    }
568}
569
570impl ExecutionPlan for MergeScanExec {
571    fn as_any(&self) -> &dyn Any {
572        self
573    }
574
575    fn schema(&self) -> ArrowSchemaRef {
576        self.arrow_schema.clone()
577    }
578
579    fn properties(&self) -> &PlanProperties {
580        &self.properties
581    }
582
583    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
584        vec![]
585    }
586
587    // DataFusion will swap children unconditionally.
588    // But since this node is leaf node, it's safe to just return self.
589    fn with_new_children(
590        self: Arc<Self>,
591        _children: Vec<Arc<dyn ExecutionPlan>>,
592    ) -> Result<Arc<dyn ExecutionPlan>> {
593        Ok(self.clone())
594    }
595
596    fn execute(
597        &self,
598        partition: usize,
599        context: Arc<TaskContext>,
600    ) -> Result<SendableRecordBatchStream> {
601        self.to_stream(context, partition)
602    }
603
604    fn metrics(&self) -> Option<MetricsSet> {
605        Some(self.metric.clone_inner())
606    }
607
608    fn name(&self) -> &str {
609        "MergeScanExec"
610    }
611}
612
613impl DisplayAs for MergeScanExec {
614    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
615        write!(f, "MergeScanExec: peers=[")?;
616        for region_id in self.regions.iter() {
617            write!(f, "{}, ", region_id)?;
618        }
619        write!(f, "]")?;
620
621        if matches!(t, DisplayFormatType::Verbose) {
622            let partition_metrics = self.partition_metrics();
623            if !partition_metrics.is_empty() {
624                write!(f, ", metrics={{")?;
625                for (i, pm) in partition_metrics.iter().enumerate() {
626                    if i > 0 {
627                        write!(f, ", ")?;
628                    }
629                    write!(
630                        f,
631                        "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
632                        pm.partition,
633                        pm.total_regions,
634                        pm.total_poll_duration,
635                        pm.total_do_get_cost
636                    )?;
637                    for (j, rm) in pm.region_metrics.iter().enumerate() {
638                        if j > 0 {
639                            write!(f, ",")?;
640                        }
641                        write!(
642                            f,
643                            "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
644                            rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
645                        )?;
646                    }
647                    write!(f, "]}}")?;
648                }
649                write!(f, "}}")?;
650            }
651        }
652
653        Ok(())
654    }
655}
656
/// Handles to the DataFusion metrics recorded by one execution of a merge scan partition.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanosecond elapsed till the scan operator is ready to emit data
    ready_time: Time,
    /// Nanosecond elapsed till the first record batch emitted from the scan operator gets consumed
    first_consume_time: Time,
    /// Nanosecond elapsed till the scan operator finished execution
    finish_time: Time,
    /// Count of rows fetched from remote
    output_rows: Count,

    /// Gauge for greptime plan execution cost metrics for output
    greptime_exec_cost: Gauge,
}
671
672impl MergeScanMetric {
673    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
674        Self {
675            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
676            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
677            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
678            output_rows: MetricBuilder::new(metric).output_rows(1),
679            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
680        }
681    }
682
683    pub fn ready_time(&self) -> &Time {
684        &self.ready_time
685    }
686
687    pub fn first_consume_time(&self) -> &Time {
688        &self.first_consume_time
689    }
690
691    pub fn finish_time(&self) -> &Time {
692        &self.finish_time
693    }
694
695    pub fn record_output_batch_rows(&self, num_rows: usize) {
696        self.output_rows.add(num_rows);
697    }
698
699    pub fn record_greptime_exec_cost(&self, metrics: usize) {
700        self.greptime_exec_cost.add(metrics);
701    }
702}