promql/extension_plan/
absent.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27    EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34    RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::EmptyRelation;
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Number of rows per output batch that `AbsentStream` aims for: buffered absent
/// timestamps are emitted once this many have accumulated.
const ABSENT_BATCH_SIZE: usize = 8 * 1024;
54
/// Logical plan node that emits one row for every evaluation timestamp in
/// `[start, end]` (stepping by `step`) at which the input has no row.
///
/// Each emitted row carries the timestamp, a value of `1.0` and the fake labels as
/// constant string columns (see `AbsentStream::flush_output_batch`). Presumably this
/// backs PromQL's `absent()` — confirm against the planner.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    /// First evaluation timestamp (inclusive), in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive), in milliseconds.
    end: Millisecond,
    /// Distance between consecutive evaluation timestamps, in milliseconds.
    step: Millisecond,
    /// Name of the time index column, looked up in the input schema at execution.
    time_index_column: String,
    /// Name of the value column in the output schema.
    value_column: String,
    /// Label name/value pairs emitted as constant Utf8 columns; `try_new`
    /// deduplicates them by name and sorts them by name.
    fake_labels: Vec<(String, String)>,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    /// Column indices recorded by `deserialize`; turned back into column names
    /// once the real input is known in `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
64    output_schema: DFSchemaRef,
65    unfix: Option<UnfixIndices>,
66}
67
68#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
69struct UnfixIndices {
70    pub time_index_column_idx: u64,
71    pub value_column_idx: u64,
72}
73
74impl PartialOrd for Absent {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        // compare on fields except schema and input
77        (
78            self.start,
79            self.end,
80            self.step,
81            &self.time_index_column,
82            &self.value_column,
83            &self.fake_labels,
84        )
85            .partial_cmp(&(
86                other.start,
87                other.end,
88                other.step,
89                &other.time_index_column,
90                &other.value_column,
91                &other.fake_labels,
92            ))
93    }
94}
95
96impl UserDefinedLogicalNodeCore for Absent {
97    fn name(&self) -> &str {
98        Self::name()
99    }
100
101    fn inputs(&self) -> Vec<&LogicalPlan> {
102        vec![&self.input]
103    }
104
105    fn schema(&self) -> &DFSchemaRef {
106        &self.output_schema
107    }
108
109    fn expressions(&self) -> Vec<Expr> {
110        vec![]
111    }
112
113    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
114        write!(
115            f,
116            "PromAbsent: start={}, end={}, step={}",
117            self.start, self.end, self.step
118        )
119    }
120
121    fn with_exprs_and_inputs(
122        &self,
123        _exprs: Vec<Expr>,
124        inputs: Vec<LogicalPlan>,
125    ) -> DataFusionResult<Self> {
126        if inputs.is_empty() {
127            return Err(datafusion::error::DataFusionError::Internal(
128                "Absent must have at least one input".to_string(),
129            ));
130        }
131
132        let input: LogicalPlan = inputs[0].clone();
133        let input_schema = input.schema();
134
135        if let Some(unfix) = &self.unfix {
136            // transform indices to names
137            let time_index_column = resolve_column_name(
138                unfix.time_index_column_idx,
139                input_schema,
140                "Absent",
141                "time index",
142            )?;
143
144            let value_column =
145                resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
146
147            // Recreate output schema with actual field names
148            Self::try_new(
149                self.start,
150                self.end,
151                self.step,
152                time_index_column,
153                value_column,
154                self.fake_labels.clone(),
155                input,
156            )
157        } else {
158            Ok(Self {
159                start: self.start,
160                end: self.end,
161                step: self.step,
162                time_index_column: self.time_index_column.clone(),
163                value_column: self.value_column.clone(),
164                fake_labels: self.fake_labels.clone(),
165                input,
166                output_schema: self.output_schema.clone(),
167                unfix: None,
168            })
169        }
170    }
171}
172
173impl Absent {
174    pub fn try_new(
175        start: Millisecond,
176        end: Millisecond,
177        step: Millisecond,
178        time_index_column: String,
179        value_column: String,
180        fake_labels: Vec<(String, String)>,
181        input: LogicalPlan,
182    ) -> DataFusionResult<Self> {
183        let mut fields = vec![
184            Field::new(
185                &time_index_column,
186                DataType::Timestamp(TimeUnit::Millisecond, None),
187                true,
188            ),
189            Field::new(&value_column, DataType::Float64, true),
190        ];
191
192        // remove duplicate fake labels
193        let mut fake_labels = fake_labels
194            .into_iter()
195            .collect::<HashMap<String, String>>()
196            .into_iter()
197            .collect::<Vec<_>>();
198        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
199        for (name, _) in fake_labels.iter() {
200            fields.push(Field::new(name, DataType::Utf8, true));
201        }
202
203        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
204            fields.into(),
205            HashMap::new(),
206        )?);
207
208        Ok(Self {
209            start,
210            end,
211            step,
212            time_index_column,
213            value_column,
214            fake_labels,
215            input,
216            output_schema,
217            unfix: None,
218        })
219    }
220
221    pub const fn name() -> &'static str {
222        "prom_absent"
223    }
224
225    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
226        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
227        let properties = PlanProperties::new(
228            EquivalenceProperties::new(output_schema.clone()),
229            Partitioning::UnknownPartitioning(1),
230            EmissionType::Incremental,
231            Boundedness::Bounded,
232        );
233        Arc::new(AbsentExec {
234            start: self.start,
235            end: self.end,
236            step: self.step,
237            time_index_column: self.time_index_column.clone(),
238            value_column: self.value_column.clone(),
239            fake_labels: self.fake_labels.clone(),
240            output_schema: output_schema.clone(),
241            input: exec_input,
242            properties,
243            metric: ExecutionPlanMetricsSet::new(),
244        })
245    }
246
247    pub fn serialize(&self) -> Vec<u8> {
248        let time_index_column_idx =
249            serialize_column_index(self.input.schema(), &self.time_index_column);
250
251        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
252
253        pb::Absent {
254            start: self.start,
255            end: self.end,
256            step: self.step,
257            time_index_column_idx,
258            value_column_idx,
259            fake_labels: self
260                .fake_labels
261                .iter()
262                .map(|(name, value)| pb::LabelPair {
263                    key: name.clone(),
264                    value: value.clone(),
265                })
266                .collect(),
267            ..Default::default()
268        }
269        .encode_to_vec()
270    }
271
272    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
273        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
274        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
275            produce_one_row: false,
276            schema: Arc::new(DFSchema::empty()),
277        });
278
279        let unfix = UnfixIndices {
280            time_index_column_idx: pb_absent.time_index_column_idx,
281            value_column_idx: pb_absent.value_column_idx,
282        };
283
284        Ok(Self {
285            start: pb_absent.start,
286            end: pb_absent.end,
287            step: pb_absent.step,
288            time_index_column: String::new(),
289            value_column: String::new(),
290            fake_labels: pb_absent
291                .fake_labels
292                .iter()
293                .map(|label| (label.key.clone(), label.value.clone()))
294                .collect(),
295            input: placeholder_plan,
296            output_schema: Arc::new(DFSchema::empty()),
297            unfix: Some(unfix),
298        })
299    }
300}
301
302#[derive(Debug)]
303pub struct AbsentExec {
304    start: Millisecond,
305    end: Millisecond,
306    step: Millisecond,
307    time_index_column: String,
308    value_column: String,
309    fake_labels: Vec<(String, String)>,
310    output_schema: SchemaRef,
311    input: Arc<dyn ExecutionPlan>,
312    properties: PlanProperties,
313    metric: ExecutionPlanMetricsSet,
314}
315
316impl ExecutionPlan for AbsentExec {
317    fn as_any(&self) -> &dyn Any {
318        self
319    }
320
321    fn schema(&self) -> SchemaRef {
322        self.output_schema.clone()
323    }
324
325    fn properties(&self) -> &PlanProperties {
326        &self.properties
327    }
328
329    fn required_input_distribution(&self) -> Vec<Distribution> {
330        vec![Distribution::SinglePartition]
331    }
332
333    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
334        let requirement = LexRequirement::from([PhysicalSortRequirement {
335            expr: Arc::new(
336                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
337            ),
338            options: Some(SortOptions {
339                descending: false,
340                nulls_first: false,
341            }),
342        }]);
343        vec![Some(OrderingRequirements::new(requirement))]
344    }
345
346    fn maintains_input_order(&self) -> Vec<bool> {
347        vec![false]
348    }
349
350    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
351        vec![&self.input]
352    }
353
354    fn with_new_children(
355        self: Arc<Self>,
356        children: Vec<Arc<dyn ExecutionPlan>>,
357    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
358        assert!(!children.is_empty());
359        Ok(Arc::new(Self {
360            start: self.start,
361            end: self.end,
362            step: self.step,
363            time_index_column: self.time_index_column.clone(),
364            value_column: self.value_column.clone(),
365            fake_labels: self.fake_labels.clone(),
366            output_schema: self.output_schema.clone(),
367            input: children[0].clone(),
368            properties: self.properties.clone(),
369            metric: self.metric.clone(),
370        }))
371    }
372
373    fn execute(
374        &self,
375        partition: usize,
376        context: Arc<TaskContext>,
377    ) -> DataFusionResult<SendableRecordBatchStream> {
378        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
379        let input = self.input.execute(partition, context)?;
380
381        Ok(Box::pin(AbsentStream {
382            end: self.end,
383            step: self.step,
384            time_index_column_index: self
385                .input
386                .schema()
387                .column_with_name(&self.time_index_column)
388                .unwrap() // Safety: we have checked the column name in `try_new`
389                .0,
390            output_schema: self.output_schema.clone(),
391            fake_labels: self.fake_labels.clone(),
392            input,
393            metric: baseline_metric,
394            // Buffer for streaming output timestamps
395            output_timestamps: Vec::new(),
396            // Current timestamp in the output range
397            output_ts_cursor: self.start,
398            input_finished: false,
399        }))
400    }
401
402    fn metrics(&self) -> Option<MetricsSet> {
403        Some(self.metric.clone_inner())
404    }
405
406    fn name(&self) -> &str {
407        "AbsentExec"
408    }
409}
410
411impl DisplayAs for AbsentExec {
412    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
413        match t {
414            DisplayFormatType::Default
415            | DisplayFormatType::Verbose
416            | DisplayFormatType::TreeRender => {
417                write!(
418                    f,
419                    "PromAbsentExec: start={}, end={}, step={}",
420                    self.start, self.end, self.step
421                )
422            }
423        }
424    }
425}
426
427pub struct AbsentStream {
428    end: Millisecond,
429    step: Millisecond,
430    time_index_column_index: usize,
431    output_schema: SchemaRef,
432    fake_labels: Vec<(String, String)>,
433    input: SendableRecordBatchStream,
434    metric: BaselineMetrics,
435    // Buffer for streaming output timestamps
436    output_timestamps: Vec<Millisecond>,
437    // Current timestamp in the output range
438    output_ts_cursor: Millisecond,
439    input_finished: bool,
440}
441
442impl RecordBatchStream for AbsentStream {
443    fn schema(&self) -> SchemaRef {
444        self.output_schema.clone()
445    }
446}
447
448impl Stream for AbsentStream {
449    type Item = DataFusionResult<RecordBatch>;
450
451    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
452        loop {
453            if !self.input_finished {
454                match ready!(self.input.poll_next_unpin(cx)) {
455                    Some(Ok(batch)) => {
456                        let timer = std::time::Instant::now();
457                        if let Err(e) = self.process_input_batch(&batch) {
458                            return Poll::Ready(Some(Err(e)));
459                        }
460                        self.metric.elapsed_compute().add_elapsed(timer);
461
462                        // If we have enough data for a batch, output it
463                        if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
464                            let timer = std::time::Instant::now();
465                            let result = self.flush_output_batch();
466                            self.metric.elapsed_compute().add_elapsed(timer);
467
468                            match result {
469                                Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
470                                Ok(None) => continue,
471                                Err(e) => return Poll::Ready(Some(Err(e))),
472                            }
473                        }
474                    }
475                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
476                    None => {
477                        self.input_finished = true;
478
479                        let timer = std::time::Instant::now();
480                        // Process any remaining absent timestamps
481                        if let Err(e) = self.process_remaining_absent_timestamps() {
482                            return Poll::Ready(Some(Err(e)));
483                        }
484                        let result = self.flush_output_batch();
485                        self.metric.elapsed_compute().add_elapsed(timer);
486                        return Poll::Ready(result.transpose());
487                    }
488                }
489            } else {
490                return Poll::Ready(None);
491            }
492        }
493    }
494}
495
496impl AbsentStream {
497    fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
498        // Extract timestamps from this batch
499        let timestamp_array = batch.column(self.time_index_column_index);
500        let milli_ts_array = arrow::compute::cast(
501            timestamp_array,
502            &DataType::Timestamp(TimeUnit::Millisecond, None),
503        )?;
504        let timestamp_array = milli_ts_array
505            .as_any()
506            .downcast_ref::<TimestampMillisecondArray>()
507            .unwrap();
508
509        // Process against current output cursor position
510        for &input_ts in timestamp_array.values() {
511            // Generate absent timestamps up to this input timestamp
512            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
513                self.output_timestamps.push(self.output_ts_cursor);
514                self.output_ts_cursor += self.step;
515            }
516
517            // Skip the input timestamp if it matches our cursor
518            if self.output_ts_cursor == input_ts {
519                self.output_ts_cursor += self.step;
520            }
521        }
522
523        Ok(())
524    }
525
526    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
527        // Generate all remaining absent timestamps (input is finished)
528        while self.output_ts_cursor <= self.end {
529            self.output_timestamps.push(self.output_ts_cursor);
530            self.output_ts_cursor += self.step;
531        }
532        Ok(())
533    }
534
535    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
536        if self.output_timestamps.is_empty() {
537            return Ok(None);
538        }
539
540        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
541        let num_rows = self.output_timestamps.len();
542        columns.push(Arc::new(TimestampMillisecondArray::from(
543            self.output_timestamps.clone(),
544        )) as _);
545        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
546
547        for (_, value) in self.fake_labels.iter() {
548            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
549                Some(value.clone()),
550                num_rows,
551            ))) as _);
552        }
553
554        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
555
556        self.output_timestamps.clear();
557        Ok(Some(batch))
558    }
559}
560
561#[cfg(test)]
562mod tests {
563    use std::sync::Arc;
564
565    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
566    use datafusion::arrow::record_batch::RecordBatch;
567    use datafusion::catalog::memory::DataSourceExec;
568    use datafusion::datasource::memory::MemorySourceConfig;
569    use datafusion::prelude::SessionContext;
570    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
571
572    use super::*;
573
574    #[tokio::test]
575    async fn test_absent_basic() {
576        let schema = Arc::new(Schema::new(vec![
577            Field::new(
578                "timestamp",
579                DataType::Timestamp(TimeUnit::Millisecond, None),
580                true,
581            ),
582            Field::new("value", DataType::Float64, true),
583        ]));
584
585        // Input has timestamps: 0, 2000, 4000
586        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
587        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
588        let batch =
589            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
590
591        let memory_exec = DataSourceExec::new(Arc::new(
592            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
593        ));
594
595        let output_schema = Arc::new(Schema::new(vec![
596            Field::new(
597                "timestamp",
598                DataType::Timestamp(TimeUnit::Millisecond, None),
599                true,
600            ),
601            Field::new("value", DataType::Float64, true),
602        ]));
603
604        let absent_exec = AbsentExec {
605            start: 0,
606            end: 5000,
607            step: 1000,
608            time_index_column: "timestamp".to_string(),
609            value_column: "value".to_string(),
610            fake_labels: vec![],
611            output_schema: output_schema.clone(),
612            input: Arc::new(memory_exec),
613            properties: PlanProperties::new(
614                EquivalenceProperties::new(output_schema.clone()),
615                Partitioning::UnknownPartitioning(1),
616                EmissionType::Incremental,
617                Boundedness::Bounded,
618            ),
619            metric: ExecutionPlanMetricsSet::new(),
620        };
621
622        let session_ctx = SessionContext::new();
623        let task_ctx = session_ctx.task_ctx();
624        let mut stream = absent_exec.execute(0, task_ctx).unwrap();
625
626        // Collect all output batches
627        let mut output_timestamps = Vec::new();
628        while let Some(batch_result) = stream.next().await {
629            let batch = batch_result.unwrap();
630            let ts_array = batch
631                .column(0)
632                .as_any()
633                .downcast_ref::<TimestampMillisecondArray>()
634                .unwrap();
635            for i in 0..ts_array.len() {
636                if !ts_array.is_null(i) {
637                    let ts = ts_array.value(i);
638                    output_timestamps.push(ts);
639                }
640            }
641        }
642
643        // Should output absent timestamps: 1000, 3000, 5000
644        // (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
645        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
646    }
647
648    #[tokio::test]
649    async fn test_absent_empty_input() {
650        let schema = Arc::new(Schema::new(vec![
651            Field::new(
652                "timestamp",
653                DataType::Timestamp(TimeUnit::Millisecond, None),
654                true,
655            ),
656            Field::new("value", DataType::Float64, true),
657        ]));
658
659        // Empty input
660        let memory_exec = DataSourceExec::new(Arc::new(
661            MemorySourceConfig::try_new(&[vec![]], schema, None).unwrap(),
662        ));
663
664        let output_schema = Arc::new(Schema::new(vec![
665            Field::new(
666                "timestamp",
667                DataType::Timestamp(TimeUnit::Millisecond, None),
668                true,
669            ),
670            Field::new("value", DataType::Float64, true),
671        ]));
672        let absent_exec = AbsentExec {
673            start: 0,
674            end: 2000,
675            step: 1000,
676            time_index_column: "timestamp".to_string(),
677            value_column: "value".to_string(),
678            fake_labels: vec![],
679            output_schema: output_schema.clone(),
680            input: Arc::new(memory_exec),
681            properties: PlanProperties::new(
682                EquivalenceProperties::new(output_schema.clone()),
683                Partitioning::UnknownPartitioning(1),
684                EmissionType::Incremental,
685                Boundedness::Bounded,
686            ),
687            metric: ExecutionPlanMetricsSet::new(),
688        };
689
690        let session_ctx = SessionContext::new();
691        let task_ctx = session_ctx.task_ctx();
692        let mut stream = absent_exec.execute(0, task_ctx).unwrap();
693
694        // Collect all output timestamps
695        let mut output_timestamps = Vec::new();
696        while let Some(batch_result) = stream.next().await {
697            let batch = batch_result.unwrap();
698            let ts_array = batch
699                .column(0)
700                .as_any()
701                .downcast_ref::<TimestampMillisecondArray>()
702                .unwrap();
703            for i in 0..ts_array.len() {
704                if !ts_array.is_null(i) {
705                    let ts = ts_array.value(i);
706                    output_timestamps.push(ts);
707                }
708            }
709        }
710
711        // Should output all timestamps in range: 0, 1000, 2000
712        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
713    }
714}