// promql/extension_plan/empty_metric.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::ops::Div;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::ArrayRef;
23use datafusion::arrow::datatypes::{DataType, TimeUnit};
24use datafusion::common::arrow::datatypes::Field;
25use datafusion::common::stats::Precision;
26use datafusion::common::{
27    DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
28};
29use datafusion::error::DataFusionError;
30use datafusion::execution::context::{SessionState, TaskContext};
31use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
32use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
35use datafusion::physical_plan::{
36    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
37    SendableRecordBatchStream,
38};
39use datafusion::physical_planner::PhysicalPlanner;
40use datafusion::prelude::{col, lit, Expr};
41use datatypes::arrow::array::TimestampMillisecondArray;
42use datatypes::arrow::datatypes::SchemaRef;
43use datatypes::arrow::record_batch::RecordBatch;
44use futures::Stream;
45
46use crate::extension_plan::Millisecond;
47
48/// Empty source plan that generate record batch with two columns:
49/// - time index column, computed from start, end and interval
50/// - value column, generated by the input expr. The expr should not
51///   reference any column except the time index column.
52#[derive(Debug, Clone, PartialEq, Eq, Hash)]
53pub struct EmptyMetric {
54    start: Millisecond,
55    end: Millisecond,
56    interval: Millisecond,
57    expr: Option<Expr>,
58    /// Schema that only contains the time index column.
59    /// This is for intermediate result only.
60    time_index_schema: DFSchemaRef,
61    /// Schema of the output record batch
62    result_schema: DFSchemaRef,
63}
64
65impl EmptyMetric {
66    pub fn new(
67        start: Millisecond,
68        end: Millisecond,
69        interval: Millisecond,
70        time_index_column_name: String,
71        field_column_name: String,
72        field_expr: Option<Expr>,
73    ) -> DataFusionResult<Self> {
74        let qualifier = Some(TableReference::bare(""));
75        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
76        let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
77        if let Some(field_expr) = &field_expr {
78            let field_data_type = field_expr.get_type(&ts_only_schema)?;
79            fields.push((
80                qualifier.clone(),
81                Arc::new(Field::new(field_column_name, field_data_type, true)),
82            ));
83        }
84        let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);
85
86        Ok(Self {
87            start,
88            end,
89            interval,
90            time_index_schema: Arc::new(ts_only_schema),
91            result_schema: schema,
92            expr: field_expr,
93        })
94    }
95
96    pub const fn name() -> &'static str {
97        "EmptyMetric"
98    }
99
100    pub fn to_execution_plan(
101        &self,
102        session_state: &SessionState,
103        physical_planner: &dyn PhysicalPlanner,
104    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
105        let physical_expr = self
106            .expr
107            .as_ref()
108            .map(|expr| {
109                physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
110            })
111            .transpose()?;
112        let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
113        let properties = Arc::new(PlanProperties::new(
114            EquivalenceProperties::new(result_schema.clone()),
115            Partitioning::UnknownPartitioning(1),
116            EmissionType::Incremental,
117            Boundedness::Bounded,
118        ));
119        Ok(Arc::new(EmptyMetricExec {
120            start: self.start,
121            end: self.end,
122            interval: self.interval,
123            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
124            result_schema,
125            expr: physical_expr,
126            properties,
127            metric: ExecutionPlanMetricsSet::new(),
128        }))
129    }
130}
131
132impl UserDefinedLogicalNodeCore for EmptyMetric {
133    fn name(&self) -> &str {
134        Self::name()
135    }
136
137    fn inputs(&self) -> Vec<&LogicalPlan> {
138        vec![]
139    }
140
141    fn schema(&self) -> &DFSchemaRef {
142        &self.result_schema
143    }
144
145    fn expressions(&self) -> Vec<Expr> {
146        if let Some(expr) = &self.expr {
147            vec![expr.clone()]
148        } else {
149            vec![]
150        }
151    }
152
153    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
154        write!(
155            f,
156            "EmptyMetric: range=[{}..{}], interval=[{}]",
157            self.start, self.end, self.interval,
158        )
159    }
160
161    fn with_exprs_and_inputs(
162        &self,
163        exprs: Vec<Expr>,
164        _inputs: Vec<LogicalPlan>,
165    ) -> DataFusionResult<Self> {
166        Ok(Self {
167            start: self.start,
168            end: self.end,
169            interval: self.interval,
170            expr: exprs.into_iter().next(),
171            time_index_schema: self.time_index_schema.clone(),
172            result_schema: self.result_schema.clone(),
173        })
174    }
175}
176
177impl PartialOrd for EmptyMetric {
178    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179        // Compare fields in order excluding schema fields
180        match self.start.partial_cmp(&other.start) {
181            Some(core::cmp::Ordering::Equal) => {}
182            ord => return ord,
183        }
184        match self.end.partial_cmp(&other.end) {
185            Some(core::cmp::Ordering::Equal) => {}
186            ord => return ord,
187        }
188        match self.interval.partial_cmp(&other.interval) {
189            Some(core::cmp::Ordering::Equal) => {}
190            ord => return ord,
191        }
192        self.expr.partial_cmp(&other.expr)
193    }
194}
195
196#[derive(Debug, Clone)]
197pub struct EmptyMetricExec {
198    start: Millisecond,
199    end: Millisecond,
200    interval: Millisecond,
201    /// Schema that only contains the time index column.
202    /// This is for intermediate result only.
203    time_index_schema: SchemaRef,
204    /// Schema of the output record batch
205    result_schema: SchemaRef,
206    expr: Option<PhysicalExprRef>,
207    properties: Arc<PlanProperties>,
208    metric: ExecutionPlanMetricsSet,
209}
210
211impl ExecutionPlan for EmptyMetricExec {
212    fn as_any(&self) -> &dyn Any {
213        self
214    }
215
216    fn schema(&self) -> SchemaRef {
217        self.result_schema.clone()
218    }
219
220    fn properties(&self) -> &PlanProperties {
221        self.properties.as_ref()
222    }
223
224    fn maintains_input_order(&self) -> Vec<bool> {
225        vec![]
226    }
227
228    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229        vec![]
230    }
231
232    fn with_new_children(
233        self: Arc<Self>,
234        _children: Vec<Arc<dyn ExecutionPlan>>,
235    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
236        Ok(Arc::new(self.as_ref().clone()))
237    }
238
239    fn execute(
240        &self,
241        partition: usize,
242        _context: Arc<TaskContext>,
243    ) -> DataFusionResult<SendableRecordBatchStream> {
244        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
245        Ok(Box::pin(EmptyMetricStream {
246            start: self.start,
247            end: self.end,
248            interval: self.interval,
249            expr: self.expr.clone(),
250            is_first_poll: true,
251            time_index_schema: self.time_index_schema.clone(),
252            result_schema: self.result_schema.clone(),
253            metric: baseline_metric,
254        }))
255    }
256
257    fn metrics(&self) -> Option<MetricsSet> {
258        Some(self.metric.clone_inner())
259    }
260
261    fn statistics(&self) -> DataFusionResult<Statistics> {
262        let estimated_row_num = if self.end > self.start {
263            (self.end - self.start) as f64 / self.interval as f64
264        } else {
265            0.0
266        };
267        let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;
268
269        Ok(Statistics {
270            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
271            total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
272            column_statistics: Statistics::unknown_column(&self.schema()),
273        })
274    }
275
276    fn name(&self) -> &str {
277        "EmptyMetricExec"
278    }
279}
280
281impl DisplayAs for EmptyMetricExec {
282    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
283        match t {
284            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
285                f,
286                "EmptyMetric: range=[{}..{}], interval=[{}]",
287                self.start, self.end, self.interval,
288            ),
289        }
290    }
291}
292
293pub struct EmptyMetricStream {
294    start: Millisecond,
295    end: Millisecond,
296    interval: Millisecond,
297    expr: Option<PhysicalExprRef>,
298    /// This stream only generate one record batch at the first poll
299    is_first_poll: bool,
300    /// Schema that only contains the time index column.
301    /// This is for intermediate result only.
302    time_index_schema: SchemaRef,
303    /// Schema of the output record batch
304    result_schema: SchemaRef,
305    metric: BaselineMetrics,
306}
307
308impl RecordBatchStream for EmptyMetricStream {
309    fn schema(&self) -> SchemaRef {
310        self.result_schema.clone()
311    }
312}
313
314impl Stream for EmptyMetricStream {
315    type Item = DataFusionResult<RecordBatch>;
316
317    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
318        let result = if self.is_first_poll {
319            self.is_first_poll = false;
320            let _timer = self.metric.elapsed_compute().timer();
321
322            // build the time index array, and a record batch that
323            // only contains that array as the input of field expr
324            let time_array = (self.start..=self.end)
325                .step_by(self.interval as _)
326                .collect::<Vec<_>>();
327            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
328            let num_rows = time_array.len();
329            let input_record_batch =
330                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
331                    .map_err(|e| DataFusionError::ArrowError(e, None))?;
332            let mut result_arrays: Vec<ArrayRef> = vec![time_array];
333
334            // evaluate the field expr and get the result
335            if let Some(field_expr) = &self.expr {
336                result_arrays.push(
337                    field_expr
338                        .evaluate(&input_record_batch)
339                        .and_then(|x| x.into_array(num_rows))?,
340                );
341            }
342
343            // assemble the output record batch
344            let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
345                .map_err(|e| DataFusionError::ArrowError(e, None));
346
347            Poll::Ready(Some(batch))
348        } else {
349            Poll::Ready(None)
350        };
351        self.metric.record_poll(result)
352    }
353}
354
355/// Build a schema that only contains **millisecond** timestamp column
356fn build_ts_only_schema(column_name: &str) -> DFSchema {
357    let ts_field = Field::new(
358        column_name,
359        DataType::Timestamp(TimeUnit::Millisecond, None),
360        false,
361    );
362    // safety: should not fail (UT covers this)
363    DFSchema::new_with_metadata(
364        vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
365        HashMap::new(),
366    )
367    .unwrap()
368}
369
370// Convert timestamp column to UNIX epoch second:
371// https://prometheus.io/docs/prometheus/latest/querying/functions/#time
372pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
373    let input_schema = build_ts_only_schema(time_index_column_name);
374    // safety: should not failed (UT covers this)
375    col(time_index_column_name)
376        .cast_to(&DataType::Int64, &input_schema)
377        .unwrap()
378        .cast_to(&DataType::Float64, &input_schema)
379        .unwrap()
380        .div(lit(1000.0)) // cast to second will lost precision, so we cast to float64 first and manually divide by 1000
381}
382
#[cfg(test)]
mod test {
    use datafusion::physical_planner::DefaultPhysicalPlanner;
    use datafusion::prelude::SessionContext;

    use super::*;

    /// Plans and executes `empty_metric` with a default session, and returns
    /// the collected output as a pretty-printed table.
    async fn execute_and_format(empty_metric: EmptyMetric) -> String {
        let session_context = SessionContext::default();
        let physical_planner = DefaultPhysicalPlanner::default();
        let exec = empty_metric
            .to_execution_plan(&session_context.state(), &physical_planner)
            .unwrap();

        let batches = datafusion::physical_plan::collect(exec, session_context.task_ctx())
            .await
            .unwrap();
        datatypes::arrow::util::pretty::pretty_format_batches(&batches)
            .unwrap()
            .to_string()
    }

    /// Runs an `EmptyMetric` whose value column is the special time
    /// expression and compares the formatted output with `expected`.
    async fn do_empty_metric_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_column_name: String,
        field_column_name: String,
        expected: String,
    ) {
        let time_expr = build_special_time_expr(&time_column_name);
        let empty_metric = EmptyMetric::new(
            start,
            end,
            interval,
            time_column_name,
            field_column_name,
            Some(time_expr),
        )
        .unwrap();

        let result_literal = execute_and_format(empty_metric).await;

        assert_eq!(result_literal, expected);
    }

    /// The interval divides the range evenly, so both bounds are emitted.
    #[tokio::test]
    async fn normal_empty_metric_test() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| time                    | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00     | 0.0   |\
            \n| 1970-01-01T00:00:00.010 | 0.01  |\
            \n| 1970-01-01T00:00:00.020 | 0.02  |\
            \n| 1970-01-01T00:00:00.030 | 0.03  |\
            \n| 1970-01-01T00:00:00.040 | 0.04  |\
            \n| 1970-01-01T00:00:00.050 | 0.05  |\
            \n| 1970-01-01T00:00:00.060 | 0.06  |\
            \n| 1970-01-01T00:00:00.070 | 0.07  |\
            \n| 1970-01-01T00:00:00.080 | 0.08  |\
            \n| 1970-01-01T00:00:00.090 | 0.09  |\
            \n| 1970-01-01T00:00:00.100 | 0.1   |\
            \n+-------------------------+-------+",
        );
        do_empty_metric_test(
            0,
            100,
            10,
            "time".to_string(),
            "value".to_string(),
            expected,
        )
        .await
    }

    /// The interval does not divide the range evenly, so the last emitted
    /// timestamp is below the end bound.
    #[tokio::test]
    async fn unaligned_empty_metric_test() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| time                    | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00     | 0.0   |\
            \n| 1970-01-01T00:00:00.011 | 0.011 |\
            \n| 1970-01-01T00:00:00.022 | 0.022 |\
            \n| 1970-01-01T00:00:00.033 | 0.033 |\
            \n| 1970-01-01T00:00:00.044 | 0.044 |\
            \n| 1970-01-01T00:00:00.055 | 0.055 |\
            \n| 1970-01-01T00:00:00.066 | 0.066 |\
            \n| 1970-01-01T00:00:00.077 | 0.077 |\
            \n| 1970-01-01T00:00:00.088 | 0.088 |\
            \n| 1970-01-01T00:00:00.099 | 0.099 |\
            \n+-------------------------+-------+",
        );
        do_empty_metric_test(
            0,
            100,
            11,
            "time".to_string(),
            "value".to_string(),
            expected,
        )
        .await
    }

    /// The interval is larger than the range, so only the start is emitted.
    #[tokio::test]
    async fn one_row_empty_metric_test() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| time                | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0   |\
            \n+---------------------+-------+",
        );
        do_empty_metric_test(
            0,
            100,
            1000,
            "time".to_string(),
            "value".to_string(),
            expected,
        )
        .await
    }

    /// The end is before the start, so the output has no rows.
    #[tokio::test]
    async fn negative_range_empty_metric_test() {
        let expected = String::from(
            "+------+-------+\
            \n| time | value |\
            \n+------+-------+\
            \n+------+-------+",
        );
        do_empty_metric_test(
            1000,
            -1000,
            10,
            "time".to_string(),
            "value".to_string(),
            expected,
        )
        .await
    }

    /// Without a field expr the output only has the time index column.
    #[tokio::test]
    async fn no_field_expr() {
        let empty_metric =
            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();

        let result_literal = execute_and_format(empty_metric).await;

        let expected = String::from(
            "+---------------------+\
            \n| time                |\
            \n+---------------------+\
            \n| 1970-01-01T00:00:00 |\
            \n+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }
}