query/dist_plan/merge_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use ahash::HashSet;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
    Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;

use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;

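/// Logical node for a distributed merge scan. The wrapped `input` plan is
/// intended to be executed on remote regions (see [`MergeScanExec`]); it is
/// hidden from the optimizer (`inputs` returns nothing) to prevent further
/// rewrites of the pushed-down plan.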
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The input plan. In the logical plan phase this node contains only one input
    input: LogicalPlan,
    /// Whether this plan is a placeholder
    is_placeholder: bool,
    partition_cols: Vec<String>,
}

impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
    fn name(&self) -> &str {
        Self::name()
    }

    // Prevent further optimization.
    // The input can be retrieved by `self.input()`
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    // Prevent further optimization
    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScan [is_placeholder={}]", self.is_placeholder)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<datafusion::prelude::Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(self.clone())
    }
}

impl MergeScanLogicalPlan {
    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
        Self {
            input,
            is_placeholder,
            partition_cols,
        }
    }

    pub fn name() -> &'static str {
        "MergeScan"
    }

    /// Create a [LogicalPlan::Extension] node from this merge scan plan
    pub fn into_logical_plan(self) -> LogicalPlan {
        LogicalPlan::Extension(Extension {
            node: Arc::new(self),
        })
    }

    pub fn is_placeholder(&self) -> bool {
        self.is_placeholder
    }

    pub fn input(&self) -> &LogicalPlan {
        &self.input
    }

    pub fn partition_cols(&self) -> &[String] {
        &self.partition_cols
    }
}

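/// Physical plan that sends `plan` to each of its `regions` through the
/// `region_query_handler` and merges the returned record batch streams into
/// `target_partition` output partitions.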
pub struct MergeScanExec {
    table: TableName,
    regions: Vec<RegionId>,
    plan: LogicalPlan,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    region_query_handler: RegionQueryHandlerRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
    /// Metrics from sub stages
    sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
    query_ctx: QueryContextRef,
    target_partition: usize,
    partition_cols: Vec<String>,
}

impl std::fmt::Debug for MergeScanExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
            .field("schema", &self.schema)
            .finish()
    }
}

impl MergeScanExec {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        session_state: &SessionState,
        table: TableName,
        regions: Vec<RegionId>,
        plan: LogicalPlan,
        arrow_schema: &ArrowSchema,
        region_query_handler: RegionQueryHandlerRef,
        query_ctx: QueryContextRef,
        target_partition: usize,
        partition_cols: Vec<String>,
    ) -> Result<Self> {
        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
        // Reconsider if it's possible to remove it.
        let arrow_schema = Arc::new(arrow_schema.clone());

        // Determine the output ordering of the plan.
        //
        // When the input plan is a sort, we can use its sort ordering as the output ordering
        // if the target partition count is no less than the number of regions: each output
        // partition then reads at most one region, so merging (in MergeScan) won't break the
        // ordering.
        //
        // Otherwise, we fall back to the default (empty) ordering.
        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
            && target_partition >= regions.len()
        {
            let lex_ordering = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    let physical_expr = session_state
                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
                    Ok(PhysicalSortExpr::new(
                        physical_expr,
                        SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            EquivalenceProperties::new_with_orderings(
                arrow_schema.clone(),
                &[LexOrdering::new(lex_ordering)],
            )
        } else {
            EquivalenceProperties::new(arrow_schema.clone())
        };

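        // Build hash partitioning expressions from the table's partition columns.
        // Columns that cannot be resolved against the plan schema are silently skipped.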
        let partition_exprs = partition_cols
            .iter()
            .filter_map(|col| {
                session_state
                    .create_physical_expr(
                        Expr::Column(ColumnExpr::new_unqualified(col)),
                        plan.schema(),
                    )
                    .ok()
            })
            .collect();
        let partitioning = Partitioning::Hash(partition_exprs, target_partition);

        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
        Ok(Self {
            table,
            regions,
            plan,
            schema,
            arrow_schema,
            region_query_handler,
            metric: ExecutionPlanMetricsSet::new(),
            sub_stage_metrics: Arc::default(),
            properties,
            query_ctx,
            target_partition,
            partition_cols,
        })
    }

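    /// Builds the record batch stream for one output partition. The stream
    /// queries each region assigned to this partition in turn via the region
    /// query handler and forwards the returned batches, recording metrics
    /// along the way.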
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // prepare state to be moved into the stream
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = self.schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();

        let stream = Box::pin(stream!({
            // only report metrics once for each MergeScan
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

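            // Regions are assigned to output partitions round-robin: partition `p`
            // scans the regions at indices p, p + target_partition, p + 2 * target_partition, ...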
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let do_get_start = Instant::now();
                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        BoxedError::new(e)
                    })
                    .context(ExternalSnafu)?;
                let do_get_cost = do_get_start.elapsed();

                ready_timer.stop();

                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch?;
                    // reconstruct the batch using `self.schema`
                    // to remove metadata and correct the column names
                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }
                    yield Ok(batch);
                    // reset poll timer
                    poll_timer = Instant::now();
                }
                common_telemetry::debug!(
                    "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                );

                // process metrics after all data is drained.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    // record metrics from sub stages
                    sub_stage_metrics_moved.lock().unwrap().push(metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }
        }));

        Ok(Box::pin(RecordBatchStreamWrapper {
            schema: self.schema.clone(),
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }))
    }

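    /// Tries to rebuild this plan with the required hash distribution. Returns
    /// `None` when the requested distribution is not hash partitioning, is
    /// identical to the current one, or does not cover all of the table's
    /// partition columns.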
    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
        let Distribution::HashPartitioned(hash_exprs) = distribution else {
            // not applicable
            return None;
        };

        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
            && curr_dist == &hash_exprs
        {
            // No need to change the distribution
            return None;
        }

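        // Only accept the new distribution when every partition column of the
        // table is covered by the requested hash expressions.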
        let mut hash_cols = HashSet::default();
        for expr in &hash_exprs {
            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
                hash_cols.insert(col_expr.name());
            }
        }
        for col in &self.partition_cols {
            if !hash_cols.contains(col.as_str()) {
                // The partitioning columns are not the same
                return None;
            }
        }

        Some(Self {
            table: self.table.clone(),
            regions: self.regions.clone(),
            plan: self.plan.clone(),
            schema: self.schema.clone(),
            arrow_schema: self.arrow_schema.clone(),
            region_query_handler: self.region_query_handler.clone(),
            metric: self.metric.clone(),
            properties: PlanProperties::new(
                self.properties.eq_properties.clone(),
                Partitioning::Hash(hash_exprs, self.target_partition),
                self.properties.emission_type,
                self.properties.boundedness,
            ),
            sub_stage_metrics: self.sub_stage_metrics.clone(),
            query_ctx: self.query_ctx.clone(),
            target_partition: self.target_partition,
            partition_cols: self.partition_cols.clone(),
        })
    }

    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
        Ok(Arc::new(schema))
    }

    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
        self.sub_stage_metrics.lock().unwrap().clone()
    }

    pub fn partition_count(&self) -> usize {
        self.target_partition
    }

    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
}

impl ExecutionPlan for MergeScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // DataFusion will swap children unconditionally.
    // But since this node is a leaf node, it's safe to just return self.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<DfSendableRecordBatchStream> {
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
            self.to_stream(context, partition)?,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "MergeScanExec"
    }
}

impl DisplayAs for MergeScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScanExec: peers=[")?;
        for region_id in self.regions.iter() {
            write!(f, "{}, ", region_id)?;
        }
        write!(f, "]")
    }
}

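/// Execution metrics collected by [`MergeScanExec`].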
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanoseconds elapsed until the scan operator is ready to emit data
    ready_time: Time,
    /// Nanoseconds elapsed until the first record batch emitted from the scan operator gets consumed
    first_consume_time: Time,
    /// Nanoseconds elapsed until the scan operator finishes execution
    finish_time: Time,
    /// Count of rows fetched from remote regions
    output_rows: Count,

    /// Gauge for the Greptime plan execution cost of this scan
    greptime_exec_cost: Gauge,
}

impl MergeScanMetric {
    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
        Self {
            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
            output_rows: MetricBuilder::new(metric).output_rows(1),
            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
        }
    }

    pub fn ready_time(&self) -> &Time {
        &self.ready_time
    }

    pub fn first_consume_time(&self) -> &Time {
        &self.first_consume_time
    }

    pub fn finish_time(&self) -> &Time {
        &self.finish_time
    }

    pub fn record_output_batch_rows(&self, num_rows: usize) {
        self.output_rows.add(num_rows);
    }

    pub fn record_greptime_exec_cost(&self, metrics: usize) {
        self.greptime_exec_cost.add(metrics);
    }
}