query/dist_plan/merge_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use ahash::{HashMap, HashSet};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;

use crate::dist_plan::analyzer::AliasMapping;
use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;

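/// Logical plan node that wraps the sub-plan to be executed remotely on regions.
/// It reports no inputs and no expressions (see `inputs()` and `expressions()`)
/// so the optimizer will not rewrite the wrapped plan any further.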
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// In the logical plan phase this node only contains one input
    input: LogicalPlan,
    /// Whether this plan is a placeholder
    is_placeholder: bool,
    partition_cols: AliasMapping,
}

impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
    fn name(&self) -> &str {
        Self::name()
    }

    // Prevent further optimization.
    // The input can be retrieved by `self.input()`
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    // Prevent further optimization
    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
            self.is_placeholder, self.input
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<datafusion::prelude::Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(self.clone())
    }
}

impl MergeScanLogicalPlan {
    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
        Self {
            input,
            is_placeholder,
            partition_cols,
        }
    }

    pub fn name() -> &'static str {
        "MergeScan"
    }

    /// Create a [LogicalPlan::Extension] node from this merge scan plan
    pub fn into_logical_plan(self) -> LogicalPlan {
        LogicalPlan::Extension(Extension {
            node: Arc::new(self),
        })
    }

    pub fn is_placeholder(&self) -> bool {
        self.is_placeholder
    }

    pub fn input(&self) -> &LogicalPlan {
        &self.input
    }

    pub fn partition_cols(&self) -> &AliasMapping {
        &self.partition_cols
    }
}

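/// Physical plan node that sends the wrapped logical plan to every region of the
/// target table and merges the returned record batch streams into
/// `target_partition` output partitions.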
pub struct MergeScanExec {
    table: TableName,
    regions: Vec<RegionId>,
    plan: LogicalPlan,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    region_query_handler: RegionQueryHandlerRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
    /// Metrics from sub stages
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Metrics for each partition
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    query_ctx: QueryContextRef,
    target_partition: usize,
    partition_cols: AliasMapping,
}

impl std::fmt::Debug for MergeScanExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
            .field("schema", &self.schema)
            .field("plan", &self.plan)
            .finish()
    }
}

impl MergeScanExec {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        session_state: &SessionState,
        table: TableName,
        regions: Vec<RegionId>,
        plan: LogicalPlan,
        arrow_schema: &ArrowSchema,
        region_query_handler: RegionQueryHandlerRef,
        query_ctx: QueryContextRef,
        target_partition: usize,
        partition_cols: AliasMapping,
    ) -> Result<Self> {
        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
        // Reconsider if it's possible to remove it.
        let arrow_schema = Arc::new(arrow_schema.clone());

        // States the output ordering of the plan.
        //
        // When the input plan is a sort, we can use the sort ordering as the output ordering
        // if the target partition count is greater than or equal to the number of regions,
        // which means merging (in MergeScan) won't break the ordering.
        //
        // Otherwise, we need to use the default ordering.
        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
            && target_partition >= regions.len()
        {
            let lex_ordering = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    let physical_expr = session_state
                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
                    Ok(PhysicalSortExpr::new(
                        physical_expr,
                        SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
        } else {
            EquivalenceProperties::new(arrow_schema.clone())
        };

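        // Build a physical expression for the first alias of each partition column; these
        // describe this node's hash output partitioning. Columns that cannot be resolved
        // against the plan schema are skipped.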
        let partition_exprs = partition_cols
            .iter()
            .filter_map(|col| {
                if let Some(first_alias) = col.1.first() {
                    session_state
                        .create_physical_expr(
                            Expr::Column(ColumnExpr::new_unqualified(
                                first_alias.name().to_string(),
                            )),
                            plan.schema(),
                        )
                        .ok()
                } else {
                    None
                }
            })
            .collect();
        let partitioning = Partitioning::Hash(partition_exprs, target_partition);

        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
        Ok(Self {
            table,
            regions,
            plan,
            schema,
            arrow_schema,
            region_query_handler,
            metric: ExecutionPlanMetricsSet::new(),
            sub_stage_metrics: Arc::default(),
            partition_metrics: Arc::default(),
            properties,
            query_ctx,
            target_partition,
            partition_cols,
        })
    }

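    /// Builds the record batch stream for the given `partition`, querying each region
    /// assigned to that partition sequentially and recording per-region metrics.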
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // prepare states to move
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = self.schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let partition_metrics_moved = self.partition_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();
        let explain_verbose = self.query_ctx.explain_verbose();

        let stream = Box::pin(stream!({
            // only report metrics once for each MergeScan
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

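            // Regions are assigned to partitions round-robin: partition `p` scans the
            // regions at indices p, p + target_partition, p + 2 * target_partition, ...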
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let region_start = Instant::now();
                let do_get_start = Instant::now();

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan one region, partition: {}, region_id: {}",
                        partition,
                        region_id
                    );
                }

                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        BoxedError::new(e)
                    })
                    .context(ExternalSnafu)?;
                let do_get_cost = do_get_start.elapsed();

                ready_timer.stop();

                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch?;
                    // reconstruct the batch using `self.schema`
                    // to remove metadata and correct the column names
                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }

                    if let Some(metrics) = stream.metrics() {
                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                        sub_stage_metrics.insert(region_id, metrics);
                    }

                    yield Ok(batch);
                    // reset the poll timer
                    poll_timer = Instant::now();
                }
                let total_cost = region_start.elapsed();

                // Record region metrics and push to global partition_metrics
                let region_metrics = RegionMetrics {
                    region_id,
                    poll_duration,
                    do_get_cost,
                    total_cost,
                };

                // Push RegionMetrics to global partition_metrics immediately after scanning this region
                {
                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                    let partition_metrics = partition_metrics_guard
                        .entry(partition)
                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
                    partition_metrics.add_region_metrics(region_metrics);
                }

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                        partition,
                        region_id,
                        poll_duration,
                        metric.first_consume_time(),
                        do_get_cost
                    );
                }

                // process metrics after all data is drained.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    // record metrics from sub stages
                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }

            // Finish partition metrics and log results
            {
                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
                    partition_metrics.finish();
                }
            }
        }));

        Ok(Box::pin(RecordBatchStreamWrapper {
            schema: self.schema.clone(),
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }))
    }

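    /// Tries to rebuild this plan with a hash partitioning restricted to the expressions in
    /// `distribution` that match one of the table's partition column aliases. Returns `None`
    /// when the required distribution is not hash-partitioned, already matches the current
    /// partitioning, or shares no columns with the partition columns.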
    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
        let Distribution::HashPartitioned(hash_exprs) = distribution else {
            // not applicable
            return None;
        };

        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
            && curr_dist == &hash_exprs
        {
            // No need to change the distribution
            return None;
        }

        let all_partition_col_aliases: HashSet<_> = self
            .partition_cols
            .values()
            .flat_map(|aliases| aliases.iter().map(|c| c.name()))
            .collect();
        let mut overlaps = vec![];
        for expr in &hash_exprs {
            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
                && all_partition_col_aliases.contains(col_expr.name())
            {
                overlaps.push(expr.clone());
            }
        }

        if overlaps.is_empty() {
            return None;
        }

        Some(Self {
            table: self.table.clone(),
            regions: self.regions.clone(),
            plan: self.plan.clone(),
            schema: self.schema.clone(),
            arrow_schema: self.arrow_schema.clone(),
            region_query_handler: self.region_query_handler.clone(),
            metric: self.metric.clone(),
            properties: PlanProperties::new(
                self.properties.eq_properties.clone(),
                Partitioning::Hash(overlaps, self.target_partition),
                self.properties.emission_type,
                self.properties.boundedness,
            ),
            sub_stage_metrics: self.sub_stage_metrics.clone(),
            partition_metrics: self.partition_metrics.clone(),
            query_ctx: self.query_ctx.clone(),
            target_partition: self.target_partition,
            partition_cols: self.partition_cols.clone(),
        })
    }

    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
        Ok(Arc::new(schema))
    }

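    /// Returns the metrics reported by the remote sub-stage streams, one entry per scanned region.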
    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
        self.sub_stage_metrics
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }

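    /// Number of output partitions, i.e. `target_partition`.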
    pub fn partition_count(&self) -> usize {
        self.target_partition
    }

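    /// Number of regions this plan scans.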
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }

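    /// Collects the per-partition metrics gathered so far.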
    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
        self.partition_metrics
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }
}

/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
    region_id: RegionId,
    poll_duration: Duration,
    do_get_cost: Duration,
    /// Total cost to scan the region.
    total_cost: Duration,
}

/// Metrics for a partition of a MergeScanExec.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    partition: usize,
    region_metrics: Vec<RegionMetrics>,
    total_poll_duration: Duration,
    total_do_get_cost: Duration,
    total_regions: usize,
    explain_verbose: bool,
    finished: bool,
}

impl PartitionMetrics {
    fn new(partition: usize, explain_verbose: bool) -> Self {
        Self {
            partition,
            region_metrics: Vec::new(),
            total_poll_duration: Duration::ZERO,
            total_do_get_cost: Duration::ZERO,
            total_regions: 0,
            explain_verbose,
            finished: false,
        }
    }

    fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
        self.total_poll_duration += region_metrics.poll_duration;
        self.total_do_get_cost += region_metrics.do_get_cost;
        self.total_regions += 1;
        self.region_metrics.push(region_metrics);
    }

    /// Finish the partition metrics and log the results.
    fn finish(&mut self) {
        if self.finished {
            return;
        }
        self.finished = true;
        self.log_metrics();
    }

    /// Log partition metrics based on explain_verbose level.
    fn log_metrics(&self) {
        if self.explain_verbose {
            common_telemetry::info!(
                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
                self.partition,
                self.total_regions,
                self.total_poll_duration,
                self.total_do_get_cost
            );
        } else {
            common_telemetry::debug!(
                "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
                self.partition,
                self.total_regions,
                self.total_poll_duration,
                self.total_do_get_cost
            );
        }
    }
}

impl Drop for PartitionMetrics {
    fn drop(&mut self) {
        if !self.finished {
            self.log_metrics();
        }
    }
}

impl ExecutionPlan for MergeScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // DataFusion will swap children unconditionally.
    // But since this node is a leaf node, it's safe to just return self.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<DfSendableRecordBatchStream> {
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
            self.to_stream(context, partition)?,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "MergeScanExec"
    }
}

impl DisplayAs for MergeScanExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScanExec: peers=[")?;
        for region_id in self.regions.iter() {
            write!(f, "{}, ", region_id)?;
        }
        write!(f, "]")?;

        if matches!(t, DisplayFormatType::Verbose) {
            let partition_metrics = self.partition_metrics();
            if !partition_metrics.is_empty() {
                write!(f, ", metrics={{")?;
                for (i, pm) in partition_metrics.iter().enumerate() {
                    if i > 0 {
                        write!(f, ", ")?;
                    }
                    write!(
                        f,
                        "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
                        pm.partition,
                        pm.total_regions,
                        pm.total_poll_duration,
                        pm.total_do_get_cost
                    )?;
                    for (j, rm) in pm.region_metrics.iter().enumerate() {
                        if j > 0 {
                            write!(f, ",")?;
                        }
                        write!(
                            f,
                            "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
                            rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
                        )?;
                    }
                    write!(f, "]}}")?;
                }
                write!(f, "}}")?;
            }
        }

        Ok(())
    }
}

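/// Execution metrics of a [`MergeScanExec`] node.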
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanoseconds elapsed until the scan operator is ready to emit data
    ready_time: Time,
    /// Nanoseconds elapsed until the first record batch emitted from the scan operator is consumed
    first_consume_time: Time,
    /// Nanoseconds elapsed until the scan operator finishes execution
    finish_time: Time,
    /// Count of rows fetched from remote
    output_rows: Count,

    /// Gauge for the greptime plan execution (read) cost of the output
    greptime_exec_cost: Gauge,
}

impl MergeScanMetric {
    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
        Self {
            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
            output_rows: MetricBuilder::new(metric).output_rows(1),
            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
        }
    }

    pub fn ready_time(&self) -> &Time {
        &self.ready_time
    }

    pub fn first_consume_time(&self) -> &Time {
        &self.first_consume_time
    }

    pub fn finish_time(&self) -> &Time {
        &self.finish_time
    }

    pub fn record_output_batch_rows(&self, num_rows: usize) {
        self.output_rows.add(num_rows);
    }

    pub fn record_greptime_exec_cost(&self, metrics: usize) {
        self.greptime_exec_cost.add(metrics);
    }
}