use std::any::Any;
use std::collections::HashMap;
use std::ops::Div;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::arrow::datatypes::Field;
use datafusion::common::stats::Precision;
use datafusion::common::{
    DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::prelude::{col, lit, Expr};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;

use crate::extension_plan::Millisecond;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
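/// A leaf logical plan that scans nothing. It produces a time index column
/// covering `[start, end]` at a fixed `interval`, plus an optional value column
/// computed by evaluating `expr` against that time index.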
pub struct EmptyMetric {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    expr: Option<Expr>,
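    /// Schema containing only the time index column; used as the input schema
    /// when typing and planning `expr`.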
    time_index_schema: DFSchemaRef,
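    /// Schema of the output record batch.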
    result_schema: DFSchemaRef,
}

impl EmptyMetric {
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_index_column_name: String,
        field_column_name: String,
        field_expr: Option<Expr>,
    ) -> DataFusionResult<Self> {
        let qualifier = Some(TableReference::bare(""));
        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
        let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
        if let Some(field_expr) = &field_expr {
            let field_data_type = field_expr.get_type(&ts_only_schema)?;
            fields.push((
                qualifier.clone(),
                Arc::new(Field::new(field_column_name, field_data_type, true)),
            ));
        }
        let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);

        Ok(Self {
            start,
            end,
            interval,
            time_index_schema: Arc::new(ts_only_schema),
            result_schema: schema,
            expr: field_expr,
        })
    }

    pub const fn name() -> &'static str {
        "EmptyMetric"
    }

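    /// Plan this logical node into an [`EmptyMetricExec`] physical plan, planning
    /// the optional field expression against the time-index-only schema.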
    pub fn to_execution_plan(
        &self,
        session_state: &SessionState,
        physical_planner: &dyn PhysicalPlanner,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let physical_expr = self
            .expr
            .as_ref()
            .map(|expr| {
                physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
            })
            .transpose()?;
        let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
        let properties = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(result_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Ok(Arc::new(EmptyMetricExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
            result_schema,
            expr: physical_expr,
            properties,
            metric: ExecutionPlanMetricsSet::new(),
        }))
    }
}

impl UserDefinedLogicalNodeCore for EmptyMetric {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.result_schema
    }

    fn expressions(&self) -> Vec<Expr> {
        if let Some(expr) = &self.expr {
            vec![expr.clone()]
        } else {
            vec![]
        }
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "EmptyMetric: range=[{}..{}], interval=[{}]",
            self.start, self.end, self.interval,
        )
    }

    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        Ok(Self {
            start: self.start,
            end: self.end,
            interval: self.interval,
            expr: exprs.into_iter().next(),
            time_index_schema: self.time_index_schema.clone(),
            result_schema: self.result_schema.clone(),
        })
    }
}

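// Ordering intentionally skips the schema fields and compares only `start`,
// `end`, `interval`, and `expr`.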
impl PartialOrd for EmptyMetric {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.expr.partial_cmp(&other.expr)
    }
}

#[derive(Debug, Clone)]
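/// Physical counterpart of [`EmptyMetric`]: a leaf `ExecutionPlan` that emits a
/// single record batch containing the generated time index and optional value column.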
pub struct EmptyMetricExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
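    /// Schema with only the time index column; used to build the input batch for `expr`.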
    time_index_schema: SchemaRef,
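    /// Schema of the output record batch.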
    result_schema: SchemaRef,
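    /// Optional physical expression evaluated against the time index to produce the value column.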
    expr: Option<PhysicalExprRef>,
    properties: Arc<PlanProperties>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for EmptyMetricExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.result_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        self.properties.as_ref()
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(self.as_ref().clone()))
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        Ok(Box::pin(EmptyMetricStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            expr: self.expr.clone(),
            is_first_poll: true,
            time_index_schema: self.time_index_schema.clone(),
            result_schema: self.result_schema.clone(),
            metric: baseline_metric,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn statistics(&self) -> DataFusionResult<Statistics> {
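        // Rows are evenly spaced over `[start, end]` at `interval`, so the row count
        // and byte size can only be estimated (hence `Precision::Inexact`).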
        let estimated_row_num = if self.end > self.start {
            (self.end - self.start) as f64 / self.interval as f64
        } else {
            0.0
        };
        let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
            total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "EmptyMetricExec"
    }
}

impl DisplayAs for EmptyMetricExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
                f,
                "EmptyMetric: range=[{}..{}], interval=[{}]",
                self.start, self.end, self.interval,
            ),
        }
    }
}

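/// Stream that materializes the whole result as a single record batch on its
/// first poll and returns `None` afterwards.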
pub struct EmptyMetricStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    expr: Option<PhysicalExprRef>,
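    /// Whether the stream has been polled yet; the entire result is produced
    /// in one batch on the first poll.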
    is_first_poll: bool,
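    /// Schema with only the time index column; used to build the input batch for `expr`.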
    time_index_schema: SchemaRef,
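    /// Schema of the output record batch.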
    result_schema: SchemaRef,
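    /// Baseline metrics (e.g. elapsed compute time, output rows) recorded for this stream.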
    metric: BaselineMetrics,
}

impl RecordBatchStream for EmptyMetricStream {
    fn schema(&self) -> SchemaRef {
        self.result_schema.clone()
    }
}

impl Stream for EmptyMetricStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let result = if self.is_first_poll {
            self.is_first_poll = false;
            let _timer = self.metric.elapsed_compute().timer();

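            // Build the time index array and wrap it in a one-column record batch
            // that serves as the evaluation input for the field expression.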
            let time_array = (self.start..=self.end)
                .step_by(self.interval as _)
                .collect::<Vec<_>>();
            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
            let num_rows = time_array.len();
            let input_record_batch =
                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
                    .map_err(|e| DataFusionError::ArrowError(e, None))?;
            let mut result_arrays: Vec<ArrayRef> = vec![time_array];

            if let Some(field_expr) = &self.expr {
                result_arrays.push(
                    field_expr
                        .evaluate(&input_record_batch)
                        .and_then(|x| x.into_array(num_rows))?,
                );
            }

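            // Assemble the output batch: the time index column plus the optional value column.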
            let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
                .map_err(|e| DataFusionError::ArrowError(e, None));

            Poll::Ready(Some(batch))
        } else {
            Poll::Ready(None)
        };
        self.metric.record_poll(result)
    }
}

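/// Build a [`DFSchema`] containing a single non-nullable millisecond timestamp
/// column with the given name.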
fn build_ts_only_schema(column_name: &str) -> DFSchema {
    let ts_field = Field::new(
        column_name,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    );
    DFSchema::new_with_metadata(
        vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
        HashMap::new(),
    )
    .unwrap()
}

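/// Build an expression that converts the given time index column (millisecond
/// timestamps) into `Float64` seconds, i.e. `timestamp / 1000.0`.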
pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
    let input_schema = build_ts_only_schema(time_index_column_name);
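    // Casting Timestamp(Millisecond) to Int64 and then to Float64 is supported,
    // so these `unwrap`s are not expected to fail.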
    col(time_index_column_name)
        .cast_to(&DataType::Int64, &input_schema)
        .unwrap()
        .cast_to(&DataType::Float64, &input_schema)
        .unwrap()
        .div(lit(1000.0))
}

#[cfg(test)]
mod test {
    use datafusion::physical_planner::DefaultPhysicalPlanner;
    use datafusion::prelude::SessionContext;

    use super::*;

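    // Helper: plan an `EmptyMetric` whose value column is the special time expression,
    // execute it, and compare the pretty-printed batches against `expected`.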
    async fn do_empty_metric_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_column_name: String,
        field_column_name: String,
        expected: String,
    ) {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let time_expr = build_special_time_expr(&time_column_name);
        let empty_metric = EmptyMetric::new(
            start,
            end,
            interval,
            time_column_name,
            field_column_name,
            Some(time_expr),
        )
        .unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn normal_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.010 | 0.01  |\
                \n| 1970-01-01T00:00:00.020 | 0.02  |\
                \n| 1970-01-01T00:00:00.030 | 0.03  |\
                \n| 1970-01-01T00:00:00.040 | 0.04  |\
                \n| 1970-01-01T00:00:00.050 | 0.05  |\
                \n| 1970-01-01T00:00:00.060 | 0.06  |\
                \n| 1970-01-01T00:00:00.070 | 0.07  |\
                \n| 1970-01-01T00:00:00.080 | 0.08  |\
                \n| 1970-01-01T00:00:00.090 | 0.09  |\
                \n| 1970-01-01T00:00:00.100 | 0.1   |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn unaligned_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            11,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+-------------------------+-------+\
                \n| time                    | value |\
                \n+-------------------------+-------+\
                \n| 1970-01-01T00:00:00     | 0.0   |\
                \n| 1970-01-01T00:00:00.011 | 0.011 |\
                \n| 1970-01-01T00:00:00.022 | 0.022 |\
                \n| 1970-01-01T00:00:00.033 | 0.033 |\
                \n| 1970-01-01T00:00:00.044 | 0.044 |\
                \n| 1970-01-01T00:00:00.055 | 0.055 |\
                \n| 1970-01-01T00:00:00.066 | 0.066 |\
                \n| 1970-01-01T00:00:00.077 | 0.077 |\
                \n| 1970-01-01T00:00:00.088 | 0.088 |\
                \n| 1970-01-01T00:00:00.099 | 0.099 |\
                \n+-------------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn one_row_empty_metric_test() {
        do_empty_metric_test(
            0,
            100,
            1000,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+---------------------+-------+\
                \n| time                | value |\
                \n+---------------------+-------+\
                \n| 1970-01-01T00:00:00 | 0.0   |\
                \n+---------------------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn negative_range_empty_metric_test() {
        do_empty_metric_test(
            1000,
            -1000,
            10,
            "time".to_string(),
            "value".to_string(),
            String::from(
                "+------+-------+\
                \n| time | value |\
                \n+------+-------+\
                \n+------+-------+",
            ),
        )
        .await
    }

    #[tokio::test]
    async fn no_field_expr() {
        let session_context = SessionContext::default();
        let df_default_physical_planner = DefaultPhysicalPlanner::default();
        let empty_metric =
            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
        let empty_metric_exec = empty_metric
            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
            .unwrap();

        let result =
            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                .await
                .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+\
            \n| time                |\
            \n+---------------------+\
            \n| 1970-01-01T00:00:00 |\
            \n+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }
}