1use std::any::Any;
16use std::collections::HashMap;
17use std::ops::Div;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::ArrayRef;
23use datafusion::arrow::datatypes::{DataType, TimeUnit};
24use datafusion::common::arrow::datatypes::Field;
25use datafusion::common::stats::Precision;
26use datafusion::common::{
27 DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
28};
29use datafusion::datasource::{provider_as_source, MemTable};
30use datafusion::error::DataFusionError;
31use datafusion::execution::context::{SessionState, TaskContext};
32use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
33use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
34use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{
37 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
38 SendableRecordBatchStream,
39};
40use datafusion::physical_planner::PhysicalPlanner;
41use datafusion::prelude::{col, lit, Expr};
42use datafusion_expr::LogicalPlanBuilder;
43use datatypes::arrow::array::TimestampMillisecondArray;
44use datatypes::arrow::datatypes::SchemaRef;
45use datatypes::arrow::record_batch::RecordBatch;
46use futures::Stream;
47
48use crate::extension_plan::Millisecond;
49
/// Logical plan node that produces timestamps from `start` to `end` (inclusive), one every
/// `interval` milliseconds, optionally followed by a value column computed from each timestamp.
///
/// The node does not read any table; `dummy_input` is a placeholder scan it reports as input.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyMetric {
    /// First timestamp of the generated range, in milliseconds.
    start: Millisecond,
    /// Inclusive upper bound of the generated range, in milliseconds.
    end: Millisecond,
    /// Step between consecutive generated timestamps, in milliseconds.
    interval: Millisecond,
    /// Optional expression producing the value column; it is typed and planned against
    /// `time_index_schema`.
    expr: Option<Expr>,
    /// Schema containing only the (non-nullable, millisecond) time index column.
    time_index_schema: DFSchemaRef,
    /// Output schema: the time index column, followed by a nullable value column when `expr`
    /// is set.
    result_schema: DFSchemaRef,
    /// Scan over an empty in-memory table with `result_schema`, returned from `inputs()`.
    /// NOTE(review): presumably exists because the logical planner/optimizer expects extension
    /// nodes to expose an input — confirm before removing.
    dummy_input: LogicalPlan,
}
72
73impl EmptyMetric {
74 pub fn new(
75 start: Millisecond,
76 end: Millisecond,
77 interval: Millisecond,
78 time_index_column_name: String,
79 field_column_name: String,
80 field_expr: Option<Expr>,
81 ) -> DataFusionResult<Self> {
82 let qualifier = Some(TableReference::bare(""));
83 let ts_only_schema = build_ts_only_schema(&time_index_column_name);
84 let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
85 if let Some(field_expr) = &field_expr {
86 let field_data_type = field_expr.get_type(&ts_only_schema)?;
87 fields.push((
88 qualifier.clone(),
89 Arc::new(Field::new(field_column_name, field_data_type, true)),
90 ));
91 }
92 let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);
93
94 let table = MemTable::try_new(Arc::new(schema.as_arrow().clone()), vec![vec![]])?;
95 let source = provider_as_source(Arc::new(table));
96 let dummy_input =
97 LogicalPlanBuilder::scan("dummy", source, None).and_then(|x| x.build())?;
98
99 Ok(Self {
100 start,
101 end,
102 interval,
103 time_index_schema: Arc::new(ts_only_schema),
104 result_schema: schema,
105 expr: field_expr,
106 dummy_input,
107 })
108 }
109
110 pub const fn name() -> &'static str {
111 "EmptyMetric"
112 }
113
114 pub fn to_execution_plan(
115 &self,
116 session_state: &SessionState,
117 physical_planner: &dyn PhysicalPlanner,
118 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
119 let physical_expr = self
120 .expr
121 .as_ref()
122 .map(|expr| {
123 physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
124 })
125 .transpose()?;
126 let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
127 let properties = Arc::new(PlanProperties::new(
128 EquivalenceProperties::new(result_schema.clone()),
129 Partitioning::UnknownPartitioning(1),
130 EmissionType::Incremental,
131 Boundedness::Bounded,
132 ));
133 Ok(Arc::new(EmptyMetricExec {
134 start: self.start,
135 end: self.end,
136 interval: self.interval,
137 time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
138 result_schema,
139 expr: physical_expr,
140 properties,
141 metric: ExecutionPlanMetricsSet::new(),
142 }))
143 }
144}
145
146impl UserDefinedLogicalNodeCore for EmptyMetric {
147 fn name(&self) -> &str {
148 Self::name()
149 }
150
151 fn inputs(&self) -> Vec<&LogicalPlan> {
152 vec![&self.dummy_input]
153 }
154
155 fn schema(&self) -> &DFSchemaRef {
156 &self.result_schema
157 }
158
159 fn expressions(&self) -> Vec<Expr> {
160 if let Some(expr) = &self.expr {
161 vec![expr.clone()]
162 } else {
163 vec![]
164 }
165 }
166
167 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
168 write!(
169 f,
170 "EmptyMetric: range=[{}..{}], interval=[{}]",
171 self.start, self.end, self.interval,
172 )
173 }
174
175 fn with_exprs_and_inputs(
176 &self,
177 exprs: Vec<Expr>,
178 _inputs: Vec<LogicalPlan>,
179 ) -> DataFusionResult<Self> {
180 Ok(Self {
181 start: self.start,
182 end: self.end,
183 interval: self.interval,
184 expr: exprs.into_iter().next(),
185 time_index_schema: self.time_index_schema.clone(),
186 result_schema: self.result_schema.clone(),
187 dummy_input: self.dummy_input.clone(),
188 })
189 }
190}
191
192impl PartialOrd for EmptyMetric {
193 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
194 match self.start.partial_cmp(&other.start) {
196 Some(core::cmp::Ordering::Equal) => {}
197 ord => return ord,
198 }
199 match self.end.partial_cmp(&other.end) {
200 Some(core::cmp::Ordering::Equal) => {}
201 ord => return ord,
202 }
203 match self.interval.partial_cmp(&other.interval) {
204 Some(core::cmp::Ordering::Equal) => {}
205 ord => return ord,
206 }
207 self.expr.partial_cmp(&other.expr)
208 }
209}
210
/// Physical counterpart of `EmptyMetric`, built by `EmptyMetric::to_execution_plan`.
///
/// A leaf plan with a single partition; executing it yields a stream that emits one batch.
#[derive(Debug, Clone)]
pub struct EmptyMetricExec {
    /// First generated timestamp, in milliseconds.
    start: Millisecond,
    /// Inclusive upper bound of the generated timestamps, in milliseconds.
    end: Millisecond,
    /// Step between generated timestamps, in milliseconds.
    interval: Millisecond,
    /// Arrow schema holding only the time index column; `expr` is evaluated against a batch
    /// with this schema.
    time_index_schema: SchemaRef,
    /// Arrow schema of the emitted batch (time index plus optional value column).
    result_schema: SchemaRef,
    /// Physical form of the optional value expression.
    expr: Option<PhysicalExprRef>,
    /// Cached plan properties: one unknown partition, incremental emission, bounded.
    properties: Arc<PlanProperties>,
    /// Metrics registered by each `execute` call.
    metric: ExecutionPlanMetricsSet,
}
225
226impl ExecutionPlan for EmptyMetricExec {
227 fn as_any(&self) -> &dyn Any {
228 self
229 }
230
231 fn schema(&self) -> SchemaRef {
232 self.result_schema.clone()
233 }
234
235 fn properties(&self) -> &PlanProperties {
236 self.properties.as_ref()
237 }
238
239 fn maintains_input_order(&self) -> Vec<bool> {
240 vec![]
241 }
242
243 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
244 vec![]
245 }
246
247 fn with_new_children(
248 self: Arc<Self>,
249 _children: Vec<Arc<dyn ExecutionPlan>>,
250 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
251 Ok(Arc::new(self.as_ref().clone()))
252 }
253
254 fn execute(
255 &self,
256 partition: usize,
257 _context: Arc<TaskContext>,
258 ) -> DataFusionResult<SendableRecordBatchStream> {
259 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
260 Ok(Box::pin(EmptyMetricStream {
261 start: self.start,
262 end: self.end,
263 interval: self.interval,
264 expr: self.expr.clone(),
265 is_first_poll: true,
266 time_index_schema: self.time_index_schema.clone(),
267 result_schema: self.result_schema.clone(),
268 metric: baseline_metric,
269 }))
270 }
271
272 fn metrics(&self) -> Option<MetricsSet> {
273 Some(self.metric.clone_inner())
274 }
275
276 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
277 if partition.is_some() {
278 return Ok(Statistics::new_unknown(self.schema().as_ref()));
279 }
280
281 let estimated_row_num = if self.end > self.start {
282 (self.end - self.start) as f64 / self.interval as f64
283 } else {
284 0.0
285 };
286 let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;
287
288 Ok(Statistics {
289 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
290 total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
291 column_statistics: Statistics::unknown_column(&self.schema()),
292 })
293 }
294
295 fn name(&self) -> &str {
296 "EmptyMetricExec"
297 }
298}
299
300impl DisplayAs for EmptyMetricExec {
301 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
302 match t {
303 DisplayFormatType::Default
304 | DisplayFormatType::Verbose
305 | DisplayFormatType::TreeRender => write!(
306 f,
307 "EmptyMetric: range=[{}..{}], interval=[{}]",
308 self.start, self.end, self.interval,
309 ),
310 }
311 }
312}
313
/// Stream returned by `EmptyMetricExec::execute`: yields one batch on the first poll, then ends.
pub struct EmptyMetricStream {
    /// First generated timestamp, in milliseconds.
    start: Millisecond,
    /// Inclusive upper bound of the generated timestamps, in milliseconds.
    end: Millisecond,
    /// Step between generated timestamps, in milliseconds.
    interval: Millisecond,
    /// Optional physical expression producing the value column.
    expr: Option<PhysicalExprRef>,
    /// `true` until the first poll; the batch is generated only once.
    is_first_poll: bool,
    /// Schema of the intermediate batch (time index only) that `expr` is evaluated against.
    time_index_schema: SchemaRef,
    /// Schema of the emitted batch.
    result_schema: SchemaRef,
    /// Baseline metrics for this partition: compute time is timed in `poll_next`, and each
    /// poll result is passed through `record_poll`.
    metric: BaselineMetrics,
}
328
329impl RecordBatchStream for EmptyMetricStream {
330 fn schema(&self) -> SchemaRef {
331 self.result_schema.clone()
332 }
333}
334
335impl Stream for EmptyMetricStream {
336 type Item = DataFusionResult<RecordBatch>;
337
338 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
339 let result = if self.is_first_poll {
340 self.is_first_poll = false;
341 let _timer = self.metric.elapsed_compute().timer();
342
343 let time_array = (self.start..=self.end)
346 .step_by(self.interval as _)
347 .collect::<Vec<_>>();
348 let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
349 let num_rows = time_array.len();
350 let input_record_batch =
351 RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
352 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
353 let mut result_arrays: Vec<ArrayRef> = vec![time_array];
354
355 if let Some(field_expr) = &self.expr {
357 result_arrays.push(
358 field_expr
359 .evaluate(&input_record_batch)
360 .and_then(|x| x.into_array(num_rows))?,
361 );
362 }
363
364 let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
366 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None));
367
368 Poll::Ready(Some(batch))
369 } else {
370 Poll::Ready(None)
371 };
372 self.metric.record_poll(result)
373 }
374}
375
376fn build_ts_only_schema(column_name: &str) -> DFSchema {
378 let ts_field = Field::new(
379 column_name,
380 DataType::Timestamp(TimeUnit::Millisecond, None),
381 false,
382 );
383 DFSchema::new_with_metadata(
385 vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
386 HashMap::new(),
387 )
388 .unwrap()
389}
390
391pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
394 let input_schema = build_ts_only_schema(time_index_column_name);
395 col(time_index_column_name)
397 .cast_to(&DataType::Int64, &input_schema)
398 .unwrap()
399 .cast_to(&DataType::Float64, &input_schema)
400 .unwrap()
401 .div(lit(1000.0)) }
403
// End-to-end tests: plan an `EmptyMetric`, execute it, and compare the pretty-printed output.
#[cfg(test)]
mod test {
    use datafusion::physical_planner::DefaultPhysicalPlanner;
    use datafusion::prelude::SessionContext;

    use super::*;

    /// Plans and executes an `EmptyMetric` whose value column is `build_special_time_expr`
    /// (the timestamp in seconds), then asserts the pretty-printed result equals `expected`.
    async fn do_empty_metric_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_column_name: String,
        field_column_name: String,
        expected: String,
    ) {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let time_expr = build_special_time_expr(&time_column_name);
        let empty_metric = EmptyMetric::new(
            start,
            end,
            interval,
            time_column_name,
            field_column_name,
            Some(time_expr),
        )
        .unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    // `interval` divides the range evenly, so `end` itself is the last row (11 rows).
    #[tokio::test]
    async fn normal_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.010 | 0.01  |\
                \n| 1970-01-01T00:00:00.020 | 0.02  |\
                \n| 1970-01-01T00:00:00.030 | 0.03  |\
                \n| 1970-01-01T00:00:00.040 | 0.04  |\
                \n| 1970-01-01T00:00:00.050 | 0.05  |\
                \n| 1970-01-01T00:00:00.060 | 0.06  |\
                \n| 1970-01-01T00:00:00.070 | 0.07  |\
                \n| 1970-01-01T00:00:00.080 | 0.08  |\
                \n| 1970-01-01T00:00:00.090 | 0.09  |\
                \n| 1970-01-01T00:00:00.100 | 0.1   |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    // `interval` does not divide the range: the last row is the largest step not past `end`.
    #[tokio::test]
    async fn unaligned_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            11,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.011 | 0.011 |\
                \n| 1970-01-01T00:00:00.022 | 0.022 |\
                \n| 1970-01-01T00:00:00.033 | 0.033 |\
                \n| 1970-01-01T00:00:00.044 | 0.044 |\
                \n| 1970-01-01T00:00:00.055 | 0.055 |\
                \n| 1970-01-01T00:00:00.066 | 0.066 |\
                \n| 1970-01-01T00:00:00.077 | 0.077 |\
                \n| 1970-01-01T00:00:00.088 | 0.088 |\
                \n| 1970-01-01T00:00:00.099 | 0.099 |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    // `interval` is larger than the range: only `start` is emitted.
    #[tokio::test]
    async fn one_row_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            1000,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+---------------------+-------+\
                \n| time                | value |\
                \n+---------------------+-------+\
                \n| 1970-01-01T00:00:00 | 0.0   |\
                \n+---------------------+-------+",
            ),
        )
        .await
    }

    // `start > end`: no rows are produced, but the output still carries the schema.
    #[tokio::test]
    async fn negative_range_empty_metric_test() {
        do_empty_metric_test(
            1000,
            -1000,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+------+-------+\
                \n| time | value |\
                \n+------+-------+\
                \n+------+-------+",
            ),
        )
        .await
    }

    // Without a field expression the output contains only the time index column.
    #[tokio::test]
    async fn no_field_expr() {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let empty_metric =
            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+\
            \n| time                |\
            \n+---------------------+\
            \n| 1970-01-01T00:00:00 |\
            \n+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }
}