use std::any::Any;
use std::collections::HashMap;
use std::ops::Div;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::arrow::datatypes::Field;
use datafusion::common::stats::Precision;
use datafusion::common::{
    DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::prelude::{col, lit, Expr};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;

use crate::extension_plan::Millisecond;

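/// Empty source plan that generates a record batch with a time index column,
/// computed from `start`, `end` and `interval`, plus an optional value column
/// evaluated from `expr`. The expression may only reference the time index column.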
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyMetric {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    expr: Option<Expr>,
    /// Schema that contains only the time index column; used to evaluate `expr`.
    time_index_schema: DFSchemaRef,
    /// Schema of the output record batch.
    result_schema: DFSchemaRef,
}

impl EmptyMetric {
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_index_column_name: String,
        field_column_name: String,
        field_expr: Option<Expr>,
    ) -> DataFusionResult<Self> {
        let qualifier = Some(TableReference::bare(""));
        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
        let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
        if let Some(field_expr) = &field_expr {
            let field_data_type = field_expr.get_type(&ts_only_schema)?;
            fields.push((
                qualifier.clone(),
                Arc::new(Field::new(field_column_name, field_data_type, true)),
            ));
        }
        let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);

        Ok(Self {
            start,
            end,
            interval,
            time_index_schema: Arc::new(ts_only_schema),
            result_schema: schema,
            expr: field_expr,
        })
    }

    pub const fn name() -> &'static str {
        "EmptyMetric"
    }

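    /// Plan the optional field expression against the time-index-only schema and
    /// convert this logical node into a physical [`EmptyMetricExec`].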
    pub fn to_execution_plan(
        &self,
        session_state: &SessionState,
        physical_planner: &dyn PhysicalPlanner,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let physical_expr = self
            .expr
            .as_ref()
            .map(|expr| {
                physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
            })
            .transpose()?;
        let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
        let properties = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(result_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Ok(Arc::new(EmptyMetricExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
            result_schema,
            expr: physical_expr,
            properties,
            metric: ExecutionPlanMetricsSet::new(),
        }))
    }
}

impl UserDefinedLogicalNodeCore for EmptyMetric {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.result_schema
    }

    fn expressions(&self) -> Vec<Expr> {
        if let Some(expr) = &self.expr {
            vec![expr.clone()]
        } else {
            vec![]
        }
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "EmptyMetric: range=[{}..{}], interval=[{}]",
            self.start, self.end, self.interval,
        )
    }

    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        Ok(Self {
            start: self.start,
            end: self.end,
            interval: self.interval,
            expr: exprs.into_iter().next(),
            time_index_schema: self.time_index_schema.clone(),
            result_schema: self.result_schema.clone(),
        })
    }
}

impl PartialOrd for EmptyMetric {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Compare the scalar fields and the expression; the derived schema fields
        // are not part of the ordering.
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.expr.partial_cmp(&other.expr)
    }
}

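/// Physical counterpart of [`EmptyMetric`].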
#[derive(Debug, Clone)]
pub struct EmptyMetricExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    /// Schema that contains only the time index column; used to evaluate `expr`.
    time_index_schema: SchemaRef,
    /// Schema of the output record batch.
    result_schema: SchemaRef,
    expr: Option<PhysicalExprRef>,
    properties: Arc<PlanProperties>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for EmptyMetricExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.result_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        self.properties.as_ref()
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(self.as_ref().clone()))
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        Ok(Box::pin(EmptyMetricStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            expr: self.expr.clone(),
            is_first_poll: true,
            time_index_schema: self.time_index_schema.clone(),
            result_schema: self.result_schema.clone(),
            metric: baseline_metric,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn statistics(&self) -> DataFusionResult<Statistics> {
        // Rows are generated on the fly, so only an estimate derived from the
        // time range and interval is available.
        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
        let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
            total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "EmptyMetricExec"
    }
}

impl DisplayAs for EmptyMetricExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
                f,
                "EmptyMetric: range=[{}..{}], interval=[{}]",
                self.start, self.end, self.interval,
            ),
        }
    }
}

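/// Stream that materializes the whole result as a single record batch on the
/// first poll and terminates afterwards.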
pub struct EmptyMetricStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    expr: Option<PhysicalExprRef>,
    /// True until the first poll; the single result batch is produced on that poll.
    is_first_poll: bool,
    /// Schema that contains only the time index column; used to evaluate `expr`.
    time_index_schema: SchemaRef,
    /// Schema of the output record batch.
    result_schema: SchemaRef,
    metric: BaselineMetrics,
}

impl RecordBatchStream for EmptyMetricStream {
    fn schema(&self) -> SchemaRef {
        self.result_schema.clone()
    }
}

impl Stream for EmptyMetricStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let result = if self.is_first_poll {
            self.is_first_poll = false;
            let _timer = self.metric.elapsed_compute().timer();

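            // Generate the time index column: an inclusive range from `start` to
            // `end`, stepped by `interval` milliseconds.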
            let time_array = (self.start..=self.end)
                .step_by(self.interval as _)
                .collect::<Vec<_>>();
            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
            let num_rows = time_array.len();
            let input_record_batch =
                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
                    .map_err(|e| DataFusionError::ArrowError(e, None))?;
            let mut result_arrays: Vec<ArrayRef> = vec![time_array];

            // Evaluate the optional field expression over the time-index-only batch.
            if let Some(field_expr) = &self.expr {
                result_arrays.push(
                    field_expr
                        .evaluate(&input_record_batch)
                        .and_then(|x| x.into_array(num_rows))?,
                );
            }

            let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
                .map_err(|e| DataFusionError::ArrowError(e, None));

            Poll::Ready(Some(batch))
        } else {
            Poll::Ready(None)
        };
        self.metric.record_poll(result)
    }
}

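/// Build a [`DFSchema`] that contains only the given time index column
/// (non-nullable millisecond timestamp).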
fn build_ts_only_schema(column_name: &str) -> DFSchema {
    let ts_field = Field::new(
        column_name,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    );
    DFSchema::new_with_metadata(
        vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
        HashMap::new(),
    )
    // A single, well-formed field cannot produce a duplicate-name error.
    .unwrap()
}

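/// Build an expression that converts the millisecond time index column into a
/// `Float64` number of seconds (cast to `Int64`, then `Float64`, then divide by 1000).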
pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
    let input_schema = build_ts_only_schema(time_index_column_name);
    col(time_index_column_name)
        .cast_to(&DataType::Int64, &input_schema)
        .unwrap()
        .cast_to(&DataType::Float64, &input_schema)
        .unwrap()
        .div(lit(1000.0))
}

#[cfg(test)]
mod test {
    use datafusion::physical_planner::DefaultPhysicalPlanner;
    use datafusion::prelude::SessionContext;

    use super::*;

    async fn do_empty_metric_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_column_name: String,
        field_column_name: String,
        expected: String,
    ) {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let time_expr = build_special_time_expr(&time_column_name);
        let empty_metric = EmptyMetric::new(
            start,
            end,
            interval,
            time_column_name,
            field_column_name,
            Some(time_expr),
        )
        .unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn normal_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.010 | 0.01  |\
                \n| 1970-01-01T00:00:00.020 | 0.02  |\
                \n| 1970-01-01T00:00:00.030 | 0.03  |\
                \n| 1970-01-01T00:00:00.040 | 0.04  |\
                \n| 1970-01-01T00:00:00.050 | 0.05  |\
                \n| 1970-01-01T00:00:00.060 | 0.06  |\
                \n| 1970-01-01T00:00:00.070 | 0.07  |\
                \n| 1970-01-01T00:00:00.080 | 0.08  |\
                \n| 1970-01-01T00:00:00.090 | 0.09  |\
                \n| 1970-01-01T00:00:00.100 | 0.1   |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn unaligned_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            11,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.011 | 0.011 |\
                \n| 1970-01-01T00:00:00.022 | 0.022 |\
                \n| 1970-01-01T00:00:00.033 | 0.033 |\
                \n| 1970-01-01T00:00:00.044 | 0.044 |\
                \n| 1970-01-01T00:00:00.055 | 0.055 |\
                \n| 1970-01-01T00:00:00.066 | 0.066 |\
                \n| 1970-01-01T00:00:00.077 | 0.077 |\
                \n| 1970-01-01T00:00:00.088 | 0.088 |\
                \n| 1970-01-01T00:00:00.099 | 0.099 |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn one_row_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            1000,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+---------------------+-------+\
                \n| time                | value |\
                \n+---------------------+-------+\
                \n| 1970-01-01T00:00:00 | 0.0   |\
                \n+---------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn negative_range_empty_metric_test() {
        do_empty_metric_test(
            1000,
            -1000,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+------+-------+\
                \n| time | value |\
                \n+------+-------+\
                \n+------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn no_field_expr() {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let empty_metric =
            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+\
            \n| time                |\
            \n+---------------------+\
            \n| 1970-01-01T00:00:00 |\
            \n+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }
}