promql/extension_plan/
absent.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27    EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34    RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::EmptyRelation;
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{ready, Stream, StreamExt};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::Millisecond;
51
/// Number of buffered absent timestamps at which the stream emits an output batch.
///
/// NOTE(review): this acts as a flush threshold rather than a hard cap; a single
/// output batch may contain more rows than this.
const ABSENT_BATCH_SIZE: usize = 8192;
54
/// Logical plan node for PromQL `absent` (`prom_absent`).
///
/// Its physical counterpart emits one row (value `1.0` plus constant label
/// columns) for every step-aligned timestamp in `[start, end]` at which the
/// input has no row.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    /// First timestamp of the evaluation range.
    start: Millisecond,
    /// Last timestamp of the evaluation range (inclusive).
    end: Millisecond,
    /// Distance between consecutive output timestamps.
    step: Millisecond,
    /// Name of the time index column, read from the input and written to the output.
    time_index_column: String,
    /// Name of the output value column.
    value_column: String,
    /// `(name, value)` pairs emitted as constant `Utf8` columns; deduplicated and
    /// sorted by name in `try_new`.
    fake_labels: Vec<(String, String)>,
    input: LogicalPlan,
    /// Output schema computed once in `try_new`.
    output_schema: DFSchemaRef,
}
66
67impl PartialOrd for Absent {
68    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
69        // compare on fields except schema and input
70        (
71            self.start,
72            self.end,
73            self.step,
74            &self.time_index_column,
75            &self.value_column,
76            &self.fake_labels,
77        )
78            .partial_cmp(&(
79                other.start,
80                other.end,
81                other.step,
82                &other.time_index_column,
83                &other.value_column,
84                &other.fake_labels,
85            ))
86    }
87}
88
89impl UserDefinedLogicalNodeCore for Absent {
90    fn name(&self) -> &str {
91        Self::name()
92    }
93
94    fn inputs(&self) -> Vec<&LogicalPlan> {
95        vec![&self.input]
96    }
97
98    fn schema(&self) -> &DFSchemaRef {
99        &self.output_schema
100    }
101
102    fn expressions(&self) -> Vec<Expr> {
103        vec![]
104    }
105
106    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
107        write!(
108            f,
109            "PromAbsent: start={}, end={}, step={}",
110            self.start, self.end, self.step
111        )
112    }
113
114    fn with_exprs_and_inputs(
115        &self,
116        _exprs: Vec<Expr>,
117        inputs: Vec<LogicalPlan>,
118    ) -> DataFusionResult<Self> {
119        if inputs.is_empty() {
120            return Err(datafusion::error::DataFusionError::Internal(
121                "Absent must have at least one input".to_string(),
122            ));
123        }
124
125        Ok(Self {
126            start: self.start,
127            end: self.end,
128            step: self.step,
129            time_index_column: self.time_index_column.clone(),
130            value_column: self.value_column.clone(),
131            fake_labels: self.fake_labels.clone(),
132            input: inputs[0].clone(),
133            output_schema: self.output_schema.clone(),
134        })
135    }
136}
137
138impl Absent {
139    pub fn try_new(
140        start: Millisecond,
141        end: Millisecond,
142        step: Millisecond,
143        time_index_column: String,
144        value_column: String,
145        fake_labels: Vec<(String, String)>,
146        input: LogicalPlan,
147    ) -> DataFusionResult<Self> {
148        let mut fields = vec![
149            Field::new(
150                &time_index_column,
151                DataType::Timestamp(TimeUnit::Millisecond, None),
152                true,
153            ),
154            Field::new(&value_column, DataType::Float64, true),
155        ];
156
157        // remove duplicate fake labels
158        let mut fake_labels = fake_labels
159            .into_iter()
160            .collect::<HashMap<String, String>>()
161            .into_iter()
162            .collect::<Vec<_>>();
163        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
164        for (name, _) in fake_labels.iter() {
165            fields.push(Field::new(name, DataType::Utf8, true));
166        }
167
168        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
169            fields.into(),
170            HashMap::new(),
171        )?);
172
173        Ok(Self {
174            start,
175            end,
176            step,
177            time_index_column,
178            value_column,
179            fake_labels,
180            input,
181            output_schema,
182        })
183    }
184
185    pub const fn name() -> &'static str {
186        "prom_absent"
187    }
188
189    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
190        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
191        let properties = PlanProperties::new(
192            EquivalenceProperties::new(output_schema.clone()),
193            Partitioning::UnknownPartitioning(1),
194            EmissionType::Incremental,
195            Boundedness::Bounded,
196        );
197        Arc::new(AbsentExec {
198            start: self.start,
199            end: self.end,
200            step: self.step,
201            time_index_column: self.time_index_column.clone(),
202            value_column: self.value_column.clone(),
203            fake_labels: self.fake_labels.clone(),
204            output_schema: output_schema.clone(),
205            input: exec_input,
206            properties,
207            metric: ExecutionPlanMetricsSet::new(),
208        })
209    }
210
211    pub fn serialize(&self) -> Vec<u8> {
212        pb::Absent {
213            start: self.start,
214            end: self.end,
215            step: self.step,
216            time_index_column: self.time_index_column.clone(),
217            value_column: self.value_column.clone(),
218            fake_labels: self
219                .fake_labels
220                .iter()
221                .map(|(name, value)| pb::LabelPair {
222                    key: name.clone(),
223                    value: value.clone(),
224                })
225                .collect(),
226        }
227        .encode_to_vec()
228    }
229
230    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
231        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
232        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
233            produce_one_row: false,
234            schema: Arc::new(DFSchema::empty()),
235        });
236        Self::try_new(
237            pb_absent.start,
238            pb_absent.end,
239            pb_absent.step,
240            pb_absent.time_index_column,
241            pb_absent.value_column,
242            pb_absent
243                .fake_labels
244                .iter()
245                .map(|label| (label.key.clone(), label.value.clone()))
246                .collect(),
247            placeholder_plan,
248        )
249    }
250}
251
/// Physical operator for [`Absent`].
///
/// It consumes its single input, ordered by the time index, and emits one row
/// for every step-aligned timestamp in `[start, end]` that the input does not
/// contain.
#[derive(Debug)]
pub struct AbsentExec {
    start: Millisecond,
    /// Last output timestamp (inclusive).
    end: Millisecond,
    step: Millisecond,
    /// Time index column, looked up by name in the input schema.
    time_index_column: String,
    /// Name of the value column; the output schema already carries it.
    value_column: String,
    /// `(name, value)` pairs emitted as constant label columns.
    fake_labels: Vec<(String, String)>,
    output_schema: SchemaRef,
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    /// Operator metrics; `execute` derives per-partition baseline metrics from it.
    metric: ExecutionPlanMetricsSet,
}
265
266impl ExecutionPlan for AbsentExec {
267    fn as_any(&self) -> &dyn Any {
268        self
269    }
270
271    fn schema(&self) -> SchemaRef {
272        self.output_schema.clone()
273    }
274
275    fn properties(&self) -> &PlanProperties {
276        &self.properties
277    }
278
279    fn required_input_distribution(&self) -> Vec<Distribution> {
280        vec![Distribution::SinglePartition]
281    }
282
283    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
284        let requirement = LexRequirement::from([PhysicalSortRequirement {
285            expr: Arc::new(
286                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
287            ),
288            options: Some(SortOptions {
289                descending: false,
290                nulls_first: false,
291            }),
292        }]);
293        vec![Some(OrderingRequirements::new(requirement))]
294    }
295
296    fn maintains_input_order(&self) -> Vec<bool> {
297        vec![false]
298    }
299
300    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
301        vec![&self.input]
302    }
303
304    fn with_new_children(
305        self: Arc<Self>,
306        children: Vec<Arc<dyn ExecutionPlan>>,
307    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
308        assert!(!children.is_empty());
309        Ok(Arc::new(Self {
310            start: self.start,
311            end: self.end,
312            step: self.step,
313            time_index_column: self.time_index_column.clone(),
314            value_column: self.value_column.clone(),
315            fake_labels: self.fake_labels.clone(),
316            output_schema: self.output_schema.clone(),
317            input: children[0].clone(),
318            properties: self.properties.clone(),
319            metric: self.metric.clone(),
320        }))
321    }
322
323    fn execute(
324        &self,
325        partition: usize,
326        context: Arc<TaskContext>,
327    ) -> DataFusionResult<SendableRecordBatchStream> {
328        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
329        let input = self.input.execute(partition, context)?;
330
331        Ok(Box::pin(AbsentStream {
332            end: self.end,
333            step: self.step,
334            time_index_column_index: self
335                .input
336                .schema()
337                .column_with_name(&self.time_index_column)
338                .unwrap() // Safety: we have checked the column name in `try_new`
339                .0,
340            output_schema: self.output_schema.clone(),
341            fake_labels: self.fake_labels.clone(),
342            input,
343            metric: baseline_metric,
344            // Buffer for streaming output timestamps
345            output_timestamps: Vec::new(),
346            // Current timestamp in the output range
347            output_ts_cursor: self.start,
348            input_finished: false,
349        }))
350    }
351
352    fn metrics(&self) -> Option<MetricsSet> {
353        Some(self.metric.clone_inner())
354    }
355
356    fn name(&self) -> &str {
357        "AbsentExec"
358    }
359}
360
361impl DisplayAs for AbsentExec {
362    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
363        match t {
364            DisplayFormatType::Default
365            | DisplayFormatType::Verbose
366            | DisplayFormatType::TreeRender => {
367                write!(
368                    f,
369                    "PromAbsentExec: start={}, end={}, step={}",
370                    self.start, self.end, self.step
371                )
372            }
373        }
374    }
375}
376
/// Stream that compares the ordered input timestamps against the output grid
/// `start, start + step, ..., <= end` and emits the grid points with no input row.
pub struct AbsentStream {
    /// Last grid timestamp (inclusive).
    end: Millisecond,
    step: Millisecond,
    /// Position of the time index column in the input batches.
    time_index_column_index: usize,
    output_schema: SchemaRef,
    /// `(name, value)` pairs emitted as constant label columns.
    fake_labels: Vec<(String, String)>,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    // Buffer for streaming output timestamps: absent grid timestamps found but
    // not yet emitted
    output_timestamps: Vec<Millisecond>,
    // Current timestamp in the output range: the next grid timestamp still to
    // be classified; advanced by `step`
    output_ts_cursor: Millisecond,
    // Set once the input stream has returned `None`
    input_finished: bool,
}
391
392impl RecordBatchStream for AbsentStream {
393    fn schema(&self) -> SchemaRef {
394        self.output_schema.clone()
395    }
396}
397
398impl Stream for AbsentStream {
399    type Item = DataFusionResult<RecordBatch>;
400
401    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
402        loop {
403            if !self.input_finished {
404                match ready!(self.input.poll_next_unpin(cx)) {
405                    Some(Ok(batch)) => {
406                        let timer = std::time::Instant::now();
407                        if let Err(e) = self.process_input_batch(&batch) {
408                            return Poll::Ready(Some(Err(e)));
409                        }
410                        self.metric.elapsed_compute().add_elapsed(timer);
411
412                        // If we have enough data for a batch, output it
413                        if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
414                            let timer = std::time::Instant::now();
415                            let result = self.flush_output_batch();
416                            self.metric.elapsed_compute().add_elapsed(timer);
417
418                            match result {
419                                Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
420                                Ok(None) => continue,
421                                Err(e) => return Poll::Ready(Some(Err(e))),
422                            }
423                        }
424                    }
425                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
426                    None => {
427                        self.input_finished = true;
428
429                        let timer = std::time::Instant::now();
430                        // Process any remaining absent timestamps
431                        if let Err(e) = self.process_remaining_absent_timestamps() {
432                            return Poll::Ready(Some(Err(e)));
433                        }
434                        let result = self.flush_output_batch();
435                        self.metric.elapsed_compute().add_elapsed(timer);
436                        return Poll::Ready(result.transpose());
437                    }
438                }
439            } else {
440                return Poll::Ready(None);
441            }
442        }
443    }
444}
445
446impl AbsentStream {
447    fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
448        // Extract timestamps from this batch
449        let timestamp_array = batch.column(self.time_index_column_index);
450        let milli_ts_array = arrow::compute::cast(
451            timestamp_array,
452            &DataType::Timestamp(TimeUnit::Millisecond, None),
453        )?;
454        let timestamp_array = milli_ts_array
455            .as_any()
456            .downcast_ref::<TimestampMillisecondArray>()
457            .unwrap();
458
459        // Process against current output cursor position
460        for &input_ts in timestamp_array.values() {
461            // Generate absent timestamps up to this input timestamp
462            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
463                self.output_timestamps.push(self.output_ts_cursor);
464                self.output_ts_cursor += self.step;
465            }
466
467            // Skip the input timestamp if it matches our cursor
468            if self.output_ts_cursor == input_ts {
469                self.output_ts_cursor += self.step;
470            }
471        }
472
473        Ok(())
474    }
475
476    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
477        // Generate all remaining absent timestamps (input is finished)
478        while self.output_ts_cursor <= self.end {
479            self.output_timestamps.push(self.output_ts_cursor);
480            self.output_ts_cursor += self.step;
481        }
482        Ok(())
483    }
484
485    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
486        if self.output_timestamps.is_empty() {
487            return Ok(None);
488        }
489
490        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
491        let num_rows = self.output_timestamps.len();
492        columns.push(Arc::new(TimestampMillisecondArray::from(
493            self.output_timestamps.clone(),
494        )) as _);
495        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
496
497        for (_, value) in self.fake_labels.iter() {
498            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
499                Some(value.clone()),
500                num_rows,
501            ))) as _);
502        }
503
504        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
505
506        self.output_timestamps.clear();
507        Ok(Some(batch))
508    }
509}
510
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// Schema shared by the test input and the expected output: a nullable
    /// millisecond `timestamp` column followed by a nullable `value` column.
    fn ts_value_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds a label-free `AbsentExec` over a single in-memory partition
    /// holding `batches`.
    fn absent_exec_over(batches: Vec<RecordBatch>, start: i64, end: i64, step: i64) -> AbsentExec {
        let source = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], ts_value_schema(), None).unwrap(),
        ));
        let output_schema = ts_value_schema();

        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(source),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Runs partition 0 of `exec` to completion and gathers every non-null
    /// timestamp from the first output column.
    async fn collect_output_timestamps(exec: &AbsentExec) -> Vec<i64> {
        let session_ctx = SessionContext::new();
        let mut stream = exec.execute(0, session_ctx.task_ctx()).unwrap();

        let mut collected = Vec::new();
        while let Some(next) = stream.next().await {
            let batch = next.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            collected.extend(ts_array.iter().flatten());
        }
        collected
    }

    #[tokio::test]
    async fn test_absent_basic() {
        // Input has timestamps: 0, 2000, 4000
        let batch = RecordBatch::try_new(
            ts_value_schema(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000])) as ArrayRef,
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            ],
        )
        .unwrap();
        let exec = absent_exec_over(vec![batch], 0, 5000, 1000);

        // 0, 2000 and 4000 exist in the input, so 1000, 3000 and 5000 are absent
        assert_eq!(
            collect_output_timestamps(&exec).await,
            vec![1000, 3000, 5000]
        );
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        let exec = absent_exec_over(vec![], 0, 2000, 1000);

        // With no input rows, every timestamp in the range is absent
        assert_eq!(collect_output_timestamps(&exec).await, vec![0, 1000, 2000]);
    }
}