promql/extension_plan/
empty_metric.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::ops::Div;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::ArrayRef;
23use datafusion::arrow::datatypes::{DataType, TimeUnit};
24use datafusion::common::arrow::datatypes::Field;
25use datafusion::common::stats::Precision;
26use datafusion::common::{
27    DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics, TableReference,
28};
29use datafusion::error::DataFusionError;
30use datafusion::execution::context::{SessionState, TaskContext};
31use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
32use datafusion::physical_expr::{EquivalenceProperties, PhysicalExprRef};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
35use datafusion::physical_plan::{
36    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
37    SendableRecordBatchStream,
38};
39use datafusion::physical_planner::PhysicalPlanner;
40use datafusion::prelude::{col, lit, Expr};
41use datatypes::arrow::array::TimestampMillisecondArray;
42use datatypes::arrow::datatypes::SchemaRef;
43use datatypes::arrow::record_batch::RecordBatch;
44use futures::Stream;
45
46use crate::extension_plan::Millisecond;
47
/// Empty source plan that generates a single record batch with up to two columns:
/// - the time index column, computed from `start`, `end` and `interval`;
/// - the value column, generated by evaluating `expr`. It is only present when an
///   expr is given, and the expr should not reference any column except the time
///   index column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyMetric {
    /// First timestamp of the generated range (inclusive).
    start: Millisecond,
    /// Last timestamp bound of the generated range (inclusive).
    end: Millisecond,
    /// Step between two consecutive generated timestamps.
    interval: Millisecond,
    /// Optional expression evaluated over the time index column to produce the value column.
    expr: Option<Expr>,
    /// Schema that only contains the time index column.
    /// This is for intermediate result only.
    time_index_schema: DFSchemaRef,
    /// Schema of the output record batch
    result_schema: DFSchemaRef,
}
64
65impl EmptyMetric {
66    pub fn new(
67        start: Millisecond,
68        end: Millisecond,
69        interval: Millisecond,
70        time_index_column_name: String,
71        field_column_name: String,
72        field_expr: Option<Expr>,
73    ) -> DataFusionResult<Self> {
74        let qualifier = Some(TableReference::bare(""));
75        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
76        let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
77        if let Some(field_expr) = &field_expr {
78            let field_data_type = field_expr.get_type(&ts_only_schema)?;
79            fields.push((
80                qualifier.clone(),
81                Arc::new(Field::new(field_column_name, field_data_type, true)),
82            ));
83        }
84        let schema = Arc::new(DFSchema::new_with_metadata(fields, HashMap::new())?);
85
86        Ok(Self {
87            start,
88            end,
89            interval,
90            time_index_schema: Arc::new(ts_only_schema),
91            result_schema: schema,
92            expr: field_expr,
93        })
94    }
95
96    pub const fn name() -> &'static str {
97        "EmptyMetric"
98    }
99
100    pub fn to_execution_plan(
101        &self,
102        session_state: &SessionState,
103        physical_planner: &dyn PhysicalPlanner,
104    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
105        let physical_expr = self
106            .expr
107            .as_ref()
108            .map(|expr| {
109                physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
110            })
111            .transpose()?;
112        let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
113        let properties = Arc::new(PlanProperties::new(
114            EquivalenceProperties::new(result_schema.clone()),
115            Partitioning::UnknownPartitioning(1),
116            EmissionType::Incremental,
117            Boundedness::Bounded,
118        ));
119        Ok(Arc::new(EmptyMetricExec {
120            start: self.start,
121            end: self.end,
122            interval: self.interval,
123            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
124            result_schema,
125            expr: physical_expr,
126            properties,
127            metric: ExecutionPlanMetricsSet::new(),
128        }))
129    }
130}
131
132impl UserDefinedLogicalNodeCore for EmptyMetric {
133    fn name(&self) -> &str {
134        Self::name()
135    }
136
137    fn inputs(&self) -> Vec<&LogicalPlan> {
138        vec![]
139    }
140
141    fn schema(&self) -> &DFSchemaRef {
142        &self.result_schema
143    }
144
145    fn expressions(&self) -> Vec<Expr> {
146        if let Some(expr) = &self.expr {
147            vec![expr.clone()]
148        } else {
149            vec![]
150        }
151    }
152
153    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
154        write!(
155            f,
156            "EmptyMetric: range=[{}..{}], interval=[{}]",
157            self.start, self.end, self.interval,
158        )
159    }
160
161    fn with_exprs_and_inputs(
162        &self,
163        exprs: Vec<Expr>,
164        _inputs: Vec<LogicalPlan>,
165    ) -> DataFusionResult<Self> {
166        Ok(Self {
167            start: self.start,
168            end: self.end,
169            interval: self.interval,
170            expr: exprs.into_iter().next(),
171            time_index_schema: self.time_index_schema.clone(),
172            result_schema: self.result_schema.clone(),
173        })
174    }
175}
176
177impl PartialOrd for EmptyMetric {
178    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179        // Compare fields in order excluding schema fields
180        match self.start.partial_cmp(&other.start) {
181            Some(core::cmp::Ordering::Equal) => {}
182            ord => return ord,
183        }
184        match self.end.partial_cmp(&other.end) {
185            Some(core::cmp::Ordering::Equal) => {}
186            ord => return ord,
187        }
188        match self.interval.partial_cmp(&other.interval) {
189            Some(core::cmp::Ordering::Equal) => {}
190            ord => return ord,
191        }
192        self.expr.partial_cmp(&other.expr)
193    }
194}
195
/// Physical plan created by `EmptyMetric::to_execution_plan`.
///
/// It has no children; executing a partition returns an [`EmptyMetricStream`].
#[derive(Debug, Clone)]
pub struct EmptyMetricExec {
    /// First timestamp of the generated range (inclusive).
    start: Millisecond,
    /// Last timestamp bound of the generated range (inclusive).
    end: Millisecond,
    /// Step between two consecutive generated timestamps.
    interval: Millisecond,
    /// Schema that only contains the time index column.
    /// This is for intermediate result only.
    time_index_schema: SchemaRef,
    /// Schema of the output record batch
    result_schema: SchemaRef,
    /// Physical form of the optional field expression.
    expr: Option<PhysicalExprRef>,
    /// Cached plan properties, built once in `EmptyMetric::to_execution_plan`.
    properties: Arc<PlanProperties>,
    /// Metrics set that each executed stream registers its baseline metrics into.
    metric: ExecutionPlanMetricsSet,
}
210
211impl ExecutionPlan for EmptyMetricExec {
212    fn as_any(&self) -> &dyn Any {
213        self
214    }
215
216    fn schema(&self) -> SchemaRef {
217        self.result_schema.clone()
218    }
219
220    fn properties(&self) -> &PlanProperties {
221        self.properties.as_ref()
222    }
223
224    fn maintains_input_order(&self) -> Vec<bool> {
225        vec![]
226    }
227
228    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229        vec![]
230    }
231
232    fn with_new_children(
233        self: Arc<Self>,
234        _children: Vec<Arc<dyn ExecutionPlan>>,
235    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
236        Ok(Arc::new(self.as_ref().clone()))
237    }
238
239    fn execute(
240        &self,
241        partition: usize,
242        _context: Arc<TaskContext>,
243    ) -> DataFusionResult<SendableRecordBatchStream> {
244        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
245        Ok(Box::pin(EmptyMetricStream {
246            start: self.start,
247            end: self.end,
248            interval: self.interval,
249            expr: self.expr.clone(),
250            is_first_poll: true,
251            time_index_schema: self.time_index_schema.clone(),
252            result_schema: self.result_schema.clone(),
253            metric: baseline_metric,
254        }))
255    }
256
257    fn metrics(&self) -> Option<MetricsSet> {
258        Some(self.metric.clone_inner())
259    }
260
261    fn statistics(&self) -> DataFusionResult<Statistics> {
262        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
263        let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;
264
265        Ok(Statistics {
266            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
267            total_byte_size: Precision::Inexact(total_byte_size.floor() as _),
268            column_statistics: Statistics::unknown_column(&self.schema()),
269        })
270    }
271
272    fn name(&self) -> &str {
273        "EmptyMetricExec"
274    }
275}
276
277impl DisplayAs for EmptyMetricExec {
278    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
279        match t {
280            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
281                f,
282                "EmptyMetric: range=[{}..{}], interval=[{}]",
283                self.start, self.end, self.interval,
284            ),
285        }
286    }
287}
288
/// Stream returned by [`EmptyMetricExec::execute`]. It yields a single record batch
/// on its first poll and ends on every later poll.
pub struct EmptyMetricStream {
    /// First timestamp of the generated range (inclusive).
    start: Millisecond,
    /// Last timestamp bound of the generated range (inclusive).
    end: Millisecond,
    /// Step between two consecutive generated timestamps.
    interval: Millisecond,
    /// Optional physical expression evaluated over the time index column.
    expr: Option<PhysicalExprRef>,
    /// This stream only generates one record batch, at the first poll;
    /// this flag is cleared once that poll has happened.
    is_first_poll: bool,
    /// Schema that only contains the time index column.
    /// This is for intermediate result only.
    time_index_schema: SchemaRef,
    /// Schema of the output record batch
    result_schema: SchemaRef,
    /// Baseline metrics (compute time, output rows) for this partition.
    metric: BaselineMetrics,
}
303
304impl RecordBatchStream for EmptyMetricStream {
305    fn schema(&self) -> SchemaRef {
306        self.result_schema.clone()
307    }
308}
309
310impl Stream for EmptyMetricStream {
311    type Item = DataFusionResult<RecordBatch>;
312
313    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314        let result = if self.is_first_poll {
315            self.is_first_poll = false;
316            let _timer = self.metric.elapsed_compute().timer();
317
318            // build the time index array, and a record batch that
319            // only contains that array as the input of field expr
320            let time_array = (self.start..=self.end)
321                .step_by(self.interval as _)
322                .collect::<Vec<_>>();
323            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
324            let num_rows = time_array.len();
325            let input_record_batch =
326                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
327                    .map_err(|e| DataFusionError::ArrowError(e, None))?;
328            let mut result_arrays: Vec<ArrayRef> = vec![time_array];
329
330            // evaluate the field expr and get the result
331            if let Some(field_expr) = &self.expr {
332                result_arrays.push(
333                    field_expr
334                        .evaluate(&input_record_batch)
335                        .and_then(|x| x.into_array(num_rows))?,
336                );
337            }
338
339            // assemble the output record batch
340            let batch = RecordBatch::try_new(self.result_schema.clone(), result_arrays)
341                .map_err(|e| DataFusionError::ArrowError(e, None));
342
343            Poll::Ready(Some(batch))
344        } else {
345            Poll::Ready(None)
346        };
347        self.metric.record_poll(result)
348    }
349}
350
351/// Build a schema that only contains **millisecond** timestamp column
352fn build_ts_only_schema(column_name: &str) -> DFSchema {
353    let ts_field = Field::new(
354        column_name,
355        DataType::Timestamp(TimeUnit::Millisecond, None),
356        false,
357    );
358    // safety: should not fail (UT covers this)
359    DFSchema::new_with_metadata(
360        vec![(Some(TableReference::bare("")), Arc::new(ts_field))],
361        HashMap::new(),
362    )
363    .unwrap()
364}
365
366// Convert timestamp column to UNIX epoch second:
367// https://prometheus.io/docs/prometheus/latest/querying/functions/#time
368pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
369    let input_schema = build_ts_only_schema(time_index_column_name);
370    // safety: should not failed (UT covers this)
371    col(time_index_column_name)
372        .cast_to(&DataType::Int64, &input_schema)
373        .unwrap()
374        .cast_to(&DataType::Float64, &input_schema)
375        .unwrap()
376        .div(lit(1000.0)) // cast to second will lost precision, so we cast to float64 first and manually divide by 1000
377}
378
#[cfg(test)]
mod test {
    use datafusion::physical_planner::DefaultPhysicalPlanner;
    use datafusion::prelude::SessionContext;

    use super::*;

    /// Plans `empty_metric` into an execution plan, runs it to completion and renders
    /// every collected batch as a pretty-printed table.
    async fn execute_and_format(empty_metric: EmptyMetric) -> String {
        let ctx = SessionContext::default();
        let planner = DefaultPhysicalPlanner::default();
        let exec = empty_metric
            .to_execution_plan(&ctx.state(), &planner)
            .unwrap();
        let batches = datafusion::physical_plan::collect(exec, ctx.task_ctx())
            .await
            .unwrap();
        datatypes::arrow::util::pretty::pretty_format_batches(&batches)
            .unwrap()
            .to_string()
    }

    /// Runs an `EmptyMetric` whose value column is `time()` and compares its rendered
    /// output against `expected`.
    async fn do_empty_metric_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        time_column_name: String,
        field_column_name: String,
        expected: String,
    ) {
        let field_expr = Some(build_special_time_expr(&time_column_name));
        let empty_metric = EmptyMetric::new(
            start,
            end,
            interval,
            time_column_name,
            field_column_name,
            field_expr,
        )
        .unwrap();

        let rendered = execute_and_format(empty_metric).await;
        assert_eq!(rendered, expected);
    }

    #[tokio::test]
    async fn normal_empty_metric_test() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| time                    | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00     | 0.0   |\
            \n| 1970-01-01T00:00:00.010 | 0.01  |\
            \n| 1970-01-01T00:00:00.020 | 0.02  |\
            \n| 1970-01-01T00:00:00.030 | 0.03  |\
            \n| 1970-01-01T00:00:00.040 | 0.04  |\
            \n| 1970-01-01T00:00:00.050 | 0.05  |\
            \n| 1970-01-01T00:00:00.060 | 0.06  |\
            \n| 1970-01-01T00:00:00.070 | 0.07  |\
            \n| 1970-01-01T00:00:00.080 | 0.08  |\
            \n| 1970-01-01T00:00:00.090 | 0.09  |\
            \n| 1970-01-01T00:00:00.100 | 0.1   |\
            \n+-------------------------+-------+",
        );
        do_empty_metric_test(0, 100, 10, "time".to_string(), "value".to_string(), expected)
            .await
    }

    #[tokio::test]
    async fn unaligned_empty_metric_test() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| time                    | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00     | 0.0   |\
            \n| 1970-01-01T00:00:00.011 | 0.011 |\
            \n| 1970-01-01T00:00:00.022 | 0.022 |\
            \n| 1970-01-01T00:00:00.033 | 0.033 |\
            \n| 1970-01-01T00:00:00.044 | 0.044 |\
            \n| 1970-01-01T00:00:00.055 | 0.055 |\
            \n| 1970-01-01T00:00:00.066 | 0.066 |\
            \n| 1970-01-01T00:00:00.077 | 0.077 |\
            \n| 1970-01-01T00:00:00.088 | 0.088 |\
            \n| 1970-01-01T00:00:00.099 | 0.099 |\
            \n+-------------------------+-------+",
        );
        do_empty_metric_test(0, 100, 11, "time".to_string(), "value".to_string(), expected)
            .await
    }

    #[tokio::test]
    async fn one_row_empty_metric_test() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| time                | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0   |\
            \n+---------------------+-------+",
        );
        do_empty_metric_test(0, 100, 1000, "time".to_string(), "value".to_string(), expected)
            .await
    }

    #[tokio::test]
    async fn negative_range_empty_metric_test() {
        let expected = String::from(
            "+------+-------+\
            \n| time | value |\
            \n+------+-------+\
            \n+------+-------+",
        );
        do_empty_metric_test(1000, -1000, 10, "time".to_string(), "value".to_string(), expected)
            .await
    }

    #[tokio::test]
    async fn no_field_expr() {
        let empty_metric =
            EmptyMetric::new(0, 200, 1000, "time".to_string(), "value".to_string(), None).unwrap();
        let expected = String::from(
            "+---------------------+\
            \n| time                |\
            \n+---------------------+\
            \n| 1970-01-01T00:00:00 |\
            \n+---------------------+",
        );

        let rendered = execute_and_format(empty_metric).await;
        assert_eq!(rendered, expected);
    }
}