query/dist_plan/
merge_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::BoxedError;
24use common_plugins::GREPTIME_EXEC_READ_COST;
25use common_query::request::QueryRequest;
26use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
27use common_recordbatch::error::ExternalSnafu;
28use common_recordbatch::{
29    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
30};
31use common_telemetry::tracing_context::TracingContext;
32use datafusion::execution::{SessionState, TaskContext};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{
35    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
36};
37use datafusion::physical_plan::{
38    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
39};
40use datafusion_common::{Column as ColumnExpr, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
44use datatypes::schema::{Schema, SchemaRef};
45use futures_util::StreamExt;
46use greptime_proto::v1::region::RegionRequestHeader;
47use meter_core::data::ReadItem;
48use meter_macros::read_meter;
49use session::context::QueryContextRef;
50use snafu::ResultExt;
51use store_api::storage::RegionId;
52use table::table_name::TableName;
53use tokio::time::Instant;
54
55use crate::error::ConvertSchemaSnafu;
56use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
57use crate::region_query::RegionQueryHandlerRef;
58
/// Logical plan extension node reported under the name `"MergeScan"`.
///
/// The wrapped `input` is deliberately hidden from the optimizer: the
/// [`UserDefinedLogicalNodeCore`] implementation of this type reports no inputs
/// and no expressions, and the plan has to be fetched through
/// [`MergeScanLogicalPlan::input`] instead.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// In logical plan phase it only contains one input
    input: LogicalPlan,
    /// If this plan is a placeholder
    is_placeholder: bool,
    /// Names of the partition columns, stored as passed to the constructor.
    // NOTE(review): presumably the partition-rule columns of the scanned table — confirm with callers.
    partition_cols: Vec<String>,
}
67
68impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
69    fn name(&self) -> &str {
70        Self::name()
71    }
72
73    // Prevent further optimization.
74    // The input can be retrieved by `self.input()`
75    fn inputs(&self) -> Vec<&LogicalPlan> {
76        vec![]
77    }
78
79    fn schema(&self) -> &datafusion_common::DFSchemaRef {
80        self.input.schema()
81    }
82
83    // Prevent further optimization
84    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
85        vec![]
86    }
87
88    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
89        write!(
90            f,
91            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
92            self.is_placeholder, self.input
93        )
94    }
95
96    fn with_exprs_and_inputs(
97        &self,
98        _exprs: Vec<datafusion::prelude::Expr>,
99        _inputs: Vec<LogicalPlan>,
100    ) -> Result<Self> {
101        Ok(self.clone())
102    }
103}
104
105impl MergeScanLogicalPlan {
106    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
107        Self {
108            input,
109            is_placeholder,
110            partition_cols,
111        }
112    }
113
114    pub fn name() -> &'static str {
115        "MergeScan"
116    }
117
118    /// Create a [LogicalPlan::Extension] node from this merge scan plan
119    pub fn into_logical_plan(self) -> LogicalPlan {
120        LogicalPlan::Extension(Extension {
121            node: Arc::new(self),
122        })
123    }
124
125    pub fn is_placeholder(&self) -> bool {
126        self.is_placeholder
127    }
128
129    pub fn input(&self) -> &LogicalPlan {
130        &self.input
131    }
132
133    pub fn partition_cols(&self) -> &[String] {
134        &self.partition_cols
135    }
136}
137
/// Physical plan node that sends `plan` to every region in `regions` through
/// `region_query_handler` and merges the returned record batch streams.
///
/// Regions are spread over `target_partition` output partitions; see
/// `to_stream` for the exact assignment.
pub struct MergeScanExec {
    /// Table this scan belongs to (only used for `Debug` output in this file).
    table: TableName,
    /// Regions to query.
    regions: Vec<RegionId>,
    /// Logical plan shipped to each region inside the query request.
    plan: LogicalPlan,
    /// Schema converted from `arrow_schema`; used to rebuild every received batch.
    schema: SchemaRef,
    /// Arrow schema reported by `ExecutionPlan::schema`.
    arrow_schema: ArrowSchemaRef,
    /// Handler used to issue the per-region `do_get` calls.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics set backing the `MergeScanMetric` of every stream.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalence/ordering, partitioning, emission type, boundedness).
    properties: PlanProperties,
    /// Metrics from sub stages
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Metrics for each partition
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Query context forwarded to the regions in the request header.
    query_ctx: QueryContextRef,
    /// Number of output partitions of this node.
    target_partition: usize,
    /// Names of the partition columns, stored as passed to the constructor.
    partition_cols: Vec<String>,
}
155
156impl std::fmt::Debug for MergeScanExec {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("MergeScanExec")
159            .field("table", &self.table)
160            .field("regions", &self.regions)
161            .field("schema", &self.schema)
162            .field("plan", &self.plan)
163            .finish()
164    }
165}
166
167impl MergeScanExec {
168    #[allow(clippy::too_many_arguments)]
169    pub fn new(
170        session_state: &SessionState,
171        table: TableName,
172        regions: Vec<RegionId>,
173        plan: LogicalPlan,
174        arrow_schema: &ArrowSchema,
175        region_query_handler: RegionQueryHandlerRef,
176        query_ctx: QueryContextRef,
177        target_partition: usize,
178        partition_cols: Vec<String>,
179    ) -> Result<Self> {
180        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
181        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
182        // Reconsider if it's possible to remove it.
183        let arrow_schema = Arc::new(arrow_schema.clone());
184
185        // States the output ordering of the plan.
186        //
187        // When the input plan is a sort, we can use the sort ordering as the output ordering
188        // if the target partition is greater than the number of regions, which means we won't
189        // break the ordering on merging (of MergeScan).
190        //
191        // Otherwise, we need to use the default ordering.
192        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
193            && target_partition >= regions.len()
194        {
195            let lex_ordering = sort
196                .expr
197                .iter()
198                .map(|sort_expr| {
199                    let physical_expr = session_state
200                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
201                    Ok(PhysicalSortExpr::new(
202                        physical_expr,
203                        SortOptions {
204                            descending: !sort_expr.asc,
205                            nulls_first: sort_expr.nulls_first,
206                        },
207                    ))
208                })
209                .collect::<Result<Vec<_>>>()?;
210            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
211        } else {
212            EquivalenceProperties::new(arrow_schema.clone())
213        };
214
215        let partition_exprs = partition_cols
216            .iter()
217            .filter_map(|col| {
218                session_state
219                    .create_physical_expr(
220                        Expr::Column(ColumnExpr::new_unqualified(col)),
221                        plan.schema(),
222                    )
223                    .ok()
224            })
225            .collect();
226        let partitioning = Partitioning::Hash(partition_exprs, target_partition);
227
228        let properties = PlanProperties::new(
229            eq_properties,
230            partitioning,
231            EmissionType::Incremental,
232            Boundedness::Bounded,
233        );
234        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
235        Ok(Self {
236            table,
237            regions,
238            plan,
239            schema,
240            arrow_schema,
241            region_query_handler,
242            metric: ExecutionPlanMetricsSet::new(),
243            sub_stage_metrics: Arc::default(),
244            partition_metrics: Arc::default(),
245            properties,
246            query_ctx,
247            target_partition,
248            partition_cols,
249        })
250    }
251
    /// Builds the record batch stream of one output `partition`.
    ///
    /// Regions are assigned round-robin: this partition scans the regions at
    /// indices `partition`, `partition + target_partition`, ... one after
    /// another. Every received batch is rebuilt with `self.schema` before it is
    /// yielded. Region level metrics are collected into the shared
    /// `sub_stage_metrics` / `partition_metrics` maps while the stream is driven.
    ///
    /// The function itself only prepares state and always returns `Ok`; remote
    /// and conversion errors show up as `Err` items of the returned stream.
    ///
    /// # Panics
    ///
    /// Polling the stream panics if one of the metric mutexes is poisoned, or if
    /// `target_partition` is `0` (`Iterator::step_by` rejects a zero step).
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // prepare states to move
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = self.schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let partition_metrics_moved = self.partition_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        // The task id is used as the database name and the session id as the
        // serialized tracing context.
        // NOTE(review): presumably the query engine stores them there when it builds the
        // `TaskContext` — confirm against the code that creates the context.
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();
        let explain_verbose = self.query_ctx.explain_verbose();

        let stream = Box::pin(stream!({
            // only report metrics once for each MergeScan
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            // `_finish_timer` is never stopped explicitly; it lives until the stream body ends.
            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            // Wrapped in an `Option` so that it is stopped for the first batch only.
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

            // Round-robin assignment of regions to this partition.
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let region_start = Instant::now();
                let do_get_start = Instant::now();

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan one region, partition: {}, region_id: {}",
                        partition,
                        region_id
                    );
                }

                // Only a failing `do_get` bumps the error counter; errors of
                // individual batches below do not.
                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        BoxedError::new(e)
                    })
                    .context(ExternalSnafu)?;
                let do_get_cost = do_get_start.elapsed();

                // Called for every region of this partition.
                ready_timer.stop();

                // `poll_duration` only accumulates the time spent waiting on the
                // region stream; the time the consumer spends between two polls of
                // this stream is excluded by resetting `poll_timer` after each yield.
                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch?;
                    // reconstruct batch using `self.schema`
                    // to remove metadata and correct column name
                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }

                    // Keep the latest metrics snapshot of this region; the guard is
                    // released before the `yield` below.
                    if let Some(metrics) = stream.metrics() {
                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                        sub_stage_metrics.insert(region_id, metrics);
                    }

                    yield Ok(batch);
                    // reset poll timer
                    poll_timer = Instant::now();
                }
                let total_cost = region_start.elapsed();

                // Record region metrics and push to global partition_metrics
                let region_metrics = RegionMetrics {
                    region_id,
                    poll_duration,
                    do_get_cost,
                    total_cost,
                };

                // Push RegionMetrics to global partition_metrics immediately after scanning this region
                {
                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                    let partition_metrics = partition_metrics_guard
                        .entry(partition)
                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
                    partition_metrics.add_region_metrics(region_metrics);
                }

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                        partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                    );
                }

                // process metrics after all data is drained.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    // record metrics from sub stages
                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }

            // Finish partition metrics and log results
            // (no entry exists, and nothing is logged, when this partition scanned no region)
            {
                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
                    partition_metrics.finish();
                }
            }
        }));

        Ok(Box::pin(RecordBatchStreamWrapper {
            schema: self.schema.clone(),
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }))
    }
409
410    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
411        let Distribution::HashPartitioned(hash_exprs) = distribution else {
412            // not applicable
413            return None;
414        };
415
416        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
417            && curr_dist == &hash_exprs
418        {
419            // No need to change the distribution
420            return None;
421        }
422
423        let mut hash_cols = HashSet::default();
424        for expr in &hash_exprs {
425            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
426                hash_cols.insert(col_expr.name());
427            }
428        }
429        for col in &self.partition_cols {
430            if !hash_cols.contains(col.as_str()) {
431                // The partitioning columns are not the same
432                return None;
433            }
434        }
435
436        Some(Self {
437            table: self.table.clone(),
438            regions: self.regions.clone(),
439            plan: self.plan.clone(),
440            schema: self.schema.clone(),
441            arrow_schema: self.arrow_schema.clone(),
442            region_query_handler: self.region_query_handler.clone(),
443            metric: self.metric.clone(),
444            properties: PlanProperties::new(
445                self.properties.eq_properties.clone(),
446                Partitioning::Hash(hash_exprs, self.target_partition),
447                self.properties.emission_type,
448                self.properties.boundedness,
449            ),
450            sub_stage_metrics: self.sub_stage_metrics.clone(),
451            partition_metrics: self.partition_metrics.clone(),
452            query_ctx: self.query_ctx.clone(),
453            target_partition: self.target_partition,
454            partition_cols: self.partition_cols.clone(),
455        })
456    }
457
458    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
459        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
460        Ok(Arc::new(schema))
461    }
462
463    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
464        self.sub_stage_metrics
465            .lock()
466            .unwrap()
467            .values()
468            .cloned()
469            .collect()
470    }
471
    /// Number of output partitions of this node (the `target_partition` given at construction).
    pub fn partition_count(&self) -> usize {
        self.target_partition
    }
475
    /// Number of regions this node scans.
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
479
480    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
481        self.partition_metrics
482            .lock()
483            .unwrap()
484            .values()
485            .cloned()
486            .collect()
487    }
488}
489
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// Region these metrics belong to.
    region_id: RegionId,
    /// Accumulated time spent waiting for batches of the region stream.
    poll_duration: Duration,
    /// Time taken by the `do_get` call that opened the region stream.
    do_get_cost: Duration,
    /// Total cost to scan the region.
    total_cost: Duration,
}
499
/// Metrics for a partition of a MergeScanExec.
///
/// The summary is logged once: either by `finish`, or on drop if `finish` was
/// never called.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Index of the partition.
    partition: usize,
    /// Metrics of every region added so far, in insertion order.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over all added regions.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over all added regions.
    total_do_get_cost: Duration,
    /// Number of regions added so far.
    total_regions: usize,
    /// Selects the log level of the summary: info when set, debug otherwise.
    explain_verbose: bool,
    /// Set by `finish`; suppresses the summary log on drop.
    finished: bool,
}
511
512impl PartitionMetrics {
513    fn new(partition: usize, explain_verbose: bool) -> Self {
514        Self {
515            partition,
516            region_metrics: Vec::new(),
517            total_poll_duration: Duration::ZERO,
518            total_do_get_cost: Duration::ZERO,
519            total_regions: 0,
520            explain_verbose,
521            finished: false,
522        }
523    }
524
525    fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
526        self.total_poll_duration += region_metrics.poll_duration;
527        self.total_do_get_cost += region_metrics.do_get_cost;
528        self.total_regions += 1;
529        self.region_metrics.push(region_metrics);
530    }
531
532    /// Finish the partition metrics and log the results.
533    fn finish(&mut self) {
534        if self.finished {
535            return;
536        }
537        self.finished = true;
538        self.log_metrics();
539    }
540
541    /// Log partition metrics based on explain_verbose level.
542    fn log_metrics(&self) {
543        if self.explain_verbose {
544            common_telemetry::info!(
545                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
546                self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
547            );
548        } else {
549            common_telemetry::debug!(
550                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
551                self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
552            );
553        }
554    }
555}
556
557impl Drop for PartitionMetrics {
558    fn drop(&mut self) {
559        if !self.finished {
560            self.log_metrics();
561        }
562    }
563}
564
565impl ExecutionPlan for MergeScanExec {
566    fn as_any(&self) -> &dyn Any {
567        self
568    }
569
570    fn schema(&self) -> ArrowSchemaRef {
571        self.arrow_schema.clone()
572    }
573
574    fn properties(&self) -> &PlanProperties {
575        &self.properties
576    }
577
578    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
579        vec![]
580    }
581
582    // DataFusion will swap children unconditionally.
583    // But since this node is leaf node, it's safe to just return self.
584    fn with_new_children(
585        self: Arc<Self>,
586        _children: Vec<Arc<dyn ExecutionPlan>>,
587    ) -> Result<Arc<dyn ExecutionPlan>> {
588        Ok(self.clone())
589    }
590
591    fn execute(
592        &self,
593        partition: usize,
594        context: Arc<TaskContext>,
595    ) -> Result<DfSendableRecordBatchStream> {
596        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
597            self.to_stream(context, partition)?,
598        )))
599    }
600
601    fn metrics(&self) -> Option<MetricsSet> {
602        Some(self.metric.clone_inner())
603    }
604
605    fn name(&self) -> &str {
606        "MergeScanExec"
607    }
608}
609
610impl DisplayAs for MergeScanExec {
611    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
612        write!(f, "MergeScanExec: peers=[")?;
613        for region_id in self.regions.iter() {
614            write!(f, "{}, ", region_id)?;
615        }
616        write!(f, "]")?;
617
618        if matches!(t, DisplayFormatType::Verbose) {
619            let partition_metrics = self.partition_metrics();
620            if !partition_metrics.is_empty() {
621                write!(f, ", metrics={{")?;
622                for (i, pm) in partition_metrics.iter().enumerate() {
623                    if i > 0 {
624                        write!(f, ", ")?;
625                    }
626                    write!(f, "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
627                           pm.partition, pm.total_regions,
628                           pm.total_poll_duration,
629                           pm.total_do_get_cost)?;
630                    for (j, rm) in pm.region_metrics.iter().enumerate() {
631                        if j > 0 {
632                            write!(f, ",")?;
633                        }
634                        write!(f, "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
635                               rm.region_id,
636                               rm.poll_duration,
637                               rm.do_get_cost,
638                               rm.total_cost)?;
639                    }
640                    write!(f, "]}}")?;
641                }
642                write!(f, "}}")?;
643            }
644        }
645
646        Ok(())
647    }
648}
649
/// Handles to the DataFusion metrics recorded by one merge scan stream.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanosecond elapsed till the scan operator is ready to emit data
    ready_time: Time,
    /// Nanosecond elapsed till the first record batch emitted from the scan operator gets consumed
    first_consume_time: Time,
    /// Nanosecond elapsed till the scan operator finished execution
    finish_time: Time,
    /// Count of rows fetched from remote
    output_rows: Count,

    /// Gauge for greptime plan execution cost metrics for output
    greptime_exec_cost: Gauge,
}
664
665impl MergeScanMetric {
666    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
667        Self {
668            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
669            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
670            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
671            output_rows: MetricBuilder::new(metric).output_rows(1),
672            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
673        }
674    }
675
676    pub fn ready_time(&self) -> &Time {
677        &self.ready_time
678    }
679
680    pub fn first_consume_time(&self) -> &Time {
681        &self.first_consume_time
682    }
683
684    pub fn finish_time(&self) -> &Time {
685        &self.finish_time
686    }
687
688    pub fn record_output_batch_rows(&self, num_rows: usize) {
689        self.output_rows.add(num_rows);
690    }
691
692    pub fn record_greptime_exec_cost(&self, metrics: usize) {
693        self.greptime_exec_cost.add(metrics);
694    }
695}