query/analyze.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Customized `ANALYZE` plan that is aware of [MergeScanExec].
//!
//! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs`.
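//!
//! The output batch has three columns: `stage`, `node` and `plan`. The row with
//! stage 0 carries the metrics of the stage that executes this plan itself;
//! rows with stage 1 carry the per-node metrics gathered from [MergeScanExec]'s
//! sub-stages; a final row with null `stage` and `node` reports the total row
//! count. For illustration only (hypothetical plans and values):
//!
//! ```text
//! | stage | node | plan                             |
//! |-------|------|----------------------------------|
//! | 0     | 0    | MergeScanExec: ... metrics=[...] |
//! | 1     | 0    | SeqScanExec: ... metrics=[...]   |
//! |       |      | Total rows: 42                   |
//! ```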

use std::any::Any;
use std::fmt::Display;
use std::sync::Arc;

use ahash::HashMap;
use arrow::array::{StringBuilder, UInt32Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_recordbatch::adapter::{MetricCollector, PlanMetrics, RecordBatchMetrics};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    accept, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, DataFusionError};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
use futures::StreamExt;
use serde::Serialize;
use sqlparser::ast::AnalyzeFormat;

use crate::dist_plan::MergeScanExec;

// Column names of the `ANALYZE` output batch.
const STAGE: &str = "stage";
const NODE: &str = "node";
const PLAN: &str = "plan";

/// A customized `ANALYZE` execution plan that is aware of [`MergeScanExec`]
/// and also reports the metrics of its remote sub-stages.
#[derive(Debug)]
pub struct DistAnalyzeExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
    verbose: bool,
    format: AnalyzeFormat,
}

impl DistAnalyzeExec {
    /// Create a new DistAnalyzeExec
    pub fn new(input: Arc<dyn ExecutionPlan>, verbose: bool, format: AnalyzeFormat) -> Self {
        let schema = SchemaRef::new(Schema::new(vec![
            Field::new(STAGE, DataType::UInt32, true),
            Field::new(NODE, DataType::UInt32, true),
            Field::new(PLAN, DataType::Utf8, true),
        ]));
        let properties = Self::compute_properties(&input, schema.clone());
        Self {
            input,
            schema,
            properties,
            verbose,
            format,
        }
    }

    /// Compute the [`PlanProperties`] of this plan: the equivalence properties
    /// of the fixed output schema, a single unknown output partition, and the
    /// emission type and boundedness inherited from the input.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        let output_partitioning = Partitioning::UnknownPartitioning(1);
        let properties = input.properties();
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            properties.emission_type,
            properties.boundedness,
        )
    }
}

impl DisplayAs for DistAnalyzeExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DistAnalyzeExec")
            }
        }
    }
}

impl ExecutionPlan for DistAnalyzeExec {
    fn name(&self) -> &'static str {
        "DistAnalyzeExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// `DistAnalyzeExec` is handled specially so this value is ignored
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // This node always has exactly one child, so popping is safe.
        Ok(Arc::new(Self::new(
            children.pop().unwrap(),
            self.verbose,
            self.format,
        )))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DistAnalyzeExec invalid partition. Expected 0, got {partition}");
        }

        // Wrap the input plan using `CoalescePartitionsExec` to poll multiple
        // partitions in parallel
        let coalesce_partition_plan = CoalescePartitionsExec::new(self.input.clone());

        // Create a future that computes the final output
        let captured_input = self.input.clone();
        let captured_schema = self.schema.clone();

        // Drain the input stream, then create the output
        let format = self.format;
        let verbose = self.verbose;
        let mut input_stream = coalesce_partition_plan.execute(0, context)?;
        let output = async move {
            let mut total_rows = 0;
            while let Some(batch) = input_stream.next().await.transpose()? {
                total_rows += batch.num_rows();
            }

            create_output_batch(total_rows, captured_input, captured_schema, format, verbose)
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::once(output),
        )))
    }
}
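
// A minimal usage sketch (hypothetical `input` plan and `ctx` task context, for
// illustration only):
//
//     let analyze = Arc::new(DistAnalyzeExec::new(input, false, AnalyzeFormat::TEXT));
//     let mut stream = analyze.execute(0, ctx)?;
//     // The stream yields a single record batch with `stage`, `node` and
//     // `plan` columns, then ends.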

/// Build the result [`DfRecordBatch`] of `ANALYZE`
struct AnalyzeOutputBuilder {
    stage_builder: UInt32Builder,
    node_builder: UInt32Builder,
    plan_builder: StringBuilder,
    schema: SchemaRef,
}

impl AnalyzeOutputBuilder {
    fn new(schema: SchemaRef) -> Self {
        Self {
            stage_builder: UInt32Builder::with_capacity(4),
            node_builder: UInt32Builder::with_capacity(4),
            plan_builder: StringBuilder::with_capacity(1, 1024),
            schema,
        }
    }

    fn append_metric(&mut self, stage: u32, node: u32, content: String) {
        self.stage_builder.append_value(stage);
        self.node_builder.append_value(node);
        self.plan_builder.append_value(content);
    }

    fn append_total_rows(&mut self, total_rows: usize) {
        self.stage_builder.append_null();
        self.node_builder.append_null();
        self.plan_builder
            .append_value(format!("Total rows: {}", total_rows));
    }

    fn finish(mut self) -> DfResult<DfRecordBatch> {
        DfRecordBatch::try_new(
            self.schema,
            vec![
                Arc::new(self.stage_builder.finish()),
                Arc::new(self.node_builder.finish()),
                Arc::new(self.plan_builder.finish()),
            ],
        )
        .map_err(DataFusionError::from)
    }
}
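
// Usage sketch of the builder (illustrative values); `create_output_batch`
// below is the real call site:
//
//     let mut b = AnalyzeOutputBuilder::new(schema);
//     b.append_metric(0, 0, "MergeScanExec: ...".to_string());
//     b.append_total_rows(42);
//     let batch = b.finish()?;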

/// Creates the output of `DistAnalyzeExec` as a RecordBatch
fn create_output_batch(
    total_rows: usize,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    format: AnalyzeFormat,
    verbose: bool,
) -> DfResult<DfRecordBatch> {
    let mut builder = AnalyzeOutputBuilder::new(schema);

    // Treat the current stage as stage 0. Fetch its metrics
    let mut collector = MetricCollector::new(verbose);
    // Safety: the metric collector won't return an error
    accept(input.as_ref(), &mut collector).unwrap();
    let stage_0_metrics = collector.record_batch_metrics;

    // Append the metrics of the current stage
    builder.append_metric(0, 0, metrics_to_string(stage_0_metrics, format)?);

    // Find the merge scan and append its sub-stage metrics, one row per node
    input.apply(|plan| {
        if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
            let sub_stage_metrics = merge_scan.sub_stage_metrics();
            for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
                builder.append_metric(1, node as _, metrics_to_string(metric, format)?);
            }
            return Ok(TreeNodeRecursion::Stop);
        }
        Ok(TreeNodeRecursion::Continue)
    })?;

    // Write total rows
    builder.append_total_rows(total_rows);

    builder.finish()
}

fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfResult<String> {
    match format {
        AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
        AnalyzeFormat::TEXT => Ok(metrics.to_string()),
        AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
            "GRAPHVIZ format is not supported for metrics output".to_string(),
        )),
    }
}

#[derive(Debug, Default, Serialize)]
struct JsonMetrics {
    name: String,
    param: String,

    // Well-known metrics
    output_rows: usize,
    // Busy time in nanoseconds
    elapsed_compute: usize,

    // Other metrics
    metrics: HashMap<String, usize>,
    children: Vec<JsonMetrics>,
}

impl JsonMetrics {
    /// Reconstruct the plan tree from the flattened, pre-order
    /// `plan_metrics` list, using each entry's level as its depth.
    fn from_record_batch_metrics(record_batch_metrics: RecordBatchMetrics) -> Self {
        let mut layers: HashMap<usize, Vec<Self>> = HashMap::default();

        // Iterate in reverse so that children are always seen before their
        // parent; all nodes buffered at `level + 1` belong to the current node.
        for plan_metrics in record_batch_metrics.plan_metrics.into_iter().rev() {
            let (level, mut metrics) = Self::from_plan_metrics(plan_metrics);
            if let Some(next_layer) = layers.remove(&(level + 1)) {
                metrics.children = next_layer;
            }
            if level == 0 {
                return metrics;
            }
            layers.entry(level).or_default().push(metrics);
        }

        // Unreachable path. Every metrics list should contain at least one
        // level-0 plan.
        Self::default()
    }
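
    // A worked example of the reconstruction above, with hypothetical nodes
    // and levels: the pre-order list [A:0, B:1, C:2, D:1] is consumed in
    // reverse as D, C, B, A:
    //   - D (level 1) is buffered in `layers[1]`;
    //   - C (level 2) is buffered in `layers[2]`;
    //   - B (level 1) takes `layers[2]` = [C] as children, joins `layers[1]`;
    //   - A (level 0) takes `layers[1]` = [D, B] as children and is returned.
    // Note that siblings end up in reverse of their original order.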

    /// Convert a [`PlanMetrics`] to a [`JsonMetrics`] without children.
    ///
    /// Returns the level of the plan and the [`JsonMetrics`].
    fn from_plan_metrics(plan_metrics: PlanMetrics) -> (usize, Self) {
        let raw_name = plan_metrics.plan.trim_end();
        let mut elapsed_compute = 0;
        let mut output_rows = 0;
        let mut other_metrics = HashMap::default();
        // Plan lines are rendered as "Name: params". Keep the whole string as
        // the name when a plan has no parameter part.
        let (name, param) = raw_name.split_once(": ").unwrap_or((raw_name, ""));

        for (metric_name, value) in plan_metrics.metrics.into_iter() {
            if metric_name == "elapsed_compute" {
                elapsed_compute = value;
            } else if metric_name == "output_rows" {
                output_rows = value;
            } else {
                other_metrics.insert(metric_name, value);
            }
        }

        (
            plan_metrics.level,
            Self {
                name: name.to_string(),
                param: param.to_string(),
                output_rows,
                elapsed_compute,
                metrics: other_metrics,
                children: vec![],
            },
        )
    }
}

impl Display for JsonMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", serde_json::to_string(self).unwrap())
    }
}
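
// For illustration, a (hypothetical) `JsonMetrics` serialized by the `Display`
// impl above looks like:
//
//     {"name":"ProjectionExec","param":"expr=[a@0 as a]","output_rows":42,
//      "elapsed_compute":1000,"metrics":{"mem_used":0},"children":[]}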