query/analyze.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Customized `ANALYZE` plan that is aware of [MergeScanExec].
//!
//! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs`
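//!
//! The plan yields a three-column table (`stage`, `node`, `plan`): stage 0 is
//! the current (frontend) stage, stage 1 holds the metrics fetched from each
//! [MergeScanExec] sub-stage, and a trailing row reports the total row count.
//! A sketch of the output shape (illustrative values only):
//!
//! ```text
//! stage | node | plan
//! ------+------+---------------------------------------
//!  0    |  0   | <frontend stage plan with metrics>
//!  1    |  0   | <sub-stage plan with metrics, node 0>
//!  1    |  1   | <sub-stage plan with metrics, node 1>
//!       |      | Total rows: 42
//! ```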

use std::any::Any;
use std::fmt::Display;
use std::sync::Arc;

use ahash::HashMap;
use arrow::array::{StringBuilder, UInt32Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_recordbatch::adapter::{MetricCollector, PlanMetrics, RecordBatchMetrics};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    accept, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, DataFusionError};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
use futures::StreamExt;
use serde::Serialize;
use sqlparser::ast::AnalyzeFormat;

use crate::dist_plan::MergeScanExec;

const STAGE: &str = "stage";
const NODE: &str = "node";
const PLAN: &str = "plan";

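/// `ANALYZE` execution plan that is aware of distributed execution: besides
/// the metrics of the current (frontend) stage, it reports the per-node
/// metrics collected from every [`MergeScanExec`] sub-stage.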
#[derive(Debug)]
pub struct DistAnalyzeExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
    verbose: bool,
    format: AnalyzeFormat,
}

impl DistAnalyzeExec {
    /// Create a new [`DistAnalyzeExec`].
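    ///
    /// A minimal usage sketch (illustrative only; assumes `input` is an
    /// already-built physical plan and `task_ctx` is a [`TaskContext`]):
    ///
    /// ```ignore
    /// let analyze = DistAnalyzeExec::new(input, false, AnalyzeFormat::TEXT);
    /// let stream = analyze.execute(0, task_ctx)?;
    /// ```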
    pub fn new(input: Arc<dyn ExecutionPlan>, verbose: bool, format: AnalyzeFormat) -> Self {
        let schema = SchemaRef::new(Schema::new(vec![
            Field::new(STAGE, DataType::UInt32, true),
            Field::new(NODE, DataType::UInt32, true),
            Field::new(PLAN, DataType::Utf8, true),
        ]));
        let properties = Self::compute_properties(&input, schema.clone());
        Self {
            input,
            schema,
            properties,
            verbose,
            format,
        }
    }

    /// Compute the [`PlanProperties`] cache (schema, equivalence properties,
    /// partitioning, etc.). The output is always a single partition; emission
    /// type and boundedness are inherited from the input.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        let output_partitioning = Partitioning::UnknownPartitioning(1);
        let properties = input.properties();
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            properties.emission_type,
            properties.boundedness,
        )
    }
}

impl DisplayAs for DistAnalyzeExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "DistAnalyzeExec")
            }
        }
    }
}

impl ExecutionPlan for DistAnalyzeExec {
    fn name(&self) -> &'static str {
        "DistAnalyzeExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// `DistAnalyzeExec` is handled specially, so this value is ignored
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            children.pop().unwrap(),
            self.verbose,
            self.format,
        )))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DistAnalyzeExec invalid partition. Expected 0, got {partition}");
        }

        // Wrap the input plan using `CoalescePartitionsExec` to poll multiple
        // partitions in parallel
        let coalesce_partition_plan = CoalescePartitionsExec::new(self.input.clone());

        // Create future that computes the final output
        let captured_input = self.input.clone();
        let captured_schema = self.schema.clone();

        // Finish the input stream and create the output
        let format = self.format;
        let verbose = self.verbose;
        let mut input_stream = coalesce_partition_plan.execute(0, context)?;
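        // Drain the input completely so that every operator's metrics are
        // fully populated before they are collected; only the row count of
        // the original query result is kept.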
        let output = async move {
            let mut total_rows = 0;
            while let Some(batch) = input_stream.next().await.transpose()? {
                total_rows += batch.num_rows();
            }

            create_output_batch(total_rows, captured_input, captured_schema, format, verbose)
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::once(output),
        )))
    }
}

/// Build the result [`DfRecordBatch`] of `ANALYZE`
struct AnalyzeOutputBuilder {
    stage_builder: UInt32Builder,
    node_builder: UInt32Builder,
    plan_builder: StringBuilder,
    schema: SchemaRef,
}

impl AnalyzeOutputBuilder {
    fn new(schema: SchemaRef) -> Self {
        Self {
            stage_builder: UInt32Builder::with_capacity(4),
            node_builder: UInt32Builder::with_capacity(4),
            plan_builder: StringBuilder::with_capacity(1, 1024),
            schema,
        }
    }

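    /// Append one plan description with its metrics under the given stage and
    /// node indices.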
    fn append_metric(&mut self, stage: u32, node: u32, content: String) {
        self.stage_builder.append_value(stage);
        self.node_builder.append_value(node);
        self.plan_builder.append_value(content);
    }

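    /// Append the final summary row; `stage` and `node` are null for this row.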
    fn append_total_rows(&mut self, total_rows: usize) {
        self.stage_builder.append_null();
        self.node_builder.append_null();
        self.plan_builder
            .append_value(format!("Total rows: {}", total_rows));
    }

    fn finish(mut self) -> DfResult<DfRecordBatch> {
        DfRecordBatch::try_new(
            self.schema,
            vec![
                Arc::new(self.stage_builder.finish()),
                Arc::new(self.node_builder.finish()),
                Arc::new(self.plan_builder.finish()),
            ],
        )
        .map_err(DataFusionError::from)
    }
}

/// Create the output of `ANALYZE` as a [`DfRecordBatch`]: stage 0 holds the
/// metrics of the current (frontend) stage, stage 1 the per-node metrics
/// gathered from every [`MergeScanExec`], followed by a final `Total rows` row.
fn create_output_batch(
    total_rows: usize,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    format: AnalyzeFormat,
    verbose: bool,
) -> DfResult<DfRecordBatch> {
    let mut builder = AnalyzeOutputBuilder::new(schema);

    // Treat the current stage as stage 0. Fetch its metrics
    let mut collector = MetricCollector::new(verbose);
    // Safety: metric collector won't return error
    accept(input.as_ref(), &mut collector).unwrap();
    let stage_0_metrics = collector.record_batch_metrics;

    // Append the metrics of the current stage
    builder.append_metric(0, 0, metrics_to_string(stage_0_metrics, format)?);

    // Find merge scans and append their sub_stage_metrics
    input.apply(|plan| {
        if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
            let sub_stage_metrics = merge_scan.sub_stage_metrics();
            for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
                builder.append_metric(1, node as _, metrics_to_string(metric, format)?);
            }
        }
        // A plan may contain multiple merge scans, so always keep traversing
        Ok(TreeNodeRecursion::Continue)
    })?;

    // Write total rows
    builder.append_total_rows(total_rows);

    builder.finish()
}

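/// Render [`RecordBatchMetrics`] in the requested `ANALYZE` output format.
/// `GRAPHVIZ` is not supported.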
fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfResult<String> {
    match format {
        AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
        AnalyzeFormat::TEXT => Ok(metrics.to_string()),
        AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
            "GRAPHVIZ format is not supported for metrics output".to_string(),
        )),
    }
}

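/// Tree of per-operator metrics used for [`AnalyzeFormat::JSON`] output.
///
/// An illustrative (hypothetical) rendering of one node:
///
/// ```json
/// {"name":"CoalescePartitionsExec","param":"","output_rows":42,
///  "elapsed_compute":1000,"metrics":{},"children":[]}
/// ```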
#[derive(Debug, Default, Serialize)]
struct JsonMetrics {
    name: String,
    param: String,

    // well-known metrics
    output_rows: usize,
    // busy time in nanoseconds
    elapsed_compute: usize,

    // other metrics
    metrics: HashMap<String, usize>,
    children: Vec<JsonMetrics>,
}

impl JsonMetrics {
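    /// Rebuild the metrics tree from the flattened, depth-annotated list in
    /// [`RecordBatchMetrics`]. The list is scanned in reverse; whenever a node
    /// one level shallower is reached, the nodes accumulated at `level + 1`
    /// become its children, and the single level-0 node is returned as the root.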
    fn from_record_batch_metrics(record_batch_metrics: RecordBatchMetrics) -> Self {
        let mut layers: HashMap<usize, Vec<Self>> = HashMap::default();

        for plan_metrics in record_batch_metrics.plan_metrics.into_iter().rev() {
            let (level, mut metrics) = Self::from_plan_metrics(plan_metrics);
            if let Some(next_layer) = layers.remove(&(level + 1)) {
                metrics.children = next_layer;
            }
            if level == 0 {
                return metrics;
            }
            layers.entry(level).or_default().push(metrics);
        }

        // Unreachable in practice: the metrics should always contain at least
        // one level-0 plan.
        Self::default()
    }

    /// Convert a [`PlanMetrics`] to a [`JsonMetrics`] without children.
    ///
    /// Returns the level of the plan and the [`JsonMetrics`].
    fn from_plan_metrics(plan_metrics: PlanMetrics) -> (usize, Self) {
        let raw_name = plan_metrics.plan.trim_end();
        let mut elapsed_compute = 0;
        let mut output_rows = 0;
        let mut other_metrics = HashMap::default();
        // Plan descriptions take the form "Name: params". Plans without
        // parameters keep their full name and get an empty param string.
        let (name, param) = raw_name.split_once(": ").unwrap_or((raw_name, ""));

        for (metric_name, value) in plan_metrics.metrics.into_iter() {
            if metric_name == "elapsed_compute" {
                elapsed_compute = value;
            } else if metric_name == "output_rows" {
                output_rows = value;
            } else {
                other_metrics.insert(metric_name, value);
            }
        }

        (
            plan_metrics.level,
            Self {
                name: name.to_string(),
                param: param.to_string(),
                output_rows,
                elapsed_compute,
                metrics: other_metrics,
                children: vec![],
            },
        )
    }
}

impl Display for JsonMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
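        // Safety: JsonMetrics contains only string and numeric fields, so
        // serialization won't fail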
        write!(f, "{}", serde_json::to_string(self).unwrap())
    }
}