1use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27 EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34 RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::{EmptyRelation, col};
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Logical plan node implementing PromQL's `absent()`: emits a row (with
/// constant value `1.0`) at every aligned step in `[start, end]` (inclusive)
/// where the input has no sample.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    /// Inclusive start of the evaluation range, in milliseconds.
    start: Millisecond,
    /// Inclusive end of the evaluation range, in milliseconds.
    end: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    step: Millisecond,
    /// Name of the time index column in the input plan.
    time_index_column: String,
    /// Name of the value column in the output schema.
    value_column: String,
    /// Constant label name/value pairs attached to every output row.
    fake_labels: Vec<(String, String)>,
    /// Child logical plan.
    input: LogicalPlan,
    /// Schema of the rows this node produces.
    output_schema: DFSchemaRef,
    /// Present only on a freshly deserialized node: column indices that still
    /// need to be resolved to names in `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
64
/// Column indices captured during serialization, resolved back into column
/// names once the real input plan (and thus its schema) is attached.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_column_idx: u64,
    /// Index of the value column in the input schema.
    pub value_column_idx: u64,
}
70
71impl PartialOrd for Absent {
72 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73 (
75 self.start,
76 self.end,
77 self.step,
78 &self.time_index_column,
79 &self.value_column,
80 &self.fake_labels,
81 )
82 .partial_cmp(&(
83 other.start,
84 other.end,
85 other.step,
86 &other.time_index_column,
87 &other.value_column,
88 &other.fake_labels,
89 ))
90 }
91}
92
93impl UserDefinedLogicalNodeCore for Absent {
94 fn name(&self) -> &str {
95 Self::name()
96 }
97
98 fn inputs(&self) -> Vec<&LogicalPlan> {
99 vec![&self.input]
100 }
101
102 fn schema(&self) -> &DFSchemaRef {
103 &self.output_schema
104 }
105
106 fn expressions(&self) -> Vec<Expr> {
107 if self.unfix.is_some() {
108 return vec![];
109 }
110
111 vec![col(&self.time_index_column)]
112 }
113
114 fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
115 if self.unfix.is_some() {
116 return None;
117 }
118
119 let input_schema = self.input.schema();
120 let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
121 Some(vec![vec![time_index_idx]])
122 }
123
124 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125 write!(
126 f,
127 "PromAbsent: start={}, end={}, step={}",
128 self.start, self.end, self.step
129 )
130 }
131
132 fn with_exprs_and_inputs(
133 &self,
134 _exprs: Vec<Expr>,
135 inputs: Vec<LogicalPlan>,
136 ) -> DataFusionResult<Self> {
137 if inputs.is_empty() {
138 return Err(datafusion::error::DataFusionError::Internal(
139 "Absent must have at least one input".to_string(),
140 ));
141 }
142
143 let input: LogicalPlan = inputs[0].clone();
144 let input_schema = input.schema();
145
146 if let Some(unfix) = &self.unfix {
147 let time_index_column = resolve_column_name(
149 unfix.time_index_column_idx,
150 input_schema,
151 "Absent",
152 "time index",
153 )?;
154
155 let value_column =
156 resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
157
158 Self::try_new(
160 self.start,
161 self.end,
162 self.step,
163 time_index_column,
164 value_column,
165 self.fake_labels.clone(),
166 input,
167 )
168 } else {
169 Ok(Self {
170 start: self.start,
171 end: self.end,
172 step: self.step,
173 time_index_column: self.time_index_column.clone(),
174 value_column: self.value_column.clone(),
175 fake_labels: self.fake_labels.clone(),
176 input,
177 output_schema: self.output_schema.clone(),
178 unfix: None,
179 })
180 }
181 }
182}
183
impl Absent {
    /// Creates a new [`Absent`] node over `input`.
    ///
    /// The output schema is `[time_index, value, fake labels...]`. Label
    /// names are deduplicated (later duplicates overwrite earlier ones via
    /// `HashMap` insertion) and sorted by name so the schema is deterministic
    /// regardless of the input order of `fake_labels`.
    ///
    /// # Errors
    /// Returns an error if the output `DFSchema` cannot be constructed.
    pub fn try_new(
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
        time_index_column: String,
        value_column: String,
        fake_labels: Vec<(String, String)>,
        input: LogicalPlan,
    ) -> DataFusionResult<Self> {
        // Fixed leading columns: time index then value.
        let mut fields = vec![
            Field::new(
                &time_index_column,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(&value_column, DataType::Float64, true),
        ];

        // Deduplicate by label name, then sort for a deterministic column order.
        let mut fake_labels = fake_labels
            .into_iter()
            .collect::<HashMap<String, String>>()
            .into_iter()
            .collect::<Vec<_>>();
        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        for (name, _) in fake_labels.iter() {
            fields.push(Field::new(name, DataType::Utf8, true));
        }

        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
            fields.into(),
            HashMap::new(),
        )?);

        Ok(Self {
            start,
            end,
            step,
            time_index_column,
            value_column,
            fake_labels,
            input,
            output_schema,
            unfix: None,
        })
    }

    /// Name of this logical plan node.
    pub const fn name() -> &'static str {
        "prom_absent"
    }

    /// Converts this logical node into its physical counterpart,
    /// [`AbsentExec`], wrapping the given physical input.
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
        // Single unknown partition, bounded input, incremental batch emission.
        let properties = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Arc::new(AbsentExec {
            start: self.start,
            end: self.end,
            step: self.step,
            time_index_column: self.time_index_column.clone(),
            value_column: self.value_column.clone(),
            fake_labels: self.fake_labels.clone(),
            output_schema: output_schema.clone(),
            input: exec_input,
            properties,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    /// Serializes this node to protobuf bytes. The time-index and value
    /// columns are encoded as indices into the input schema (names are
    /// recovered on the other side via `UnfixIndices`).
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_column_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column);

        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);

        pb::Absent {
            start: self.start,
            end: self.end,
            step: self.step,
            time_index_column_idx,
            value_column_idx,
            fake_labels: self
                .fake_labels
                .iter()
                .map(|(name, value)| pb::LabelPair {
                    key: name.clone(),
                    value: value.clone(),
                })
                .collect(),
            ..Default::default()
        }
        .encode_to_vec()
    }

    /// Deserializes a node from protobuf bytes.
    ///
    /// The returned node holds a placeholder (empty) input plan and schema;
    /// the serialized column indices are kept in `unfix` and are resolved to
    /// real column names when `with_exprs_and_inputs` attaches the actual
    /// input plan.
    ///
    /// # Errors
    /// Returns a deserialization error if the bytes are not a valid
    /// `pb::Absent` message.
    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_column_idx: pb_absent.time_index_column_idx,
            value_column_idx: pb_absent.value_column_idx,
        };

        Ok(Self {
            start: pb_absent.start,
            end: pb_absent.end,
            step: pb_absent.step,
            // Placeholder names; filled in by `with_exprs_and_inputs`.
            time_index_column: String::new(),
            value_column: String::new(),
            fake_labels: pb_absent
                .fake_labels
                .iter()
                .map(|label| (label.key.clone(), label.value.clone()))
                .collect(),
            input: placeholder_plan,
            output_schema: Arc::new(DFSchema::empty()),
            unfix: Some(unfix),
        })
    }
}
312
/// Physical plan node for [`Absent`].
#[derive(Debug)]
pub struct AbsentExec {
    /// Inclusive start of the evaluation range (ms).
    start: Millisecond,
    /// Inclusive end of the evaluation range (ms).
    end: Millisecond,
    /// Step between evaluation timestamps (ms).
    step: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the value column in the output schema.
    value_column: String,
    /// Constant label name/value pairs appended to every output row.
    fake_labels: Vec<(String, String)>,
    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// Cached plan properties (single partition, bounded, incremental).
    properties: Arc<PlanProperties>,
    /// Execution metrics shared with the produced stream.
    metric: ExecutionPlanMetricsSet,
}
326
impl ExecutionPlan for AbsentExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.properties
    }

    /// Absence is decided over the whole series, so the input must be
    /// collapsed into a single partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    /// Requires the input sorted ascending on the time index column: the
    /// stream walks input timestamps against a monotonically increasing grid
    /// cursor.
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        let requirement = LexRequirement::from([PhysicalSortRequirement {
            // NOTE(review): `unwrap` panics if the time index column is
            // missing from the input schema — presumably guaranteed by plan
            // construction; confirm.
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: false,
            }),
        }]);
        vec![Some(OrderingRequirements::new(requirement))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Invariant: the planner always supplies at least one child here.
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            start: self.start,
            end: self.end,
            step: self.step,
            time_index_column: self.time_index_column.clone(),
            value_column: self.value_column.clone(),
            fake_labels: self.fake_labels.clone(),
            output_schema: self.output_schema.clone(),
            input: children[0].clone(),
            properties: self.properties.clone(),
            metric: self.metric.clone(),
        }))
    }

    /// Builds the [`AbsentStream`] that performs the actual computation for
    /// the given partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let batch_size = context.session_config().batch_size();
        let input = self.input.execute(partition, context)?;

        Ok(Box::pin(AbsentStream {
            end: self.end,
            step: self.step,
            batch_size,
            // NOTE(review): `unwrap` panics if the time index column is
            // absent from the input schema — assumed guaranteed upstream.
            time_index_column_index: self
                .input
                .schema()
                .column_with_name(&self.time_index_column)
                .unwrap()
                .0,
            output_schema: self.output_schema.clone(),
            fake_labels: self.fake_labels.clone(),
            input,
            metric: baseline_metric,
            output_timestamps: Vec::new(),
            input_timestamps: Vec::new(),
            input_timestamp_offset: 0,
            // The grid cursor starts at the beginning of the range.
            output_ts_cursor: self.start,
            input_finished: false,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "AbsentExec"
    }
}
425
426impl DisplayAs for AbsentExec {
427 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
428 match t {
429 DisplayFormatType::Default
430 | DisplayFormatType::Verbose
431 | DisplayFormatType::TreeRender => {
432 write!(
433 f,
434 "PromAbsentExec: start={}, end={}, step={}",
435 self.start, self.end, self.step
436 )
437 }
438 }
439 }
440}
441
/// Streaming implementation of the `absent()` computation.
///
/// Walks the ascending input timestamps (ordering enforced by
/// `AbsentExec::required_input_ordering`) against the aligned step grid and
/// emits the grid points that have no matching input sample.
pub struct AbsentStream {
    /// Inclusive end of the evaluation range (ms).
    end: Millisecond,
    /// Step between evaluation timestamps (ms).
    step: Millisecond,
    /// Maximum number of rows per emitted batch (session batch size).
    batch_size: usize,
    /// Index of the time index column in the input schema.
    time_index_column_index: usize,
    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// Constant label name/value pairs appended to every output row.
    fake_labels: Vec<(String, String)>,
    /// The child stream being scanned.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute time etc.).
    metric: BaselineMetrics,
    /// Absent timestamps accumulated but not yet flushed into a batch.
    output_timestamps: Vec<Millisecond>,
    /// Timestamps buffered from the current input batch.
    input_timestamps: Vec<Millisecond>,
    /// Position of the next unprocessed entry in `input_timestamps`.
    input_timestamp_offset: usize,
    /// Next grid timestamp to examine; starts at `start` and only advances.
    output_ts_cursor: Millisecond,
    /// Whether the input stream has been exhausted.
    input_finished: bool,
}
460
461impl RecordBatchStream for AbsentStream {
462 fn schema(&self) -> SchemaRef {
463 self.output_schema.clone()
464 }
465}
466
impl Stream for AbsentStream {
    type Item = DataFusionResult<RecordBatch>;

    /// Drives the computation in three phases, in order:
    /// 1. drain buffered input timestamps into absent grid points;
    /// 2. once the input is exhausted, emit the remaining tail of the range;
    /// 3. otherwise poll the input for the next batch.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Phase 1: process timestamps left over from the last input batch.
            if self.has_pending_input_timestamps() {
                let timer = std::time::Instant::now();
                if let Err(e) = self.process_input_batch() {
                    return Poll::Ready(Some(Err(e)));
                }
                self.metric.elapsed_compute().add_elapsed(timer);

                // Emit a full batch if ready; otherwise loop to keep draining.
                match self.flush_output_batch() {
                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
                    Ok(None) => continue,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            }

            // Phase 2: input done — flush the rest of the grid, then finish
            // (`Ok(None)` from flush means nothing is left to emit).
            if self.input_finished {
                let timer = std::time::Instant::now();
                if let Err(e) = self.process_remaining_absent_timestamps() {
                    return Poll::Ready(Some(Err(e)));
                }
                self.metric.elapsed_compute().add_elapsed(timer);

                match self.flush_output_batch() {
                    Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
                    Ok(None) => return Poll::Ready(None),
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            }

            // Phase 3: pull the next input batch or notice end-of-stream.
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = std::time::Instant::now();
                    if let Err(e) = self.buffer_input_timestamps(&batch) {
                        return Poll::Ready(Some(Err(e)));
                    }
                    self.metric.elapsed_compute().add_elapsed(timer);
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => {
                    self.input_finished = true;
                }
            }
        }
    }
}
516
impl AbsentStream {
    /// Casts the batch's time index column to millisecond precision and
    /// buffers its values, resetting the read offset.
    ///
    /// NOTE(review): `values()` reads the raw buffer and ignores the null
    /// bitmap — this assumes the time index column contains no nulls; confirm
    /// against the upstream plan.
    fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
        let timestamp_array = batch.column(self.time_index_column_index);
        let milli_ts_array = arrow::compute::cast(
            timestamp_array,
            &DataType::Timestamp(TimeUnit::Millisecond, None),
        )?;
        let timestamp_array = milli_ts_array
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        self.input_timestamps.clear();
        self.input_timestamps
            .extend_from_slice(timestamp_array.values());
        self.input_timestamp_offset = 0;
        Ok(())
    }

    /// Whether buffered input timestamps remain unprocessed.
    fn has_pending_input_timestamps(&self) -> bool {
        self.input_timestamp_offset < self.input_timestamps.len()
    }

    /// Advances the grid cursor over the buffered input timestamps.
    ///
    /// Every grid point strictly before the current input timestamp (and not
    /// past `end`) is absent and gets recorded; a grid point equal to an
    /// input timestamp is present and skipped. Returns early once a full
    /// batch of output rows has accumulated, so the next call resumes at the
    /// same input timestamp.
    fn process_input_batch(&mut self) -> DataFusionResult<()> {
        while self.input_timestamp_offset < self.input_timestamps.len() {
            let input_ts = self.input_timestamps[self.input_timestamp_offset];

            // Grid points before this sample have no data: record as absent.
            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
                self.output_timestamps.push(self.output_ts_cursor);
                self.output_ts_cursor += self.step;

                if self.output_timestamps.len() >= self.batch_size {
                    // Batch is full; resume from here on the next call.
                    return Ok(());
                }
            }

            // The sample sits exactly on a grid point: that point is present.
            if self.output_ts_cursor == input_ts {
                self.output_ts_cursor += self.step;
            }

            self.input_timestamp_offset += 1;
        }

        // Whole buffer consumed; reset for the next input batch.
        self.input_timestamps.clear();
        self.input_timestamp_offset = 0;
        Ok(())
    }

    /// After the input is exhausted, records every remaining grid point up to
    /// `end` (inclusive), pausing whenever a full batch accumulates.
    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
        while self.output_ts_cursor <= self.end {
            self.output_timestamps.push(self.output_ts_cursor);
            self.output_ts_cursor += self.step;

            if self.output_timestamps.len() >= self.batch_size {
                return Ok(());
            }
        }
        Ok(())
    }

    /// Builds a record batch from up to `batch_size` pending timestamps.
    ///
    /// Columns are `[timestamp, value = 1.0, fake labels...]`, matching the
    /// schema built in `Absent::try_new`. Returns `Ok(None)` when nothing is
    /// pending; any excess timestamps stay buffered for the next flush.
    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
        if self.output_timestamps.is_empty() {
            return Ok(None);
        }

        // Take at most `batch_size` timestamps, keeping the remainder.
        let timestamps = if self.output_timestamps.len() <= self.batch_size {
            std::mem::take(&mut self.output_timestamps)
        } else {
            let remaining = self.output_timestamps.split_off(self.batch_size);
            std::mem::replace(&mut self.output_timestamps, remaining)
        };

        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
        let num_rows = timestamps.len();
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _);
        // `absent()` always yields the constant value 1.0.
        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);

        // One constant string column per fake label.
        for (_, value) in self.fake_labels.iter() {
            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
                Some(value.clone()),
                num_rows,
            ))) as _);
        }

        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;

        Ok(Some(batch))
    }
}
607
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::{SessionConfig, SessionContext};
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// Builds the `timestamp` + `value` schema shared by all tests.
    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `AbsentExec` over an in-memory input holding the given
    /// timestamps/values, with the given range parameters. An empty
    /// `timestamps` produces an empty input partition.
    fn build_absent_exec(
        timestamps: Vec<i64>,
        values: Vec<f64>,
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
    ) -> AbsentExec {
        let schema = test_schema();
        let batches = if timestamps.is_empty() {
            vec![]
        } else {
            let ts_array = Arc::new(TimestampMillisecondArray::from(timestamps));
            let value_array = Arc::new(Float64Array::from(values));
            vec![RecordBatch::try_new(schema.clone(), vec![ts_array, value_array]).unwrap()]
        };
        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], schema, None).unwrap(),
        ));

        let output_schema = test_schema();
        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(output_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Drains the stream, returning per-batch row counts and all non-null
    /// timestamps in order.
    async fn collect_output(mut stream: SendableRecordBatchStream) -> (Vec<usize>, Vec<i64>) {
        let mut batch_sizes = Vec::new();
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            batch_sizes.push(batch.num_rows());
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    output_timestamps.push(ts_array.value(i));
                }
            }
        }
        (batch_sizes, output_timestamps)
    }

    #[tokio::test]
    async fn test_absent_basic() {
        // Samples at 0/2000/4000; the odd grid points are absent.
        let exec = build_absent_exec(vec![0, 2000, 4000], vec![1.0, 2.0, 3.0], 0, 5000, 1000);
        let task_ctx = SessionContext::new().task_ctx();
        let stream = exec.execute(0, task_ctx).unwrap();

        let (_, output_timestamps) = collect_output(stream).await;
        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        // No input at all: every grid point in [0, 2000] is absent.
        let exec = build_absent_exec(vec![], vec![], 0, 2000, 1000);
        let task_ctx = SessionContext::new().task_ctx();
        let stream = exec.execute(0, task_ctx).unwrap();

        let (_, output_timestamps) = collect_output(stream).await;
        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }

    #[tokio::test]
    async fn test_absent_respects_session_batch_size_for_large_gap() {
        // One sample at t=9 inside [0, 10] step 1: ten absent points, split
        // into batches of the configured size 3.
        let exec = build_absent_exec(vec![9], vec![1.0], 0, 10, 1);
        let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
        let stream = exec.execute(0, session_ctx.task_ctx()).unwrap();

        let (batch_sizes, output_timestamps) = collect_output(stream).await;
        assert_eq!(batch_sizes, vec![3, 3, 3, 1]);
        assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]);
    }

    #[tokio::test]
    async fn test_absent_resumes_same_input_timestamp_after_batch_flush() {
        // The gap before the single sample spans several batches; processing
        // must resume at the same input timestamp after each flush so t=9 is
        // still recognized as present.
        let exec = build_absent_exec(vec![9], vec![1.0], 0, 9, 1);
        let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
        let stream = exec.execute(0, session_ctx.task_ctx()).unwrap();

        let (_, output_timestamps) = collect_output(stream).await;
        assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
    }
}