1use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27 EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34 RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::{EmptyRelation, col};
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Flush threshold for [`AbsentStream`]: once at least this many absent timestamps are
/// buffered while input is still being consumed, they are emitted as one output batch.
const ABSENT_BATCH_SIZE: usize = 8192;
54
/// Logical plan node registered under the name `prom_absent` (see [`Absent::name`]).
///
/// Its physical counterpart ([`AbsentExec`]) walks the aligned timestamps
/// `start, start + step, ...` up to `end` (inclusive) and emits one row for every aligned
/// timestamp that does not occur in the input's time index column.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    /// First aligned timestamp of the evaluated range.
    start: Millisecond,
    /// Last timestamp of the evaluated range (inclusive).
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    step: Millisecond,
    /// Name of the time index column; empty until resolved when `unfix` is set.
    time_index_column: String,
    /// Name of the value column; empty until resolved when `unfix` is set.
    value_column: String,
    /// `(label name, label value)` pairs appended to every output row as string columns.
    /// `try_new` de-duplicates them by name and sorts them by name.
    fake_labels: Vec<(String, String)>,
    /// The single child plan.
    input: LogicalPlan,
    /// Output schema: time index, value, then one string column per fake label.
    output_schema: DFSchemaRef,
    /// Set only by [`Absent::deserialize`]: column indices that still have to be turned
    /// into column names once the real input is attached via `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
67
/// Column positions carried by a freshly deserialized [`Absent`] node.
///
/// They are resolved against the input schema into column names by
/// `with_exprs_and_inputs`, because the deserialized node only has a placeholder input.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column as written by [`Absent::serialize`].
    pub time_index_column_idx: u64,
    /// Index of the value column as written by [`Absent::serialize`].
    pub value_column_idx: u64,
}
73
74impl PartialOrd for Absent {
75 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76 (
78 self.start,
79 self.end,
80 self.step,
81 &self.time_index_column,
82 &self.value_column,
83 &self.fake_labels,
84 )
85 .partial_cmp(&(
86 other.start,
87 other.end,
88 other.step,
89 &other.time_index_column,
90 &other.value_column,
91 &other.fake_labels,
92 ))
93 }
94}
95
96impl UserDefinedLogicalNodeCore for Absent {
97 fn name(&self) -> &str {
98 Self::name()
99 }
100
101 fn inputs(&self) -> Vec<&LogicalPlan> {
102 vec![&self.input]
103 }
104
105 fn schema(&self) -> &DFSchemaRef {
106 &self.output_schema
107 }
108
109 fn expressions(&self) -> Vec<Expr> {
110 if self.unfix.is_some() {
111 return vec![];
112 }
113
114 vec![col(&self.time_index_column)]
115 }
116
117 fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
118 if self.unfix.is_some() {
119 return None;
120 }
121
122 let input_schema = self.input.schema();
123 let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
124 Some(vec![vec![time_index_idx]])
125 }
126
127 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
128 write!(
129 f,
130 "PromAbsent: start={}, end={}, step={}",
131 self.start, self.end, self.step
132 )
133 }
134
135 fn with_exprs_and_inputs(
136 &self,
137 _exprs: Vec<Expr>,
138 inputs: Vec<LogicalPlan>,
139 ) -> DataFusionResult<Self> {
140 if inputs.is_empty() {
141 return Err(datafusion::error::DataFusionError::Internal(
142 "Absent must have at least one input".to_string(),
143 ));
144 }
145
146 let input: LogicalPlan = inputs[0].clone();
147 let input_schema = input.schema();
148
149 if let Some(unfix) = &self.unfix {
150 let time_index_column = resolve_column_name(
152 unfix.time_index_column_idx,
153 input_schema,
154 "Absent",
155 "time index",
156 )?;
157
158 let value_column =
159 resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
160
161 Self::try_new(
163 self.start,
164 self.end,
165 self.step,
166 time_index_column,
167 value_column,
168 self.fake_labels.clone(),
169 input,
170 )
171 } else {
172 Ok(Self {
173 start: self.start,
174 end: self.end,
175 step: self.step,
176 time_index_column: self.time_index_column.clone(),
177 value_column: self.value_column.clone(),
178 fake_labels: self.fake_labels.clone(),
179 input,
180 output_schema: self.output_schema.clone(),
181 unfix: None,
182 })
183 }
184 }
185}
186
187impl Absent {
188 pub fn try_new(
189 start: Millisecond,
190 end: Millisecond,
191 step: Millisecond,
192 time_index_column: String,
193 value_column: String,
194 fake_labels: Vec<(String, String)>,
195 input: LogicalPlan,
196 ) -> DataFusionResult<Self> {
197 let mut fields = vec![
198 Field::new(
199 &time_index_column,
200 DataType::Timestamp(TimeUnit::Millisecond, None),
201 true,
202 ),
203 Field::new(&value_column, DataType::Float64, true),
204 ];
205
206 let mut fake_labels = fake_labels
208 .into_iter()
209 .collect::<HashMap<String, String>>()
210 .into_iter()
211 .collect::<Vec<_>>();
212 fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
213 for (name, _) in fake_labels.iter() {
214 fields.push(Field::new(name, DataType::Utf8, true));
215 }
216
217 let output_schema = Arc::new(DFSchema::from_unqualified_fields(
218 fields.into(),
219 HashMap::new(),
220 )?);
221
222 Ok(Self {
223 start,
224 end,
225 step,
226 time_index_column,
227 value_column,
228 fake_labels,
229 input,
230 output_schema,
231 unfix: None,
232 })
233 }
234
235 pub const fn name() -> &'static str {
236 "prom_absent"
237 }
238
239 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
240 let output_schema = Arc::new(self.output_schema.as_arrow().clone());
241 let properties = PlanProperties::new(
242 EquivalenceProperties::new(output_schema.clone()),
243 Partitioning::UnknownPartitioning(1),
244 EmissionType::Incremental,
245 Boundedness::Bounded,
246 );
247 Arc::new(AbsentExec {
248 start: self.start,
249 end: self.end,
250 step: self.step,
251 time_index_column: self.time_index_column.clone(),
252 value_column: self.value_column.clone(),
253 fake_labels: self.fake_labels.clone(),
254 output_schema: output_schema.clone(),
255 input: exec_input,
256 properties,
257 metric: ExecutionPlanMetricsSet::new(),
258 })
259 }
260
261 pub fn serialize(&self) -> Vec<u8> {
262 let time_index_column_idx =
263 serialize_column_index(self.input.schema(), &self.time_index_column);
264
265 let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
266
267 pb::Absent {
268 start: self.start,
269 end: self.end,
270 step: self.step,
271 time_index_column_idx,
272 value_column_idx,
273 fake_labels: self
274 .fake_labels
275 .iter()
276 .map(|(name, value)| pb::LabelPair {
277 key: name.clone(),
278 value: value.clone(),
279 })
280 .collect(),
281 ..Default::default()
282 }
283 .encode_to_vec()
284 }
285
286 pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
287 let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
288 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
289 produce_one_row: false,
290 schema: Arc::new(DFSchema::empty()),
291 });
292
293 let unfix = UnfixIndices {
294 time_index_column_idx: pb_absent.time_index_column_idx,
295 value_column_idx: pb_absent.value_column_idx,
296 };
297
298 Ok(Self {
299 start: pb_absent.start,
300 end: pb_absent.end,
301 step: pb_absent.step,
302 time_index_column: String::new(),
303 value_column: String::new(),
304 fake_labels: pb_absent
305 .fake_labels
306 .iter()
307 .map(|label| (label.key.clone(), label.value.clone()))
308 .collect(),
309 input: placeholder_plan,
310 output_schema: Arc::new(DFSchema::empty()),
311 unfix: Some(unfix),
312 })
313 }
314}
315
/// Physical operator behind [`Absent`].
///
/// It requires a single input partition ordered ascending by the time index column (see
/// `required_input_distribution` and `required_input_ordering`) and streams out the aligned
/// timestamps in `[start, end]` that are missing from that input.
#[derive(Debug)]
pub struct AbsentExec {
    /// First aligned timestamp of the evaluated range.
    start: Millisecond,
    /// Last timestamp of the evaluated range (inclusive).
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    step: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// Name of the value column; carried along, the emitted values are always `1.0`.
    value_column: String,
    /// `(label name, label value)` pairs; each value is repeated on every output row.
    fake_labels: Vec<(String, String)>,
    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// The single child operator.
    input: Arc<dyn ExecutionPlan>,
    /// Cached plan properties returned from `properties()`.
    properties: PlanProperties,
    /// Metrics registry shared with the streams created by `execute`.
    metric: ExecutionPlanMetricsSet,
}
329
330impl ExecutionPlan for AbsentExec {
331 fn as_any(&self) -> &dyn Any {
332 self
333 }
334
335 fn schema(&self) -> SchemaRef {
336 self.output_schema.clone()
337 }
338
339 fn properties(&self) -> &PlanProperties {
340 &self.properties
341 }
342
343 fn required_input_distribution(&self) -> Vec<Distribution> {
344 vec![Distribution::SinglePartition]
345 }
346
347 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
348 let requirement = LexRequirement::from([PhysicalSortRequirement {
349 expr: Arc::new(
350 ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
351 ),
352 options: Some(SortOptions {
353 descending: false,
354 nulls_first: false,
355 }),
356 }]);
357 vec![Some(OrderingRequirements::new(requirement))]
358 }
359
360 fn maintains_input_order(&self) -> Vec<bool> {
361 vec![false]
362 }
363
364 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
365 vec![&self.input]
366 }
367
368 fn with_new_children(
369 self: Arc<Self>,
370 children: Vec<Arc<dyn ExecutionPlan>>,
371 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
372 assert!(!children.is_empty());
373 Ok(Arc::new(Self {
374 start: self.start,
375 end: self.end,
376 step: self.step,
377 time_index_column: self.time_index_column.clone(),
378 value_column: self.value_column.clone(),
379 fake_labels: self.fake_labels.clone(),
380 output_schema: self.output_schema.clone(),
381 input: children[0].clone(),
382 properties: self.properties.clone(),
383 metric: self.metric.clone(),
384 }))
385 }
386
387 fn execute(
388 &self,
389 partition: usize,
390 context: Arc<TaskContext>,
391 ) -> DataFusionResult<SendableRecordBatchStream> {
392 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
393 let input = self.input.execute(partition, context)?;
394
395 Ok(Box::pin(AbsentStream {
396 end: self.end,
397 step: self.step,
398 time_index_column_index: self
399 .input
400 .schema()
401 .column_with_name(&self.time_index_column)
402 .unwrap() .0,
404 output_schema: self.output_schema.clone(),
405 fake_labels: self.fake_labels.clone(),
406 input,
407 metric: baseline_metric,
408 output_timestamps: Vec::new(),
410 output_ts_cursor: self.start,
412 input_finished: false,
413 }))
414 }
415
416 fn metrics(&self) -> Option<MetricsSet> {
417 Some(self.metric.clone_inner())
418 }
419
420 fn name(&self) -> &str {
421 "AbsentExec"
422 }
423}
424
425impl DisplayAs for AbsentExec {
426 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
427 match t {
428 DisplayFormatType::Default
429 | DisplayFormatType::Verbose
430 | DisplayFormatType::TreeRender => {
431 write!(
432 f,
433 "PromAbsentExec: start={}, end={}, step={}",
434 self.start, self.end, self.step
435 )
436 }
437 }
438 }
439}
440
/// Stream created by `AbsentExec::execute`.
///
/// It consumes the input batches, compares their time index values with a cursor that
/// advances in `step` increments, and buffers every cursor position that the input skipped.
pub struct AbsentStream {
    /// Last timestamp of the evaluated range (inclusive).
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    step: Millisecond,
    /// Position of the time index column inside the input batches.
    time_index_column_index: usize,
    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// `(label name, label value)` pairs; each value is repeated on every output row.
    fake_labels: Vec<(String, String)>,
    /// Upstream batches.
    input: SendableRecordBatchStream,
    /// Baseline metrics; the stream records its compute time here.
    metric: BaselineMetrics,
    /// Absent timestamps collected so far and not yet emitted.
    output_timestamps: Vec<Millisecond>,
    /// Next aligned timestamp that has not yet been classified as present or absent.
    output_ts_cursor: Millisecond,
    /// Set once the input stream has returned `None`.
    input_finished: bool,
}
455
456impl RecordBatchStream for AbsentStream {
457 fn schema(&self) -> SchemaRef {
458 self.output_schema.clone()
459 }
460}
461
462impl Stream for AbsentStream {
463 type Item = DataFusionResult<RecordBatch>;
464
465 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
466 loop {
467 if !self.input_finished {
468 match ready!(self.input.poll_next_unpin(cx)) {
469 Some(Ok(batch)) => {
470 let timer = std::time::Instant::now();
471 if let Err(e) = self.process_input_batch(&batch) {
472 return Poll::Ready(Some(Err(e)));
473 }
474 self.metric.elapsed_compute().add_elapsed(timer);
475
476 if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
478 let timer = std::time::Instant::now();
479 let result = self.flush_output_batch();
480 self.metric.elapsed_compute().add_elapsed(timer);
481
482 match result {
483 Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
484 Ok(None) => continue,
485 Err(e) => return Poll::Ready(Some(Err(e))),
486 }
487 }
488 }
489 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
490 None => {
491 self.input_finished = true;
492
493 let timer = std::time::Instant::now();
494 if let Err(e) = self.process_remaining_absent_timestamps() {
496 return Poll::Ready(Some(Err(e)));
497 }
498 let result = self.flush_output_batch();
499 self.metric.elapsed_compute().add_elapsed(timer);
500 return Poll::Ready(result.transpose());
501 }
502 }
503 } else {
504 return Poll::Ready(None);
505 }
506 }
507 }
508}
509
510impl AbsentStream {
511 fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
512 let timestamp_array = batch.column(self.time_index_column_index);
514 let milli_ts_array = arrow::compute::cast(
515 timestamp_array,
516 &DataType::Timestamp(TimeUnit::Millisecond, None),
517 )?;
518 let timestamp_array = milli_ts_array
519 .as_any()
520 .downcast_ref::<TimestampMillisecondArray>()
521 .unwrap();
522
523 for &input_ts in timestamp_array.values() {
525 while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
527 self.output_timestamps.push(self.output_ts_cursor);
528 self.output_ts_cursor += self.step;
529 }
530
531 if self.output_ts_cursor == input_ts {
533 self.output_ts_cursor += self.step;
534 }
535 }
536
537 Ok(())
538 }
539
540 fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
541 while self.output_ts_cursor <= self.end {
543 self.output_timestamps.push(self.output_ts_cursor);
544 self.output_ts_cursor += self.step;
545 }
546 Ok(())
547 }
548
549 fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
550 if self.output_timestamps.is_empty() {
551 return Ok(None);
552 }
553
554 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
555 let num_rows = self.output_timestamps.len();
556 columns.push(Arc::new(TimestampMillisecondArray::from(
557 self.output_timestamps.clone(),
558 )) as _);
559 columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
560
561 for (_, value) in self.fake_labels.iter() {
562 columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
563 Some(value.clone()),
564 num_rows,
565 ))) as _);
566 }
567
568 let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
569
570 self.output_timestamps.clear();
571 Ok(Some(batch))
572 }
573}
574
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// Schema used for both the input and the output of the tests:
    /// a nullable millisecond `timestamp` column and a nullable `value` column.
    fn timestamp_value_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `AbsentExec` without fake labels over an in-memory source that serves
    /// `batches` as a single partition.
    fn absent_exec_over(
        batches: Vec<RecordBatch>,
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
    ) -> AbsentExec {
        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], timestamp_value_schema(), None).unwrap(),
        ));
        let output_schema = timestamp_value_schema();

        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Runs partition 0 of `exec` to completion and returns every non-null timestamp of
    /// the first output column, in emission order.
    async fn collect_output_timestamps(exec: &AbsentExec) -> Vec<Millisecond> {
        let task_ctx = SessionContext::new().task_ctx();
        let mut stream = exec.execute(0, task_ctx).unwrap();

        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            output_timestamps.extend(ts_array.iter().flatten());
        }
        output_timestamps
    }

    #[tokio::test]
    async fn test_absent_basic() {
        // The input has points at 0, 2000 and 4000; the range is [0, 5000] with step 1000.
        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        let batch = RecordBatch::try_new(
            timestamp_value_schema(),
            vec![timestamp_array, value_array],
        )
        .unwrap();

        let absent_exec = absent_exec_over(vec![batch], 0, 5000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        // Only the aligned timestamps missing from the input are reported.
        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        // With no input rows every aligned timestamp in the range is absent.
        let absent_exec = absent_exec_over(Vec::new(), 0, 2000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }
}