query/dist_plan/
merge_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::BoxedError;
24use common_plugins::GREPTIME_EXEC_READ_COST;
25use common_query::request::QueryRequest;
26use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
27use common_recordbatch::error::ExternalSnafu;
28use common_recordbatch::{
29    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
30};
31use common_telemetry::tracing_context::TracingContext;
32use datafusion::execution::{SessionState, TaskContext};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{
35    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
36};
37use datafusion::physical_plan::{
38    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
39};
40use datafusion_common::{Column as ColumnExpr, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{
44    Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
45};
46use datatypes::schema::{Schema, SchemaRef};
47use futures_util::StreamExt;
48use greptime_proto::v1::region::RegionRequestHeader;
49use meter_core::data::ReadItem;
50use meter_macros::read_meter;
51use session::context::QueryContextRef;
52use snafu::ResultExt;
53use store_api::storage::RegionId;
54use table::table_name::TableName;
55use tokio::time::Instant;
56
57use crate::error::ConvertSchemaSnafu;
58use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
59use crate::region_query::RegionQueryHandlerRef;
60
/// Logical extension node wrapping the part of a query that is sent to regions.
///
/// The wrapped `input` is shown as `remote_input` in EXPLAIN output and is deliberately
/// hidden from DataFusion's optimizer (see the `UserDefinedLogicalNodeCore` impl, which
/// reports no inputs and no expressions).
///
/// NOTE: `PartialOrd` and `Hash` are derived, so field declaration order is significant.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// In logical plan phase it only contains one input
    input: LogicalPlan,
    /// If this plan is a placeholder
    is_placeholder: bool,
    /// Partition column names, exposed through `partition_cols()`.
    partition_cols: Vec<String>,
}
69
70impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
71    fn name(&self) -> &str {
72        Self::name()
73    }
74
75    // Prevent further optimization.
76    // The input can be retrieved by `self.input()`
77    fn inputs(&self) -> Vec<&LogicalPlan> {
78        vec![]
79    }
80
81    fn schema(&self) -> &datafusion_common::DFSchemaRef {
82        self.input.schema()
83    }
84
85    // Prevent further optimization
86    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
87        vec![]
88    }
89
90    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
91        write!(
92            f,
93            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
94            self.is_placeholder, self.input
95        )
96    }
97
98    fn with_exprs_and_inputs(
99        &self,
100        _exprs: Vec<datafusion::prelude::Expr>,
101        _inputs: Vec<LogicalPlan>,
102    ) -> Result<Self> {
103        Ok(self.clone())
104    }
105}
106
107impl MergeScanLogicalPlan {
108    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
109        Self {
110            input,
111            is_placeholder,
112            partition_cols,
113        }
114    }
115
116    pub fn name() -> &'static str {
117        "MergeScan"
118    }
119
120    /// Create a [LogicalPlan::Extension] node from this merge scan plan
121    pub fn into_logical_plan(self) -> LogicalPlan {
122        LogicalPlan::Extension(Extension {
123            node: Arc::new(self),
124        })
125    }
126
127    pub fn is_placeholder(&self) -> bool {
128        self.is_placeholder
129    }
130
131    pub fn input(&self) -> &LogicalPlan {
132        &self.input
133    }
134
135    pub fn partition_cols(&self) -> &[String] {
136        &self.partition_cols
137    }
138}
139
/// Physical plan node that sends `plan` to each region in `regions` through
/// `region_query_handler` and merges the returned streams into `target_partition`
/// output partitions (region `i` is read by output partition `i % target_partition`).
pub struct MergeScanExec {
    /// Table being scanned (shown in the `Debug` output).
    table: TableName,
    /// Regions to query.
    regions: Vec<RegionId>,
    /// Logical plan sent to every region.
    plan: LogicalPlan,
    /// Output schema; every received batch is rebuilt with it.
    schema: SchemaRef,
    /// Arrow schema from which `schema` is derived; returned by `ExecutionPlan::schema`.
    arrow_schema: ArrowSchemaRef,
    /// Handler used to issue `do_get` requests against regions.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics of this node.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (ordering, partitioning, emission, boundedness).
    properties: PlanProperties,
    /// Metrics from sub stages
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Query context forwarded to each region request.
    query_ctx: QueryContextRef,
    /// Number of output partitions.
    target_partition: usize,
    /// Names of the table's partition columns; used to derive the advertised partitioning.
    partition_cols: Vec<String>,
}
155
156impl std::fmt::Debug for MergeScanExec {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("MergeScanExec")
159            .field("table", &self.table)
160            .field("regions", &self.regions)
161            .field("schema", &self.schema)
162            .field("plan", &self.plan)
163            .finish()
164    }
165}
166
167impl MergeScanExec {
168    #[allow(clippy::too_many_arguments)]
169    pub fn new(
170        session_state: &SessionState,
171        table: TableName,
172        regions: Vec<RegionId>,
173        plan: LogicalPlan,
174        arrow_schema: &ArrowSchema,
175        region_query_handler: RegionQueryHandlerRef,
176        query_ctx: QueryContextRef,
177        target_partition: usize,
178        partition_cols: Vec<String>,
179    ) -> Result<Self> {
180        // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
181        // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
182        // Reconsider if it's possible to remove it.
183        let arrow_schema = Arc::new(arrow_schema.clone());
184
185        // States the output ordering of the plan.
186        //
187        // When the input plan is a sort, we can use the sort ordering as the output ordering
188        // if the target partition is greater than the number of regions, which means we won't
189        // break the ordering on merging (of MergeScan).
190        //
191        // Otherwise, we need to use the default ordering.
192        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
193            && target_partition >= regions.len()
194        {
195            let lex_ordering = sort
196                .expr
197                .iter()
198                .map(|sort_expr| {
199                    let physical_expr = session_state
200                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
201                    Ok(PhysicalSortExpr::new(
202                        physical_expr,
203                        SortOptions {
204                            descending: !sort_expr.asc,
205                            nulls_first: sort_expr.nulls_first,
206                        },
207                    ))
208                })
209                .collect::<Result<Vec<_>>>()?;
210            EquivalenceProperties::new_with_orderings(
211                arrow_schema.clone(),
212                &[LexOrdering::new(lex_ordering)],
213            )
214        } else {
215            EquivalenceProperties::new(arrow_schema.clone())
216        };
217
218        let partition_exprs = partition_cols
219            .iter()
220            .filter_map(|col| {
221                session_state
222                    .create_physical_expr(
223                        Expr::Column(ColumnExpr::new_unqualified(col)),
224                        plan.schema(),
225                    )
226                    .ok()
227            })
228            .collect();
229        let partitioning = Partitioning::Hash(partition_exprs, target_partition);
230
231        let properties = PlanProperties::new(
232            eq_properties,
233            partitioning,
234            EmissionType::Incremental,
235            Boundedness::Bounded,
236        );
237        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
238        Ok(Self {
239            table,
240            regions,
241            plan,
242            schema,
243            arrow_schema,
244            region_query_handler,
245            metric: ExecutionPlanMetricsSet::new(),
246            sub_stage_metrics: Arc::default(),
247            properties,
248            query_ctx,
249            target_partition,
250            partition_cols,
251        })
252    }
253
254    pub fn to_stream(
255        &self,
256        context: Arc<TaskContext>,
257        partition: usize,
258    ) -> Result<SendableRecordBatchStream> {
259        // prepare states to move
260        let regions = self.regions.clone();
261        let region_query_handler = self.region_query_handler.clone();
262        let metric = MergeScanMetric::new(&self.metric);
263        let schema = self.schema.clone();
264        let query_ctx = self.query_ctx.clone();
265        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
266        let plan = self.plan.clone();
267        let target_partition = self.target_partition;
268        let dbname = context.task_id().unwrap_or_default();
269        let tracing_context = TracingContext::from_json(context.session_id().as_str());
270        let current_channel = self.query_ctx.channel();
271        let read_preference = self.query_ctx.read_preference();
272
273        let stream = Box::pin(stream!({
274            // only report metrics once for each MergeScan
275            if partition == 0 {
276                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
277            }
278
279            let _finish_timer = metric.finish_time().timer();
280            let mut ready_timer = metric.ready_time().timer();
281            let mut first_consume_timer = Some(metric.first_consume_time().timer());
282
283            for region_id in regions
284                .iter()
285                .skip(partition)
286                .step_by(target_partition)
287                .copied()
288            {
289                let request = QueryRequest {
290                    header: Some(RegionRequestHeader {
291                        tracing_context: tracing_context.to_w3c(),
292                        dbname: dbname.clone(),
293                        query_context: Some(query_ctx.as_ref().into()),
294                    }),
295                    region_id,
296                    plan: plan.clone(),
297                };
298                let do_get_start = Instant::now();
299                let mut stream = region_query_handler
300                    .do_get(read_preference, request)
301                    .await
302                    .map_err(|e| {
303                        MERGE_SCAN_ERRORS_TOTAL.inc();
304                        BoxedError::new(e)
305                    })
306                    .context(ExternalSnafu)?;
307                let do_get_cost = do_get_start.elapsed();
308
309                ready_timer.stop();
310
311                let mut poll_duration = Duration::ZERO;
312                let mut poll_timer = Instant::now();
313                while let Some(batch) = stream.next().await {
314                    let poll_elapsed = poll_timer.elapsed();
315                    poll_duration += poll_elapsed;
316
317                    let batch = batch?;
318                    // reconstruct batch using `self.schema`
319                    // to remove metadata and correct column name
320                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
321                    metric.record_output_batch_rows(batch.num_rows());
322                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
323                        first_consume_timer.stop();
324                    }
325
326                    if let Some(metrics) = stream.metrics() {
327                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
328                        sub_stage_metrics.insert(region_id, metrics);
329                    }
330
331                    yield Ok(batch);
332                    // reset poll timer
333                    poll_timer = Instant::now();
334                }
335                common_telemetry::debug!(
336                    "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
337                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
338                );
339
340                // process metrics after all data is drained.
341                if let Some(metrics) = stream.metrics() {
342                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
343                    let value = read_meter!(
344                        c,
345                        s,
346                        ReadItem {
347                            cpu_time: metrics.elapsed_compute as u64,
348                            table_scan: metrics.memory_usage as u64
349                        },
350                        current_channel as u8
351                    );
352                    metric.record_greptime_exec_cost(value as usize);
353
354                    // record metrics from sub sgates
355                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
356                    sub_stage_metrics.insert(region_id, metrics);
357                }
358
359                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
360            }
361        }));
362
363        Ok(Box::pin(RecordBatchStreamWrapper {
364            schema: self.schema.clone(),
365            stream,
366            output_ordering: None,
367            metrics: Default::default(),
368        }))
369    }
370
371    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
372        let Distribution::HashPartitioned(hash_exprs) = distribution else {
373            // not applicable
374            return None;
375        };
376
377        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
378            && curr_dist == &hash_exprs
379        {
380            // No need to change the distribution
381            return None;
382        }
383
384        let mut hash_cols = HashSet::default();
385        for expr in &hash_exprs {
386            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
387                hash_cols.insert(col_expr.name());
388            }
389        }
390        for col in &self.partition_cols {
391            if !hash_cols.contains(col.as_str()) {
392                // The partitioning columns are not the same
393                return None;
394            }
395        }
396
397        Some(Self {
398            table: self.table.clone(),
399            regions: self.regions.clone(),
400            plan: self.plan.clone(),
401            schema: self.schema.clone(),
402            arrow_schema: self.arrow_schema.clone(),
403            region_query_handler: self.region_query_handler.clone(),
404            metric: self.metric.clone(),
405            properties: PlanProperties::new(
406                self.properties.eq_properties.clone(),
407                Partitioning::Hash(hash_exprs, self.target_partition),
408                self.properties.emission_type,
409                self.properties.boundedness,
410            ),
411            sub_stage_metrics: self.sub_stage_metrics.clone(),
412            query_ctx: self.query_ctx.clone(),
413            target_partition: self.target_partition,
414            partition_cols: self.partition_cols.clone(),
415        })
416    }
417
418    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
419        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
420        Ok(Arc::new(schema))
421    }
422
423    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
424        self.sub_stage_metrics
425            .lock()
426            .unwrap()
427            .values()
428            .cloned()
429            .collect()
430    }
431
432    pub fn partition_count(&self) -> usize {
433        self.target_partition
434    }
435
436    pub fn region_count(&self) -> usize {
437        self.regions.len()
438    }
439}
440
441impl ExecutionPlan for MergeScanExec {
442    fn as_any(&self) -> &dyn Any {
443        self
444    }
445
446    fn schema(&self) -> ArrowSchemaRef {
447        self.arrow_schema.clone()
448    }
449
450    fn properties(&self) -> &PlanProperties {
451        &self.properties
452    }
453
454    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
455        vec![]
456    }
457
458    // DataFusion will swap children unconditionally.
459    // But since this node is leaf node, it's safe to just return self.
460    fn with_new_children(
461        self: Arc<Self>,
462        _children: Vec<Arc<dyn ExecutionPlan>>,
463    ) -> Result<Arc<dyn ExecutionPlan>> {
464        Ok(self.clone())
465    }
466
467    fn execute(
468        &self,
469        partition: usize,
470        context: Arc<TaskContext>,
471    ) -> Result<DfSendableRecordBatchStream> {
472        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
473            self.to_stream(context, partition)?,
474        )))
475    }
476
477    fn metrics(&self) -> Option<MetricsSet> {
478        Some(self.metric.clone_inner())
479    }
480
481    fn name(&self) -> &str {
482        "MergeScanExec"
483    }
484}
485
486impl DisplayAs for MergeScanExec {
487    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
488        write!(f, "MergeScanExec: peers=[")?;
489        for region_id in self.regions.iter() {
490            write!(f, "{}, ", region_id)?;
491        }
492        write!(f, "]")
493    }
494}
495
/// Handles to the DataFusion metrics a `MergeScanExec` records while streaming.
///
/// A fresh instance is created (registering into the node's `ExecutionPlanMetricsSet`)
/// on each `to_stream` call. `Debug` is derived, so field order determines its output.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Nanoseconds elapsed until the scan operator is ready to emit data
    ready_time: Time,
    /// Nanoseconds elapsed until the first record batch emitted from the scan operator gets consumed
    first_consume_time: Time,
    /// Nanoseconds elapsed until the scan operator finishes execution
    finish_time: Time,
    /// Count of rows fetched from remote
    output_rows: Count,

    /// Gauge for greptime plan execution cost metrics for output
    greptime_exec_cost: Gauge,
}
510
511impl MergeScanMetric {
512    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
513        Self {
514            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
515            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
516            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
517            output_rows: MetricBuilder::new(metric).output_rows(1),
518            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
519        }
520    }
521
522    pub fn ready_time(&self) -> &Time {
523        &self.ready_time
524    }
525
526    pub fn first_consume_time(&self) -> &Time {
527        &self.first_consume_time
528    }
529
530    pub fn finish_time(&self) -> &Time {
531        &self.finish_time
532    }
533
534    pub fn record_output_batch_rows(&self, num_rows: usize) {
535        self.output_rows.add(num_rows);
536    }
537
538    pub fn record_greptime_exec_cost(&self, metrics: usize) {
539        self.greptime_exec_cost.add(metrics);
540    }
541}