1use std::any::Any;
20use std::fmt::Display;
21use std::sync::Arc;
22
23use ahash::HashMap;
24use arrow::array::{StringBuilder, UInt32Builder};
25use arrow_schema::{DataType, Field, Schema, SchemaRef};
26use common_recordbatch::adapter::{MetricCollector, PlanMetrics, RecordBatchMetrics};
27use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
28use datafusion::error::Result as DfResult;
29use datafusion::execution::TaskContext;
30use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
31use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
32use datafusion::physical_plan::{
33 accept, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
34};
35use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
36use datafusion_common::{internal_err, DataFusionError};
37use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
38use futures::StreamExt;
39use serde::Serialize;
40use sqlparser::ast::AnalyzeFormat;
41
42use crate::dist_plan::MergeScanExec;
43
/// Name of the output column that holds the stage number.
const STAGE: &str = "stage";
/// Name of the output column that holds the node number within a stage.
const NODE: &str = "node";
/// Name of the output column that holds the rendered plan metrics.
const PLAN: &str = "plan";
47
48#[derive(Debug)]
49pub struct DistAnalyzeExec {
50 input: Arc<dyn ExecutionPlan>,
51 schema: SchemaRef,
52 properties: PlanProperties,
53 verbose: bool,
54 format: AnalyzeFormat,
55}
56
57impl DistAnalyzeExec {
58 pub fn new(input: Arc<dyn ExecutionPlan>, verbose: bool, format: AnalyzeFormat) -> Self {
60 let schema = SchemaRef::new(Schema::new(vec![
61 Field::new(STAGE, DataType::UInt32, true),
62 Field::new(NODE, DataType::UInt32, true),
63 Field::new(PLAN, DataType::Utf8, true),
64 ]));
65 let properties = Self::compute_properties(&input, schema.clone());
66 Self {
67 input,
68 schema,
69 properties,
70 verbose,
71 format,
72 }
73 }
74
75 fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
77 let eq_properties = EquivalenceProperties::new(schema);
78 let output_partitioning = Partitioning::UnknownPartitioning(1);
79 let properties = input.properties();
80 PlanProperties::new(
81 eq_properties,
82 output_partitioning,
83 properties.emission_type,
84 properties.boundedness,
85 )
86 }
87}
88
89impl DisplayAs for DistAnalyzeExec {
90 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
91 match t {
92 DisplayFormatType::Default
93 | DisplayFormatType::Verbose
94 | DisplayFormatType::TreeRender => {
95 write!(f, "DistAnalyzeExec",)
96 }
97 }
98 }
99}
100
101impl ExecutionPlan for DistAnalyzeExec {
102 fn name(&self) -> &'static str {
103 "DistAnalyzeExec"
104 }
105
106 fn as_any(&self) -> &dyn Any {
108 self
109 }
110
111 fn properties(&self) -> &PlanProperties {
112 &self.properties
113 }
114
115 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
116 vec![&self.input]
117 }
118
119 fn required_input_distribution(&self) -> Vec<Distribution> {
121 vec![]
122 }
123
124 fn with_new_children(
125 self: Arc<Self>,
126 mut children: Vec<Arc<dyn ExecutionPlan>>,
127 ) -> DfResult<Arc<dyn ExecutionPlan>> {
128 Ok(Arc::new(Self::new(
129 children.pop().unwrap(),
130 self.verbose,
131 self.format,
132 )))
133 }
134
135 fn execute(
136 &self,
137 partition: usize,
138 context: Arc<TaskContext>,
139 ) -> DfResult<DfSendableRecordBatchStream> {
140 if 0 != partition {
141 return internal_err!("AnalyzeExec invalid partition. Expected 0, got {partition}");
142 }
143
144 let coalesce_partition_plan = CoalescePartitionsExec::new(self.input.clone());
147
148 let captured_input = self.input.clone();
150 let captured_schema = self.schema.clone();
151
152 let format = self.format;
154 let verbose = self.verbose;
155 let mut input_stream = coalesce_partition_plan.execute(0, context)?;
156 let output = async move {
157 let mut total_rows = 0;
158 while let Some(batch) = input_stream.next().await.transpose()? {
159 total_rows += batch.num_rows();
160 }
161
162 create_output_batch(total_rows, captured_input, captured_schema, format, verbose)
163 };
164
165 Ok(Box::pin(RecordBatchStreamAdapter::new(
166 self.schema.clone(),
167 futures::stream::once(output),
168 )))
169 }
170}
171
172struct AnalyzeOutputBuilder {
174 stage_builder: UInt32Builder,
175 node_builder: UInt32Builder,
176 plan_builder: StringBuilder,
177 schema: SchemaRef,
178}
179
180impl AnalyzeOutputBuilder {
181 fn new(schema: SchemaRef) -> Self {
182 Self {
183 stage_builder: UInt32Builder::with_capacity(4),
184 node_builder: UInt32Builder::with_capacity(4),
185 plan_builder: StringBuilder::with_capacity(1, 1024),
186 schema,
187 }
188 }
189
190 fn append_metric(&mut self, stage: u32, node: u32, content: String) {
191 self.stage_builder.append_value(stage);
192 self.node_builder.append_value(node);
193 self.plan_builder.append_value(content);
194 }
195
196 fn append_total_rows(&mut self, total_rows: usize) {
197 self.stage_builder.append_null();
198 self.node_builder.append_null();
199 self.plan_builder
200 .append_value(format!("Total rows: {}", total_rows));
201 }
202
203 fn finish(mut self) -> DfResult<DfRecordBatch> {
204 DfRecordBatch::try_new(
205 self.schema,
206 vec![
207 Arc::new(self.stage_builder.finish()),
208 Arc::new(self.node_builder.finish()),
209 Arc::new(self.plan_builder.finish()),
210 ],
211 )
212 .map_err(DataFusionError::from)
213 }
214}
215
216fn create_output_batch(
218 total_rows: usize,
219 input: Arc<dyn ExecutionPlan>,
220 schema: SchemaRef,
221 format: AnalyzeFormat,
222 verbose: bool,
223) -> DfResult<DfRecordBatch> {
224 let mut builder = AnalyzeOutputBuilder::new(schema);
225
226 let mut collector = MetricCollector::new(verbose);
228 accept(input.as_ref(), &mut collector).unwrap();
230 let stage_0_metrics = collector.record_batch_metrics;
231
232 builder.append_metric(0, 0, metrics_to_string(stage_0_metrics, format)?);
234
235 input.apply(|plan| {
237 if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
238 let sub_stage_metrics = merge_scan.sub_stage_metrics();
239 for (node, metric) in sub_stage_metrics.into_iter().enumerate() {
240 builder.append_metric(1, node as _, metrics_to_string(metric, format)?);
241 }
242 return Ok(TreeNodeRecursion::Continue);
244 }
245 Ok(TreeNodeRecursion::Continue)
246 })?;
247
248 builder.append_total_rows(total_rows);
250
251 builder.finish()
252}
253
254fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfResult<String> {
255 match format {
256 AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
257 AnalyzeFormat::TEXT => Ok(metrics.to_string()),
258 AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
259 "GRAPHVIZ format is not supported for metrics output".to_string(),
260 )),
261 }
262}
263
264#[derive(Debug, Default, Serialize)]
265struct JsonMetrics {
266 name: String,
267 param: String,
268
269 output_rows: usize,
271 elapsed_compute: usize,
273
274 metrics: HashMap<String, usize>,
276 children: Vec<JsonMetrics>,
277}
278
279impl JsonMetrics {
280 fn from_record_batch_metrics(record_batch_metrics: RecordBatchMetrics) -> Self {
281 let mut layers: HashMap<usize, Vec<Self>> = HashMap::default();
282
283 for plan_metrics in record_batch_metrics.plan_metrics.into_iter().rev() {
284 let (level, mut metrics) = Self::from_plan_metrics(plan_metrics);
285 if let Some(next_layer) = layers.remove(&(level + 1)) {
286 metrics.children = next_layer;
287 }
288 if level == 0 {
289 return metrics;
290 }
291 layers.entry(level).or_default().push(metrics);
292 }
293
294 Self::default()
296 }
297
298 fn from_plan_metrics(plan_metrics: PlanMetrics) -> (usize, Self) {
302 let raw_name = plan_metrics.plan.trim_end();
303 let mut elapsed_compute = 0;
304 let mut output_rows = 0;
305 let mut other_metrics = HashMap::default();
306 let (name, param) = raw_name.split_once(": ").unwrap_or_default();
307
308 for (name, value) in plan_metrics.metrics.into_iter() {
309 if name == "elapsed_compute" {
310 elapsed_compute = value;
311 } else if name == "output_rows" {
312 output_rows = value;
313 } else {
314 other_metrics.insert(name, value);
315 }
316 }
317
318 (
319 plan_metrics.level,
320 Self {
321 name: name.to_string(),
322 param: param.to_string(),
323 output_rows,
324 elapsed_compute,
325 metrics: other_metrics,
326 children: vec![],
327 },
328 )
329 }
330}
331
332impl Display for JsonMetrics {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 write!(f, "{}", serde_json::to_string(self).unwrap())
335 }
336}