Skip to main content

promql/extension_plan/
absent.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27    EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34    RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::{EmptyRelation, col};
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Logical plan node implementing PromQL's `absent()` function.
///
/// For each step-aligned timestamp in `[start, end]` that is NOT present in
/// the input's time index column, the physical operator emits a row with
/// value `1.0` plus the `fake_labels` columns; timestamps that do exist in
/// the input are suppressed.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    // Query range parameters, in milliseconds.
    start: Millisecond,
    end: Millisecond,
    step: Millisecond,
    // Name of the time index column in the input plan.
    time_index_column: String,
    // Name of the value column (used when building the output schema).
    value_column: String,
    // Label name/value pairs attached verbatim to every output row.
    fake_labels: Vec<(String, String)>,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    // Set only by `deserialize`: column indices still awaiting resolution
    // into names once the real input plan is attached.
    unfix: Option<UnfixIndices>,
}
64
/// Column indices recorded during serialization; resolved back into column
/// names by `with_exprs_and_inputs` once the input plan is known.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub time_index_column_idx: u64,
    pub value_column_idx: u64,
}
70
71impl PartialOrd for Absent {
72    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73        // compare on fields except schema and input
74        (
75            self.start,
76            self.end,
77            self.step,
78            &self.time_index_column,
79            &self.value_column,
80            &self.fake_labels,
81        )
82            .partial_cmp(&(
83                other.start,
84                other.end,
85                other.step,
86                &other.time_index_column,
87                &other.value_column,
88                &other.fake_labels,
89            ))
90    }
91}
92
93impl UserDefinedLogicalNodeCore for Absent {
94    fn name(&self) -> &str {
95        Self::name()
96    }
97
98    fn inputs(&self) -> Vec<&LogicalPlan> {
99        vec![&self.input]
100    }
101
102    fn schema(&self) -> &DFSchemaRef {
103        &self.output_schema
104    }
105
106    fn expressions(&self) -> Vec<Expr> {
107        if self.unfix.is_some() {
108            return vec![];
109        }
110
111        vec![col(&self.time_index_column)]
112    }
113
114    fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
115        if self.unfix.is_some() {
116            return None;
117        }
118
119        let input_schema = self.input.schema();
120        let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
121        Some(vec![vec![time_index_idx]])
122    }
123
124    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125        write!(
126            f,
127            "PromAbsent: start={}, end={}, step={}",
128            self.start, self.end, self.step
129        )
130    }
131
132    fn with_exprs_and_inputs(
133        &self,
134        _exprs: Vec<Expr>,
135        inputs: Vec<LogicalPlan>,
136    ) -> DataFusionResult<Self> {
137        if inputs.is_empty() {
138            return Err(datafusion::error::DataFusionError::Internal(
139                "Absent must have at least one input".to_string(),
140            ));
141        }
142
143        let input: LogicalPlan = inputs[0].clone();
144        let input_schema = input.schema();
145
146        if let Some(unfix) = &self.unfix {
147            // transform indices to names
148            let time_index_column = resolve_column_name(
149                unfix.time_index_column_idx,
150                input_schema,
151                "Absent",
152                "time index",
153            )?;
154
155            let value_column =
156                resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
157
158            // Recreate output schema with actual field names
159            Self::try_new(
160                self.start,
161                self.end,
162                self.step,
163                time_index_column,
164                value_column,
165                self.fake_labels.clone(),
166                input,
167            )
168        } else {
169            Ok(Self {
170                start: self.start,
171                end: self.end,
172                step: self.step,
173                time_index_column: self.time_index_column.clone(),
174                value_column: self.value_column.clone(),
175                fake_labels: self.fake_labels.clone(),
176                input,
177                output_schema: self.output_schema.clone(),
178                unfix: None,
179            })
180        }
181    }
182}
183
impl Absent {
    /// Creates an [`Absent`] node and derives its output schema:
    /// time index and value columns followed by the deduplicated,
    /// name-sorted fake label columns.
    pub fn try_new(
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
        time_index_column: String,
        value_column: String,
        fake_labels: Vec<(String, String)>,
        input: LogicalPlan,
    ) -> DataFusionResult<Self> {
        // Output always carries a millisecond timestamp and an f64 value.
        let mut fields = vec![
            Field::new(
                &time_index_column,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(&value_column, DataType::Float64, true),
        ];

        // remove duplicate fake labels
        // (collecting through a HashMap keeps the last value per name; the
        // sort afterwards makes the resulting schema deterministic)
        let mut fake_labels = fake_labels
            .into_iter()
            .collect::<HashMap<String, String>>()
            .into_iter()
            .collect::<Vec<_>>();
        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        for (name, _) in fake_labels.iter() {
            fields.push(Field::new(name, DataType::Utf8, true));
        }

        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
            fields.into(),
            HashMap::new(),
        )?);

        Ok(Self {
            start,
            end,
            step,
            time_index_column,
            value_column,
            fake_labels,
            input,
            output_schema,
            unfix: None,
        })
    }

    /// Name of this logical plan node.
    pub const fn name() -> &'static str {
        "prom_absent"
    }

    /// Converts this logical node into its physical counterpart,
    /// [`AbsentExec`], wrapping the given physical input.
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
        // Single unknown partition: the exec demands a single input
        // partition anyway (see `required_input_distribution`).
        let properties = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Arc::new(AbsentExec {
            start: self.start,
            end: self.end,
            step: self.step,
            time_index_column: self.time_index_column.clone(),
            value_column: self.value_column.clone(),
            fake_labels: self.fake_labels.clone(),
            output_schema: output_schema.clone(),
            input: exec_input,
            properties,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    /// Serializes this node to protobuf bytes. Column names are encoded as
    /// indices into the input schema so the receiving side can re-resolve
    /// them against its own plan (see `deserialize`).
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_column_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column);

        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);

        pb::Absent {
            start: self.start,
            end: self.end,
            step: self.step,
            time_index_column_idx,
            value_column_idx,
            fake_labels: self
                .fake_labels
                .iter()
                .map(|(name, value)| pb::LabelPair {
                    key: name.clone(),
                    value: value.clone(),
                })
                .collect(),
            ..Default::default()
        }
        .encode_to_vec()
    }

    /// Deserializes a node from protobuf bytes.
    ///
    /// The returned node holds a placeholder input, an empty schema, and
    /// empty column names; the indices stored in `unfix` are resolved into
    /// real names later by `with_exprs_and_inputs` once the actual input
    /// plan is attached.
    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_column_idx: pb_absent.time_index_column_idx,
            value_column_idx: pb_absent.value_column_idx,
        };

        Ok(Self {
            start: pb_absent.start,
            end: pb_absent.end,
            step: pb_absent.step,
            time_index_column: String::new(),
            value_column: String::new(),
            fake_labels: pb_absent
                .fake_labels
                .iter()
                .map(|label| (label.key.clone(), label.value.clone()))
                .collect(),
            input: placeholder_plan,
            output_schema: Arc::new(DFSchema::empty()),
            unfix: Some(unfix),
        })
    }
}
312
/// Physical operator for [`Absent`].
///
/// Consumes a single, time-sorted input partition and produces rows for
/// every step-aligned timestamp in the query range that has no matching
/// input timestamp.
#[derive(Debug)]
pub struct AbsentExec {
    // Query range parameters, in milliseconds.
    start: Millisecond,
    end: Millisecond,
    step: Millisecond,
    // Name of the time index column in the input plan.
    time_index_column: String,
    // Carried through plan rewrites; not read during execution in this file.
    value_column: String,
    // Label name/value pairs attached verbatim to every output row.
    fake_labels: Vec<(String, String)>,
    output_schema: SchemaRef,
    input: Arc<dyn ExecutionPlan>,
    properties: Arc<PlanProperties>,
    metric: ExecutionPlanMetricsSet,
}
326
327impl ExecutionPlan for AbsentExec {
328    fn as_any(&self) -> &dyn Any {
329        self
330    }
331
332    fn schema(&self) -> SchemaRef {
333        self.output_schema.clone()
334    }
335
336    fn properties(&self) -> &Arc<PlanProperties> {
337        &self.properties
338    }
339
340    fn required_input_distribution(&self) -> Vec<Distribution> {
341        vec![Distribution::SinglePartition]
342    }
343
344    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
345        let requirement = LexRequirement::from([PhysicalSortRequirement {
346            expr: Arc::new(
347                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
348            ),
349            options: Some(SortOptions {
350                descending: false,
351                nulls_first: false,
352            }),
353        }]);
354        vec![Some(OrderingRequirements::new(requirement))]
355    }
356
357    fn maintains_input_order(&self) -> Vec<bool> {
358        vec![false]
359    }
360
361    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
362        vec![&self.input]
363    }
364
365    fn with_new_children(
366        self: Arc<Self>,
367        children: Vec<Arc<dyn ExecutionPlan>>,
368    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
369        assert!(!children.is_empty());
370        Ok(Arc::new(Self {
371            start: self.start,
372            end: self.end,
373            step: self.step,
374            time_index_column: self.time_index_column.clone(),
375            value_column: self.value_column.clone(),
376            fake_labels: self.fake_labels.clone(),
377            output_schema: self.output_schema.clone(),
378            input: children[0].clone(),
379            properties: self.properties.clone(),
380            metric: self.metric.clone(),
381        }))
382    }
383
384    fn execute(
385        &self,
386        partition: usize,
387        context: Arc<TaskContext>,
388    ) -> DataFusionResult<SendableRecordBatchStream> {
389        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
390        let batch_size = context.session_config().batch_size();
391        let input = self.input.execute(partition, context)?;
392
393        Ok(Box::pin(AbsentStream {
394            end: self.end,
395            step: self.step,
396            batch_size,
397            time_index_column_index: self
398                .input
399                .schema()
400                .column_with_name(&self.time_index_column)
401                .unwrap() // Safety: we have checked the column name in `try_new`
402                .0,
403            output_schema: self.output_schema.clone(),
404            fake_labels: self.fake_labels.clone(),
405            input,
406            metric: baseline_metric,
407            // Buffer for streaming output timestamps
408            output_timestamps: Vec::new(),
409            input_timestamps: Vec::new(),
410            input_timestamp_offset: 0,
411            // Current timestamp in the output range
412            output_ts_cursor: self.start,
413            input_finished: false,
414        }))
415    }
416
417    fn metrics(&self) -> Option<MetricsSet> {
418        Some(self.metric.clone_inner())
419    }
420
421    fn name(&self) -> &str {
422        "AbsentExec"
423    }
424}
425
426impl DisplayAs for AbsentExec {
427    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
428        match t {
429            DisplayFormatType::Default
430            | DisplayFormatType::Verbose
431            | DisplayFormatType::TreeRender => {
432                write!(
433                    f,
434                    "PromAbsentExec: start={}, end={}, step={}",
435                    self.start, self.end, self.step
436                )
437            }
438        }
439    }
440}
441
/// Streaming state machine that emits "absent" timestamps.
///
/// Walks the sorted input timestamps and a step-aligned cursor over the
/// query range in lockstep, buffering grid points with no matching input
/// timestamp and flushing them as `batch_size`-row record batches.
pub struct AbsentStream {
    end: Millisecond,
    step: Millisecond,
    // Maximum rows per emitted RecordBatch (from the session config).
    batch_size: usize,
    // Position of the time index column in the *input* schema.
    time_index_column_index: usize,
    output_schema: SchemaRef,
    // Label name/value pairs replicated onto every output row.
    fake_labels: Vec<(String, String)>,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    // Buffer for streaming output timestamps
    output_timestamps: Vec<Millisecond>,
    // Current input timestamps being processed incrementally.
    input_timestamps: Vec<Millisecond>,
    // Index of the next unprocessed entry in `input_timestamps`.
    input_timestamp_offset: usize,
    // Current timestamp in the output range
    output_ts_cursor: Millisecond,
    // Set once the input stream returns `None`.
    input_finished: bool,
}
460
461impl RecordBatchStream for AbsentStream {
462    fn schema(&self) -> SchemaRef {
463        self.output_schema.clone()
464    }
465}
466
impl Stream for AbsentStream {
    type Item = DataFusionResult<RecordBatch>;

    /// Drives the absent computation:
    /// 1. drain buffered input timestamps, collecting absent grid points;
    /// 2. once the input is exhausted, emit the remaining grid points;
    /// 3. otherwise poll the input for the next batch of timestamps.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Step 1: consume timestamps buffered by an earlier poll.
            if self.has_pending_input_timestamps() {
                let timer = std::time::Instant::now();
                if let Err(e) = self.process_input_batch() {
                    return Poll::Ready(Some(Err(e)));
                }
                self.metric.elapsed_compute().add_elapsed(timer);

                // Emit a batch if one is ready; otherwise keep looping —
                // the batch-size threshold may not have been reached yet.
                match self.flush_output_batch() {
                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
                    Ok(None) => continue,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            }

            // Step 2: input is done — flush the tail of the output range.
            if self.input_finished {
                let timer = std::time::Instant::now();
                if let Err(e) = self.process_remaining_absent_timestamps() {
                    return Poll::Ready(Some(Err(e)));
                }
                self.metric.elapsed_compute().add_elapsed(timer);

                match self.flush_output_batch() {
                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
                    // Nothing buffered and nothing left to generate: done.
                    Ok(None) => return Poll::Ready(None),
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            }

            // Step 3: pull the next input batch and buffer its timestamps.
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = std::time::Instant::now();
                    if let Err(e) = self.buffer_input_timestamps(&batch) {
                        return Poll::Ready(Some(Err(e)));
                    }
                    self.metric.elapsed_compute().add_elapsed(timer);
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => {
                    self.input_finished = true;
                }
            }
        }
    }
}
516
impl AbsentStream {
    /// Caches the time index values of `batch` (cast to millisecond
    /// precision) so they can be consumed incrementally across polls.
    fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
        let timestamp_array = batch.column(self.time_index_column_index);
        // Normalize whatever timestamp unit the input uses to milliseconds.
        let milli_ts_array = arrow::compute::cast(
            timestamp_array,
            &DataType::Timestamp(TimeUnit::Millisecond, None),
        )?;
        let timestamp_array = milli_ts_array
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        // NOTE(review): `values()` ignores the null bitmap, so a null time
        // index entry would be read as its raw buffer value — assumes the
        // time index is non-null; confirm upstream guarantees this.
        self.input_timestamps.clear();
        self.input_timestamps
            .extend_from_slice(timestamp_array.values());
        self.input_timestamp_offset = 0;
        Ok(())
    }

    /// Whether buffered input timestamps remain to be processed.
    fn has_pending_input_timestamps(&self) -> bool {
        self.input_timestamp_offset < self.input_timestamps.len()
    }

    /// Advances the output cursor against the buffered input timestamps,
    /// collecting grid points that are missing from the input.
    ///
    /// Returns early — WITHOUT advancing `input_timestamp_offset` — once
    /// `batch_size` output rows are buffered; the next call resumes at the
    /// same input timestamp with the cursor already advanced.
    fn process_input_batch(&mut self) -> DataFusionResult<()> {
        while self.input_timestamp_offset < self.input_timestamps.len() {
            let input_ts = self.input_timestamps[self.input_timestamp_offset];

            // Generate absent timestamps up to this input timestamp
            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
                self.output_timestamps.push(self.output_ts_cursor);
                self.output_ts_cursor += self.step;

                if self.output_timestamps.len() >= self.batch_size {
                    return Ok(());
                }
            }

            // Skip the input timestamp if it matches our cursor
            if self.output_ts_cursor == input_ts {
                self.output_ts_cursor += self.step;
            }

            self.input_timestamp_offset += 1;
        }

        self.input_timestamps.clear();
        self.input_timestamp_offset = 0;
        Ok(())
    }

    /// After the input is exhausted, buffers every remaining grid point up
    /// to `end`, pausing whenever a full batch has accumulated.
    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
        while self.output_ts_cursor <= self.end {
            self.output_timestamps.push(self.output_ts_cursor);
            self.output_ts_cursor += self.step;

            if self.output_timestamps.len() >= self.batch_size {
                return Ok(());
            }
        }
        Ok(())
    }

    /// Drains up to `batch_size` buffered timestamps into a RecordBatch of
    /// `[timestamps, 1.0 values, fake label columns...]`.
    /// Returns `Ok(None)` when nothing is buffered.
    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
        if self.output_timestamps.is_empty() {
            return Ok(None);
        }

        let timestamps = if self.output_timestamps.len() <= self.batch_size {
            std::mem::take(&mut self.output_timestamps)
        } else {
            // Emit the first `batch_size` rows; keep the overflow buffered.
            let remaining = self.output_timestamps.split_off(self.batch_size);
            std::mem::replace(&mut self.output_timestamps, remaining)
        };

        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
        let num_rows = timestamps.len();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _);
        // Absent rows always report value 1.0.
        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);

        for (_, value) in self.fake_labels.iter() {
            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
                Some(value.clone()),
                num_rows,
            ))) as _);
        }

        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;

        Ok(Some(batch))
    }
}
607
608#[cfg(test)]
609mod tests {
610    use std::sync::Arc;
611
612    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
613    use datafusion::arrow::record_batch::RecordBatch;
614    use datafusion::catalog::memory::DataSourceExec;
615    use datafusion::datasource::memory::MemorySourceConfig;
616    use datafusion::prelude::{SessionConfig, SessionContext};
617    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
618
619    use super::*;
620
    #[tokio::test]
    async fn test_absent_basic() {
        // Range [0, 5000] step 1000 with input at 0/2000/4000 should yield
        // exactly the missing grid points 1000/3000/5000.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        // Input has timestamps: 0, 2000, 4000
        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();

        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
        ));

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let absent_exec = AbsentExec {
            start: 0,
            end: 5000,
            step: 1000,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        // Collect all output batches
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    let ts = ts_array.value(i);
                    output_timestamps.push(ts);
                }
            }
        }

        // Should output absent timestamps: 1000, 3000, 5000
        // (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }
694
    #[tokio::test]
    async fn test_absent_empty_input() {
        // With no input rows, every grid point in [0, 2000] is "absent".
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        // Empty input
        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![]], schema, None).unwrap(),
        ));

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));
        let absent_exec = AbsentExec {
            start: 0,
            end: 2000,
            step: 1000,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        // Collect all output timestamps
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    let ts = ts_array.value(i);
                    output_timestamps.push(ts);
                }
            }
        }

        // Should output all timestamps in range: 0, 1000, 2000
        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }
761
    #[tokio::test]
    async fn test_absent_respects_session_batch_size_for_large_gap() {
        // A 10-point absent gap with batch_size=3 must be split into
        // batches of 3/3/3/1 rather than one oversized batch.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
        let value_array = Arc::new(Float64Array::from(vec![1.0]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();

        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
        ));

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let absent_exec = AbsentExec {
            start: 0,
            end: 10,
            step: 1,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        let mut batch_sizes = Vec::new();
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            batch_sizes.push(batch.num_rows());

            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    output_timestamps.push(ts_array.value(i));
                }
            }
        }

        assert_eq!(batch_sizes, vec![3, 3, 3, 1]);
        assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]);
    }
834
    #[tokio::test]
    async fn test_absent_resumes_same_input_timestamp_after_batch_flush() {
        // The stream pauses mid-gap when a batch fills; on resume it must
        // re-examine the same input timestamp (9) and still suppress it.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
        let value_array = Arc::new(Float64Array::from(vec![1.0]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();

        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
        ));

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let absent_exec = AbsentExec {
            start: 0,
            end: 9,
            step: 1,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    output_timestamps.push(ts_array.value(i));
                }
            }
        }

        assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
    }
903}