1use std::any::Any;
16use std::collections::HashMap;
17use std::ops::Div;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::ArrayRef;
23use datafusion::arrow::datatypes::{DataType, TimeUnit};
24use datafusion::common::arrow::datatypes::Field;
25use datafusion::common::stats::Precision;
26use datafusion::common::{
27    DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
28};
29use datafusion::datasource::{MemTable, provider_as_source};
30use datafusion::error::DataFusionError;
31use datafusion::execution::context::{SessionState, TaskContext};
32use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
33use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
34use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::{
37    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
38    SendableRecordBatchStream,
39};
40use datafusion::physical_planner::PhysicalPlanner;
41use datafusion::prelude::{Expr, col, lit};
42use datafusion_expr::LogicalPlanBuilder;
43use datatypes::arrow::array::TimestampMillisecondArray;
44use datatypes::arrow::datatypes::SchemaRef;
45use datatypes::arrow::record_batch::RecordBatch;
46use futures::Stream;
47
48use crate::extension_plan::Millisecond;
49
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
55pub struct EmptyMetric {
56    start: Millisecond,
57    end: Millisecond,
58    interval: Millisecond,
59    expr: Option<Expr>,
60    time_index_schema: DFSchemaRef,
63    result_schema: DFSchemaRef,
65    dummy_input: LogicalPlan,
71}
72
73impl EmptyMetric {
74    pub fn new(
75        start: Millisecond,
76        end: Millisecond,
77        interval: Millisecond,
78        time_index_column_name: String,
79        field_column_name: String,
80        field_expr: Option<Expr>,
81    ) -> DataFusionResult<Self> {
82        let qualifier = Some(TableReference::bare(""));
83        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
84        let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
85        if let Some(field_expr) = &field_expr {
86            let field_data_type = field_expr.get_type(&ts_only_schema)?;
87            fields.push((
88                qualifier.clone(),
89                Arc::new(Field::new(field_column_name, field_data_type, true)),
90            ));
91        }
92        let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);
93
94        let table = MemTable::try_new(Arc::new(schema.as_arrow().clone()), vec![vec![]])?;
95        let source = provider_as_source(Arc::new(table));
96        let dummy_input =
97            LogicalPlanBuilder::scan("dummy", source, None).and_then(|x| x.build())?;
98
99        Ok(Self {
100            start,
101            end,
102            interval,
103            time_index_schema: Arc::new(ts_only_schema),
104            result_schema: schema,
105            expr: field_expr,
106            dummy_input,
107        })
108    }
109
110    pub const fn name() -> &'static str {
111        "EmptyMetric"
112    }
113
114    pub fn to_execution_plan(
115        &self,
116        session_state: &SessionState,
117        physical_planner: &dyn PhysicalPlanner,
118    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
119        let physical_expr = self
120            .expr
121            .as_ref()
122            .map(|expr| {
123                physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
124            })
125            .transpose()?;
126        let result_schema: SchemaRef = self.result_schema.inner().clone();
127        let properties = Arc::new(PlanProperties::new(
128            EquivalenceProperties::new(result_schema.clone()),
129            Partitioning::UnknownPartitioning(1),
130            EmissionType::Incremental,
131            Boundedness::Bounded,
132        ));
133        Ok(Arc::new(EmptyMetricExec {
134            start: self.start,
135            end: self.end,
136            interval: self.interval,
137            time_index_schema: self.time_index_schema.inner().clone(),
138            result_schema,
139            expr: physical_expr,
140            properties,
141            metric: ExecutionPlanMetricsSet::new(),
142        }))
143    }
144}
145
146impl UserDefinedLogicalNodeCore for EmptyMetric {
147    fn name(&self) -> &str {
148        Self::name()
149    }
150
151    fn inputs(&self) -> Vec<&LogicalPlan> {
152        vec![&self.dummy_input]
153    }
154
155    fn schema(&self) -> &DFSchemaRef {
156        &self.result_schema
157    }
158
159    fn expressions(&self) -> Vec<Expr> {
160        if let Some(expr) = &self.expr {
161            vec![expr.clone()]
162        } else {
163            vec![]
164        }
165    }
166
167    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
168        write!(
169            f,
170            "EmptyMetric: range=[{}..{}], interval=[{}]",
171            self.start, self.end, self.interval,
172        )
173    }
174
175    fn with_exprs_and_inputs(
176        &self,
177        exprs: Vec<Expr>,
178        _inputs: Vec<LogicalPlan>,
179    ) -> DataFusionResult<Self> {
180        Ok(Self {
181            start: self.start,
182            end: self.end,
183            interval: self.interval,
184            expr: exprs.into_iter().next(),
185            time_index_schema: self.time_index_schema.clone(),
186            result_schema: self.result_schema.clone(),
187            dummy_input: self.dummy_input.clone(),
188        })
189    }
190}
191
192impl PartialOrd for EmptyMetric {
193    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
194        match self.start.partial_cmp(&other.start) {
196            Some(core::cmp::Ordering::Equal) => {}
197            ord => return ord,
198        }
199        match self.end.partial_cmp(&other.end) {
200            Some(core::cmp::Ordering::Equal) => {}
201            ord => return ord,
202        }
203        match self.interval.partial_cmp(&other.interval) {
204            Some(core::cmp::Ordering::Equal) => {}
205            ord => return ord,
206        }
207        self.expr.partial_cmp(&other.expr)
208    }
209}
210
211#[derive(Debug, Clone)]
212pub struct EmptyMetricExec {
213    start: Millisecond,
214    end: Millisecond,
215    interval: Millisecond,
216    time_index_schema: SchemaRef,
219    result_schema: SchemaRef,
221    expr: Option<PhysicalExprRef>,
222    properties: Arc<PlanProperties>,
223    metric: ExecutionPlanMetricsSet,
224}
225
226impl ExecutionPlan for EmptyMetricExec {
227    fn as_any(&self) -> &dyn Any {
228        self
229    }
230
231    fn schema(&self) -> SchemaRef {
232        self.result_schema.clone()
233    }
234
235    fn properties(&self) -> &PlanProperties {
236        self.properties.as_ref()
237    }
238
239    fn maintains_input_order(&self) -> Vec<bool> {
240        vec![]
241    }
242
243    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
244        vec![]
245    }
246
247    fn with_new_children(
248        self: Arc<Self>,
249        _children: Vec<Arc<dyn ExecutionPlan>>,
250    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
251        Ok(Arc::new(self.as_ref().clone()))
252    }
253
254    fn execute(
255        &self,
256        partition: usize,
257        _context: Arc<TaskContext>,
258    ) -> DataFusionResult<SendableRecordBatchStream> {
259        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
260        Ok(Box::pin(EmptyMetricStream {
261            start: self.start,
262            end: self.end,
263            interval: self.interval,
264            expr: self.expr.clone(),
265            is_first_poll: true,
266            time_index_schema: self.time_index_schema.clone(),
267            result_schema: self.result_schema.clone(),
268            metric: baseline_metric,
269        }))
270    }
271
272    fn metrics(&self) -> Option<MetricsSet> {
273        Some(self.metric.clone_inner())
274    }
275
276    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
277        if partition.is_some() {
278            return Ok(Statistics::new_unknown(self.schema().as_ref()));
279        }
280
281        let estimated_row_num = if self.end > self.start {
282            (self.end - self.start) as f64 / self.interval as f64
283        } else {
284            0.0
285        };
286        let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;
287
288        Ok(Statistics {
289            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
290            total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
291            column_statistics: Statistics::unknown_column(&self.schema()),
292        })
293    }
294
295    fn name(&self) -> &str {
296        "EmptyMetricExec"
297    }
298}
299
300impl DisplayAs for EmptyMetricExec {
301    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
302        match t {
303            DisplayFormatType::Default
304            | DisplayFormatType::Verbose
305            | DisplayFormatType::TreeRender => write!(
306                f,
307                "EmptyMetric: range=[{}..{}], interval=[{}]",
308                self.start, self.end, self.interval,
309            ),
310        }
311    }
312}
313
314pub struct EmptyMetricStream {
315    start: Millisecond,
316    end: Millisecond,
317    interval: Millisecond,
318    expr: Option<PhysicalExprRef>,
319    is_first_poll: bool,
321    time_index_schema: SchemaRef,
324    result_schema: SchemaRef,
326    metric: BaselineMetrics,
327}
328
329impl RecordBatchStream for EmptyMetricStream {
330    fn schema(&self) -> SchemaRef {
331        self.result_schema.clone()
332    }
333}
334
335impl Stream for EmptyMetricStream {
336    type Item = DataFusionResult<RecordBatch>;
337
338    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
339        let result = if self.is_first_poll {
340            self.is_first_poll = false;
341            let _timer = self.metric.elapsed_compute().timer();
342
343            let time_array = (self.start..=self.end)
346                .step_by(self.interval as _)
347                .collect::<Vec<_>>();
348            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
349            let num_rows = time_array.len();
350            let input_record_batch =
351                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
352                    .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
353            let mut result_arrays: Vec<ArrayRef> = vec![time_array];
354
355            if let Some(field_expr) = &self.expr {
357                result_arrays.push(
358                    field_expr
359                        .evaluate(&input_record_batch)
360                        .and_then(|x| x.into_array(num_rows))?,
361                );
362            }
363
364            let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
366                .map_err(|e| DataFusionError::ArrowError(Box::new(e), None));
367
368            Poll::Ready(Some(batch))
369        } else {
370            Poll::Ready(None)
371        };
372        self.metric.record_poll(result)
373    }
374}
375
376fn build_ts_only_schema(column_name: &str) -> DFSchema {
378    let ts_field = Field::new(
379        column_name,
380        DataType::Timestamp(TimeUnit::Millisecond, None),
381        false,
382    );
383    DFSchema::new_with_metadata(
385        vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
386        HashMap::new(),
387    )
388    .unwrap()
389}
390
391pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
394    let input_schema = build_ts_only_schema(time_index_column_name);
395    col(time_index_column_name)
397        .cast_to(&DataType::Int64, &input_schema)
398        .unwrap()
399        .cast_to(&DataType::Float64, &input_schema)
400        .unwrap()
401        .div(lit(1000.0)) }
403
404#[cfg(test)]
405mod test {
406    use datafusion::physical_planner::DefaultPhysicalPlanner;
407    use datafusion::prelude::SessionContext;
408
409    use super::*;
410
411    async fn do_empty_metric_test(
412        start: Millisecond,
413        end: Millisecond,
414        interval: Millisecond,
415        time_column_name: String,
416        field_column_name: String,
417        expected: String,
418    ) {
419        let session_context = SessionContext::default();
420        let df_default_physical_planner = DefaultPhysicalPlanner::default();
421        let time_expr = build_special_time_expr(&time_column_name);
422        let empty_metric = EmptyMetric::new(
423            start,
424            end,
425            interval,
426            time_column_name,
427            field_column_name,
428            Some(time_expr),
429        )
430        .unwrap();
431        let empty_metric_exec = empty_metric
432            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
433            .unwrap();
434
435        let result =
436            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
437                .await
438                .unwrap();
439        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
440            .unwrap()
441            .to_string();
442
443        assert_eq!(result_literal, expected);
444    }
445
446    #[tokio::test]
447    async fn normal_empty_metric_test() {
448        do_empty_metric_test(
449            0,
450            100,
451            10,
452            "time".to_string(),
453            "value".to_string(),
454            String::from(
455                "+-------------------------+-------+\
456                \n| time                    | value |\
457                \n+-------------------------+-------+\
458                \n| 1970-01-01T00:00:00     | 0.0   |\
459                \n| 1970-01-01T00:00:00.010 | 0.01  |\
460                \n| 1970-01-01T00:00:00.020 | 0.02  |\
461                \n| 1970-01-01T00:00:00.030 | 0.03  |\
462                \n| 1970-01-01T00:00:00.040 | 0.04  |\
463                \n| 1970-01-01T00:00:00.050 | 0.05  |\
464                \n| 1970-01-01T00:00:00.060 | 0.06  |\
465                \n| 1970-01-01T00:00:00.070 | 0.07  |\
466                \n| 1970-01-01T00:00:00.080 | 0.08  |\
467                \n| 1970-01-01T00:00:00.090 | 0.09  |\
468                \n| 1970-01-01T00:00:00.100 | 0.1   |\
469                \n+-------------------------+-------+",
470            ),
471        )
472        .await
473    }
474
475    #[tokio::test]
476    async fn unaligned_empty_metric_test() {
477        do_empty_metric_test(
478            0,
479            100,
480            11,
481            "time".to_string(),
482            "value".to_string(),
483            String::from(
484                "+-------------------------+-------+\
485                \n| time                    | value |\
486                \n+-------------------------+-------+\
487                \n| 1970-01-01T00:00:00     | 0.0   |\
488                \n| 1970-01-01T00:00:00.011 | 0.011 |\
489                \n| 1970-01-01T00:00:00.022 | 0.022 |\
490                \n| 1970-01-01T00:00:00.033 | 0.033 |\
491                \n| 1970-01-01T00:00:00.044 | 0.044 |\
492                \n| 1970-01-01T00:00:00.055 | 0.055 |\
493                \n| 1970-01-01T00:00:00.066 | 0.066 |\
494                \n| 1970-01-01T00:00:00.077 | 0.077 |\
495                \n| 1970-01-01T00:00:00.088 | 0.088 |\
496                \n| 1970-01-01T00:00:00.099 | 0.099 |\
497                \n+-------------------------+-------+",
498            ),
499        )
500        .await
501    }
502
503    #[tokio::test]
504    async fn one_row_empty_metric_test() {
505        do_empty_metric_test(
506            0,
507            100,
508            1000,
509            "time".to_string(),
510            "value".to_string(),
511            String::from(
512                "+---------------------+-------+\
513                \n| time                | value |\
514                \n+---------------------+-------+\
515                \n| 1970-01-01T00:00:00 | 0.0   |\
516                \n+---------------------+-------+",
517            ),
518        )
519        .await
520    }
521
522    #[tokio::test]
523    async fn negative_range_empty_metric_test() {
524        do_empty_metric_test(
525            1000,
526            -1000,
527            10,
528            "time".to_string(),
529            "value".to_string(),
530            String::from(
531                "+------+-------+\
532                \n| time | value |\
533                \n+------+-------+\
534                \n+------+-------+",
535            ),
536        )
537        .await
538    }
539
540    #[tokio::test]
541    async fn no_field_expr() {
542        let session_context = SessionContext::default();
543        let df_default_physical_planner = DefaultPhysicalPlanner::default();
544        let empty_metric =
545            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
546        let empty_metric_exec = empty_metric
547            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
548            .unwrap();
549
550        let result =
551            datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
552                .await
553                .unwrap();
554        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
555            .unwrap()
556            .to_string();
557
558        let expected = String::from(
559            "+---------------------+\
560            \n| time                |\
561            \n+---------------------+\
562            \n| 1970-01-01T00:00:00 |\
563            \n+---------------------+",
564        );
565        assert_eq!(result_literal, expected);
566    }
567}