promql/extension_plan/
absent.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27    EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34    RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::{EmptyRelation, col};
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Target row count for output batches: once this many absent timestamps are
/// buffered, the stream emits a batch.
const ABSENT_BATCH_SIZE: usize = 8 * 1024;
54
55#[derive(Debug, PartialEq, Eq, Hash)]
56pub struct Absent {
57    start: Millisecond,
58    end: Millisecond,
59    step: Millisecond,
60    time_index_column: String,
61    value_column: String,
62    fake_labels: Vec<(String, String)>,
63    input: LogicalPlan,
64    output_schema: DFSchemaRef,
65    unfix: Option<UnfixIndices>,
66}
67
68#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
69struct UnfixIndices {
70    pub time_index_column_idx: u64,
71    pub value_column_idx: u64,
72}
73
74impl PartialOrd for Absent {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        // compare on fields except schema and input
77        (
78            self.start,
79            self.end,
80            self.step,
81            &self.time_index_column,
82            &self.value_column,
83            &self.fake_labels,
84        )
85            .partial_cmp(&(
86                other.start,
87                other.end,
88                other.step,
89                &other.time_index_column,
90                &other.value_column,
91                &other.fake_labels,
92            ))
93    }
94}
95
96impl UserDefinedLogicalNodeCore for Absent {
97    fn name(&self) -> &str {
98        Self::name()
99    }
100
101    fn inputs(&self) -> Vec<&LogicalPlan> {
102        vec![&self.input]
103    }
104
105    fn schema(&self) -> &DFSchemaRef {
106        &self.output_schema
107    }
108
109    fn expressions(&self) -> Vec<Expr> {
110        if self.unfix.is_some() {
111            return vec![];
112        }
113
114        vec![col(&self.time_index_column)]
115    }
116
117    fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
118        if self.unfix.is_some() {
119            return None;
120        }
121
122        let input_schema = self.input.schema();
123        let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
124        Some(vec![vec![time_index_idx]])
125    }
126
127    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
128        write!(
129            f,
130            "PromAbsent: start={}, end={}, step={}",
131            self.start, self.end, self.step
132        )
133    }
134
135    fn with_exprs_and_inputs(
136        &self,
137        _exprs: Vec<Expr>,
138        inputs: Vec<LogicalPlan>,
139    ) -> DataFusionResult<Self> {
140        if inputs.is_empty() {
141            return Err(datafusion::error::DataFusionError::Internal(
142                "Absent must have at least one input".to_string(),
143            ));
144        }
145
146        let input: LogicalPlan = inputs[0].clone();
147        let input_schema = input.schema();
148
149        if let Some(unfix) = &self.unfix {
150            // transform indices to names
151            let time_index_column = resolve_column_name(
152                unfix.time_index_column_idx,
153                input_schema,
154                "Absent",
155                "time index",
156            )?;
157
158            let value_column =
159                resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
160
161            // Recreate output schema with actual field names
162            Self::try_new(
163                self.start,
164                self.end,
165                self.step,
166                time_index_column,
167                value_column,
168                self.fake_labels.clone(),
169                input,
170            )
171        } else {
172            Ok(Self {
173                start: self.start,
174                end: self.end,
175                step: self.step,
176                time_index_column: self.time_index_column.clone(),
177                value_column: self.value_column.clone(),
178                fake_labels: self.fake_labels.clone(),
179                input,
180                output_schema: self.output_schema.clone(),
181                unfix: None,
182            })
183        }
184    }
185}
186
187impl Absent {
188    pub fn try_new(
189        start: Millisecond,
190        end: Millisecond,
191        step: Millisecond,
192        time_index_column: String,
193        value_column: String,
194        fake_labels: Vec<(String, String)>,
195        input: LogicalPlan,
196    ) -> DataFusionResult<Self> {
197        let mut fields = vec![
198            Field::new(
199                &time_index_column,
200                DataType::Timestamp(TimeUnit::Millisecond, None),
201                true,
202            ),
203            Field::new(&value_column, DataType::Float64, true),
204        ];
205
206        // remove duplicate fake labels
207        let mut fake_labels = fake_labels
208            .into_iter()
209            .collect::<HashMap<String, String>>()
210            .into_iter()
211            .collect::<Vec<_>>();
212        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
213        for (name, _) in fake_labels.iter() {
214            fields.push(Field::new(name, DataType::Utf8, true));
215        }
216
217        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
218            fields.into(),
219            HashMap::new(),
220        )?);
221
222        Ok(Self {
223            start,
224            end,
225            step,
226            time_index_column,
227            value_column,
228            fake_labels,
229            input,
230            output_schema,
231            unfix: None,
232        })
233    }
234
235    pub const fn name() -> &'static str {
236        "prom_absent"
237    }
238
239    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
240        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
241        let properties = PlanProperties::new(
242            EquivalenceProperties::new(output_schema.clone()),
243            Partitioning::UnknownPartitioning(1),
244            EmissionType::Incremental,
245            Boundedness::Bounded,
246        );
247        Arc::new(AbsentExec {
248            start: self.start,
249            end: self.end,
250            step: self.step,
251            time_index_column: self.time_index_column.clone(),
252            value_column: self.value_column.clone(),
253            fake_labels: self.fake_labels.clone(),
254            output_schema: output_schema.clone(),
255            input: exec_input,
256            properties,
257            metric: ExecutionPlanMetricsSet::new(),
258        })
259    }
260
261    pub fn serialize(&self) -> Vec<u8> {
262        let time_index_column_idx =
263            serialize_column_index(self.input.schema(), &self.time_index_column);
264
265        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
266
267        pb::Absent {
268            start: self.start,
269            end: self.end,
270            step: self.step,
271            time_index_column_idx,
272            value_column_idx,
273            fake_labels: self
274                .fake_labels
275                .iter()
276                .map(|(name, value)| pb::LabelPair {
277                    key: name.clone(),
278                    value: value.clone(),
279                })
280                .collect(),
281            ..Default::default()
282        }
283        .encode_to_vec()
284    }
285
286    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
287        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
288        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
289            produce_one_row: false,
290            schema: Arc::new(DFSchema::empty()),
291        });
292
293        let unfix = UnfixIndices {
294            time_index_column_idx: pb_absent.time_index_column_idx,
295            value_column_idx: pb_absent.value_column_idx,
296        };
297
298        Ok(Self {
299            start: pb_absent.start,
300            end: pb_absent.end,
301            step: pb_absent.step,
302            time_index_column: String::new(),
303            value_column: String::new(),
304            fake_labels: pb_absent
305                .fake_labels
306                .iter()
307                .map(|label| (label.key.clone(), label.value.clone()))
308                .collect(),
309            input: placeholder_plan,
310            output_schema: Arc::new(DFSchema::empty()),
311            unfix: Some(unfix),
312        })
313    }
314}
315
316#[derive(Debug)]
317pub struct AbsentExec {
318    start: Millisecond,
319    end: Millisecond,
320    step: Millisecond,
321    time_index_column: String,
322    value_column: String,
323    fake_labels: Vec<(String, String)>,
324    output_schema: SchemaRef,
325    input: Arc<dyn ExecutionPlan>,
326    properties: PlanProperties,
327    metric: ExecutionPlanMetricsSet,
328}
329
330impl ExecutionPlan for AbsentExec {
331    fn as_any(&self) -> &dyn Any {
332        self
333    }
334
335    fn schema(&self) -> SchemaRef {
336        self.output_schema.clone()
337    }
338
339    fn properties(&self) -> &PlanProperties {
340        &self.properties
341    }
342
343    fn required_input_distribution(&self) -> Vec<Distribution> {
344        vec![Distribution::SinglePartition]
345    }
346
347    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
348        let requirement = LexRequirement::from([PhysicalSortRequirement {
349            expr: Arc::new(
350                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
351            ),
352            options: Some(SortOptions {
353                descending: false,
354                nulls_first: false,
355            }),
356        }]);
357        vec![Some(OrderingRequirements::new(requirement))]
358    }
359
360    fn maintains_input_order(&self) -> Vec<bool> {
361        vec![false]
362    }
363
364    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
365        vec![&self.input]
366    }
367
368    fn with_new_children(
369        self: Arc<Self>,
370        children: Vec<Arc<dyn ExecutionPlan>>,
371    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
372        assert!(!children.is_empty());
373        Ok(Arc::new(Self {
374            start: self.start,
375            end: self.end,
376            step: self.step,
377            time_index_column: self.time_index_column.clone(),
378            value_column: self.value_column.clone(),
379            fake_labels: self.fake_labels.clone(),
380            output_schema: self.output_schema.clone(),
381            input: children[0].clone(),
382            properties: self.properties.clone(),
383            metric: self.metric.clone(),
384        }))
385    }
386
387    fn execute(
388        &self,
389        partition: usize,
390        context: Arc<TaskContext>,
391    ) -> DataFusionResult<SendableRecordBatchStream> {
392        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
393        let input = self.input.execute(partition, context)?;
394
395        Ok(Box::pin(AbsentStream {
396            end: self.end,
397            step: self.step,
398            time_index_column_index: self
399                .input
400                .schema()
401                .column_with_name(&self.time_index_column)
402                .unwrap() // Safety: we have checked the column name in `try_new`
403                .0,
404            output_schema: self.output_schema.clone(),
405            fake_labels: self.fake_labels.clone(),
406            input,
407            metric: baseline_metric,
408            // Buffer for streaming output timestamps
409            output_timestamps: Vec::new(),
410            // Current timestamp in the output range
411            output_ts_cursor: self.start,
412            input_finished: false,
413        }))
414    }
415
416    fn metrics(&self) -> Option<MetricsSet> {
417        Some(self.metric.clone_inner())
418    }
419
420    fn name(&self) -> &str {
421        "AbsentExec"
422    }
423}
424
425impl DisplayAs for AbsentExec {
426    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
427        match t {
428            DisplayFormatType::Default
429            | DisplayFormatType::Verbose
430            | DisplayFormatType::TreeRender => {
431                write!(
432                    f,
433                    "PromAbsentExec: start={}, end={}, step={}",
434                    self.start, self.end, self.step
435                )
436            }
437        }
438    }
439}
440
441pub struct AbsentStream {
442    end: Millisecond,
443    step: Millisecond,
444    time_index_column_index: usize,
445    output_schema: SchemaRef,
446    fake_labels: Vec<(String, String)>,
447    input: SendableRecordBatchStream,
448    metric: BaselineMetrics,
449    // Buffer for streaming output timestamps
450    output_timestamps: Vec<Millisecond>,
451    // Current timestamp in the output range
452    output_ts_cursor: Millisecond,
453    input_finished: bool,
454}
455
456impl RecordBatchStream for AbsentStream {
457    fn schema(&self) -> SchemaRef {
458        self.output_schema.clone()
459    }
460}
461
462impl Stream for AbsentStream {
463    type Item = DataFusionResult<RecordBatch>;
464
465    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
466        loop {
467            if !self.input_finished {
468                match ready!(self.input.poll_next_unpin(cx)) {
469                    Some(Ok(batch)) => {
470                        let timer = std::time::Instant::now();
471                        if let Err(e) = self.process_input_batch(&batch) {
472                            return Poll::Ready(Some(Err(e)));
473                        }
474                        self.metric.elapsed_compute().add_elapsed(timer);
475
476                        // If we have enough data for a batch, output it
477                        if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
478                            let timer = std::time::Instant::now();
479                            let result = self.flush_output_batch();
480                            self.metric.elapsed_compute().add_elapsed(timer);
481
482                            match result {
483                                Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
484                                Ok(None) => continue,
485                                Err(e) => return Poll::Ready(Some(Err(e))),
486                            }
487                        }
488                    }
489                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
490                    None => {
491                        self.input_finished = true;
492
493                        let timer = std::time::Instant::now();
494                        // Process any remaining absent timestamps
495                        if let Err(e) = self.process_remaining_absent_timestamps() {
496                            return Poll::Ready(Some(Err(e)));
497                        }
498                        let result = self.flush_output_batch();
499                        self.metric.elapsed_compute().add_elapsed(timer);
500                        return Poll::Ready(result.transpose());
501                    }
502                }
503            } else {
504                return Poll::Ready(None);
505            }
506        }
507    }
508}
509
510impl AbsentStream {
511    fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
512        // Extract timestamps from this batch
513        let timestamp_array = batch.column(self.time_index_column_index);
514        let milli_ts_array = arrow::compute::cast(
515            timestamp_array,
516            &DataType::Timestamp(TimeUnit::Millisecond, None),
517        )?;
518        let timestamp_array = milli_ts_array
519            .as_any()
520            .downcast_ref::<TimestampMillisecondArray>()
521            .unwrap();
522
523        // Process against current output cursor position
524        for &input_ts in timestamp_array.values() {
525            // Generate absent timestamps up to this input timestamp
526            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
527                self.output_timestamps.push(self.output_ts_cursor);
528                self.output_ts_cursor += self.step;
529            }
530
531            // Skip the input timestamp if it matches our cursor
532            if self.output_ts_cursor == input_ts {
533                self.output_ts_cursor += self.step;
534            }
535        }
536
537        Ok(())
538    }
539
540    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
541        // Generate all remaining absent timestamps (input is finished)
542        while self.output_ts_cursor <= self.end {
543            self.output_timestamps.push(self.output_ts_cursor);
544            self.output_ts_cursor += self.step;
545        }
546        Ok(())
547    }
548
549    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
550        if self.output_timestamps.is_empty() {
551            return Ok(None);
552        }
553
554        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
555        let num_rows = self.output_timestamps.len();
556        columns.push(Arc::new(TimestampMillisecondArray::from(
557            self.output_timestamps.clone(),
558        )) as _);
559        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
560
561        for (_, value) in self.fake_labels.iter() {
562            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
563                Some(value.clone()),
564                num_rows,
565            ))) as _);
566        }
567
568        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
569
570        self.output_timestamps.clear();
571        Ok(Some(batch))
572    }
573}
574
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// Schema used for both the test input and the operator output: a nullable
    /// millisecond `timestamp` column followed by a nullable `value` column.
    fn ts_value_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `AbsentExec` without fake labels that reads `batches` from an
    /// in-memory source with a single partition.
    fn absent_exec_over(
        batches: Vec<RecordBatch>,
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
    ) -> AbsentExec {
        let source = MemorySourceConfig::try_new(&[batches], ts_value_schema(), None).unwrap();
        let memory_exec = DataSourceExec::new(Arc::new(source));
        let output_schema = ts_value_schema();

        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Runs partition 0 of `exec` and gathers the non-null values of the first
    /// output column across all produced batches.
    async fn collect_output_timestamps(exec: &AbsentExec) -> Vec<i64> {
        let task_ctx = SessionContext::new().task_ctx();
        let mut stream = exec.execute(0, task_ctx).unwrap();

        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            output_timestamps.extend(ts_array.iter().flatten());
        }
        output_timestamps
    }

    #[tokio::test]
    async fn test_absent_basic() {
        // Input has timestamps: 0, 2000, 4000
        let timestamps: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
        let values: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        let batch = RecordBatch::try_new(ts_value_schema(), vec![timestamps, values]).unwrap();

        let absent_exec = absent_exec_over(vec![batch], 0, 5000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        // Should output absent timestamps: 1000, 3000, 5000
        // (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        // Empty input
        let absent_exec = absent_exec_over(vec![], 0, 2000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        // Should output all timestamps in range: 0, 1000, 2000
        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }
}