1use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27 EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34 RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::EmptyRelation;
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{Stream, StreamExt, ready};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
51
/// Number of buffered absent timestamps at which `AbsentStream` flushes an output batch while
/// it is still consuming input.
///
/// A flush emits everything buffered at that moment, so an emitted batch may hold more rows
/// than this value.
const ABSENT_BATCH_SIZE: usize = 8192;
54
/// Logical plan node (named `prom_absent`) that reports which step timestamps of a time range
/// are missing from its input.
///
/// The output schema built by `try_new` has a millisecond timestamp column, a `Float64` value
/// column and one `Utf8` column per fake label.
///
/// Note that the derived `PartialEq`/`Hash` cover every field, while the hand-written
/// `PartialOrd` impl only compares the range, the column names and the fake labels.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
    /// First step timestamp of the evaluated range.
    start: Millisecond,
    /// Last step timestamp of the evaluated range (inclusive).
    end: Millisecond,
    /// Distance between two consecutive step timestamps.
    step: Millisecond,
    /// Name of the time index column; also the name of the output timestamp column.
    /// Empty right after `deserialize`, until the column indices in `unfix` are resolved.
    time_index_column: String,
    /// Name of the output value column.
    /// Empty right after `deserialize`, until the column indices in `unfix` are resolved.
    value_column: String,
    /// `(label name, label value)` pairs appended as constant string columns to every output row.
    fake_labels: Vec<(String, String)>,
    /// The single child plan.
    input: LogicalPlan,
    /// Schema of the rows this node produces.
    output_schema: DFSchemaRef,
    /// Column indices recorded by `deserialize`; `None` once the column names are known.
    unfix: Option<UnfixIndices>,
}
67
/// Column positions carried by a deserialized [`Absent`] until they can be turned back into
/// column names.
///
/// `Absent::deserialize` only has a placeholder input, so it stores the indices found in the
/// protobuf message; `with_exprs_and_inputs` resolves them against the real input schema.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column, as produced by `serialize_column_index` in `serialize`.
    pub time_index_column_idx: u64,
    /// Index of the value column, as produced by `serialize_column_index` in `serialize`.
    pub value_column_idx: u64,
}
73
74impl PartialOrd for Absent {
75 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76 (
78 self.start,
79 self.end,
80 self.step,
81 &self.time_index_column,
82 &self.value_column,
83 &self.fake_labels,
84 )
85 .partial_cmp(&(
86 other.start,
87 other.end,
88 other.step,
89 &other.time_index_column,
90 &other.value_column,
91 &other.fake_labels,
92 ))
93 }
94}
95
96impl UserDefinedLogicalNodeCore for Absent {
97 fn name(&self) -> &str {
98 Self::name()
99 }
100
101 fn inputs(&self) -> Vec<&LogicalPlan> {
102 vec![&self.input]
103 }
104
105 fn schema(&self) -> &DFSchemaRef {
106 &self.output_schema
107 }
108
109 fn expressions(&self) -> Vec<Expr> {
110 vec![]
111 }
112
113 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
114 write!(
115 f,
116 "PromAbsent: start={}, end={}, step={}",
117 self.start, self.end, self.step
118 )
119 }
120
121 fn with_exprs_and_inputs(
122 &self,
123 _exprs: Vec<Expr>,
124 inputs: Vec<LogicalPlan>,
125 ) -> DataFusionResult<Self> {
126 if inputs.is_empty() {
127 return Err(datafusion::error::DataFusionError::Internal(
128 "Absent must have at least one input".to_string(),
129 ));
130 }
131
132 let input: LogicalPlan = inputs[0].clone();
133 let input_schema = input.schema();
134
135 if let Some(unfix) = &self.unfix {
136 let time_index_column = resolve_column_name(
138 unfix.time_index_column_idx,
139 input_schema,
140 "Absent",
141 "time index",
142 )?;
143
144 let value_column =
145 resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
146
147 Self::try_new(
149 self.start,
150 self.end,
151 self.step,
152 time_index_column,
153 value_column,
154 self.fake_labels.clone(),
155 input,
156 )
157 } else {
158 Ok(Self {
159 start: self.start,
160 end: self.end,
161 step: self.step,
162 time_index_column: self.time_index_column.clone(),
163 value_column: self.value_column.clone(),
164 fake_labels: self.fake_labels.clone(),
165 input,
166 output_schema: self.output_schema.clone(),
167 unfix: None,
168 })
169 }
170 }
171}
172
173impl Absent {
174 pub fn try_new(
175 start: Millisecond,
176 end: Millisecond,
177 step: Millisecond,
178 time_index_column: String,
179 value_column: String,
180 fake_labels: Vec<(String, String)>,
181 input: LogicalPlan,
182 ) -> DataFusionResult<Self> {
183 let mut fields = vec![
184 Field::new(
185 &time_index_column,
186 DataType::Timestamp(TimeUnit::Millisecond, None),
187 true,
188 ),
189 Field::new(&value_column, DataType::Float64, true),
190 ];
191
192 let mut fake_labels = fake_labels
194 .into_iter()
195 .collect::<HashMap<String, String>>()
196 .into_iter()
197 .collect::<Vec<_>>();
198 fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
199 for (name, _) in fake_labels.iter() {
200 fields.push(Field::new(name, DataType::Utf8, true));
201 }
202
203 let output_schema = Arc::new(DFSchema::from_unqualified_fields(
204 fields.into(),
205 HashMap::new(),
206 )?);
207
208 Ok(Self {
209 start,
210 end,
211 step,
212 time_index_column,
213 value_column,
214 fake_labels,
215 input,
216 output_schema,
217 unfix: None,
218 })
219 }
220
221 pub const fn name() -> &'static str {
222 "prom_absent"
223 }
224
225 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
226 let output_schema = Arc::new(self.output_schema.as_arrow().clone());
227 let properties = PlanProperties::new(
228 EquivalenceProperties::new(output_schema.clone()),
229 Partitioning::UnknownPartitioning(1),
230 EmissionType::Incremental,
231 Boundedness::Bounded,
232 );
233 Arc::new(AbsentExec {
234 start: self.start,
235 end: self.end,
236 step: self.step,
237 time_index_column: self.time_index_column.clone(),
238 value_column: self.value_column.clone(),
239 fake_labels: self.fake_labels.clone(),
240 output_schema: output_schema.clone(),
241 input: exec_input,
242 properties,
243 metric: ExecutionPlanMetricsSet::new(),
244 })
245 }
246
247 pub fn serialize(&self) -> Vec<u8> {
248 let time_index_column_idx =
249 serialize_column_index(self.input.schema(), &self.time_index_column);
250
251 let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
252
253 pb::Absent {
254 start: self.start,
255 end: self.end,
256 step: self.step,
257 time_index_column_idx,
258 value_column_idx,
259 fake_labels: self
260 .fake_labels
261 .iter()
262 .map(|(name, value)| pb::LabelPair {
263 key: name.clone(),
264 value: value.clone(),
265 })
266 .collect(),
267 ..Default::default()
268 }
269 .encode_to_vec()
270 }
271
272 pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
273 let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
274 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
275 produce_one_row: false,
276 schema: Arc::new(DFSchema::empty()),
277 });
278
279 let unfix = UnfixIndices {
280 time_index_column_idx: pb_absent.time_index_column_idx,
281 value_column_idx: pb_absent.value_column_idx,
282 };
283
284 Ok(Self {
285 start: pb_absent.start,
286 end: pb_absent.end,
287 step: pb_absent.step,
288 time_index_column: String::new(),
289 value_column: String::new(),
290 fake_labels: pb_absent
291 .fake_labels
292 .iter()
293 .map(|label| (label.key.clone(), label.value.clone()))
294 .collect(),
295 input: placeholder_plan,
296 output_schema: Arc::new(DFSchema::empty()),
297 unfix: Some(unfix),
298 })
299 }
300}
301
/// Physical plan node created by `Absent::to_execution_plan`.
///
/// Its `execute` wraps the stream of the single input in an [`AbsentStream`].
#[derive(Debug)]
pub struct AbsentExec {
    /// First step timestamp of the evaluated range.
    start: Millisecond,
    /// Last step timestamp of the evaluated range (inclusive).
    end: Millisecond,
    /// Distance between two consecutive step timestamps.
    step: Millisecond,
    /// Name of the input column that holds the timestamps; looked up in the input schema.
    time_index_column: String,
    /// Name of the value column. Carried along, but not read by the execution code in this file.
    value_column: String,
    /// `(label name, label value)` pairs; each value becomes a constant string output column.
    fake_labels: Vec<(String, String)>,
    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// The single child plan.
    input: Arc<dyn ExecutionPlan>,
    /// Cached plan properties returned by `properties()`.
    properties: PlanProperties,
    /// Metric set from which per-partition baseline metrics are created in `execute`.
    metric: ExecutionPlanMetricsSet,
}
315
316impl ExecutionPlan for AbsentExec {
317 fn as_any(&self) -> &dyn Any {
318 self
319 }
320
321 fn schema(&self) -> SchemaRef {
322 self.output_schema.clone()
323 }
324
325 fn properties(&self) -> &PlanProperties {
326 &self.properties
327 }
328
329 fn required_input_distribution(&self) -> Vec<Distribution> {
330 vec![Distribution::SinglePartition]
331 }
332
333 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
334 let requirement = LexRequirement::from([PhysicalSortRequirement {
335 expr: Arc::new(
336 ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
337 ),
338 options: Some(SortOptions {
339 descending: false,
340 nulls_first: false,
341 }),
342 }]);
343 vec![Some(OrderingRequirements::new(requirement))]
344 }
345
346 fn maintains_input_order(&self) -> Vec<bool> {
347 vec![false]
348 }
349
350 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
351 vec![&self.input]
352 }
353
354 fn with_new_children(
355 self: Arc<Self>,
356 children: Vec<Arc<dyn ExecutionPlan>>,
357 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
358 assert!(!children.is_empty());
359 Ok(Arc::new(Self {
360 start: self.start,
361 end: self.end,
362 step: self.step,
363 time_index_column: self.time_index_column.clone(),
364 value_column: self.value_column.clone(),
365 fake_labels: self.fake_labels.clone(),
366 output_schema: self.output_schema.clone(),
367 input: children[0].clone(),
368 properties: self.properties.clone(),
369 metric: self.metric.clone(),
370 }))
371 }
372
373 fn execute(
374 &self,
375 partition: usize,
376 context: Arc<TaskContext>,
377 ) -> DataFusionResult<SendableRecordBatchStream> {
378 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
379 let input = self.input.execute(partition, context)?;
380
381 Ok(Box::pin(AbsentStream {
382 end: self.end,
383 step: self.step,
384 time_index_column_index: self
385 .input
386 .schema()
387 .column_with_name(&self.time_index_column)
388 .unwrap() .0,
390 output_schema: self.output_schema.clone(),
391 fake_labels: self.fake_labels.clone(),
392 input,
393 metric: baseline_metric,
394 output_timestamps: Vec::new(),
396 output_ts_cursor: self.start,
398 input_finished: false,
399 }))
400 }
401
402 fn metrics(&self) -> Option<MetricsSet> {
403 Some(self.metric.clone_inner())
404 }
405
406 fn name(&self) -> &str {
407 "AbsentExec"
408 }
409}
410
411impl DisplayAs for AbsentExec {
412 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
413 match t {
414 DisplayFormatType::Default
415 | DisplayFormatType::Verbose
416 | DisplayFormatType::TreeRender => {
417 write!(
418 f,
419 "PromAbsentExec: start={}, end={}, step={}",
420 self.start, self.end, self.step
421 )
422 }
423 }
424 }
425}
426
/// Stream that turns the timestamps seen on its input into the step timestamps that are
/// missing from it.
///
/// It walks a cursor from the start of the range to `end` in increments of `step`, buffering
/// every cursor position that is not matched by an input timestamp.
pub struct AbsentStream {
    /// Last step timestamp to consider (inclusive).
    end: Millisecond,
    /// Increment applied to the cursor.
    step: Millisecond,
    /// Position of the time index column inside each input batch.
    time_index_column_index: usize,
    /// Schema of the emitted batches.
    output_schema: SchemaRef,
    /// `(label name, label value)` pairs; one constant string column is emitted per pair.
    fake_labels: Vec<(String, String)>,
    /// Upstream batches.
    input: SendableRecordBatchStream,
    /// Baseline metrics; this stream records its elapsed compute time there.
    metric: BaselineMetrics,
    /// Absent timestamps collected since the last flush.
    output_timestamps: Vec<Millisecond>,
    /// Next step timestamp that has not yet been classified as present or absent.
    output_ts_cursor: Millisecond,
    /// Set once the input returned `None`; afterwards the stream itself ends.
    input_finished: bool,
}
441
442impl RecordBatchStream for AbsentStream {
443 fn schema(&self) -> SchemaRef {
444 self.output_schema.clone()
445 }
446}
447
448impl Stream for AbsentStream {
449 type Item = DataFusionResult<RecordBatch>;
450
451 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
452 loop {
453 if !self.input_finished {
454 match ready!(self.input.poll_next_unpin(cx)) {
455 Some(Ok(batch)) => {
456 let timer = std::time::Instant::now();
457 if let Err(e) = self.process_input_batch(&batch) {
458 return Poll::Ready(Some(Err(e)));
459 }
460 self.metric.elapsed_compute().add_elapsed(timer);
461
462 if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
464 let timer = std::time::Instant::now();
465 let result = self.flush_output_batch();
466 self.metric.elapsed_compute().add_elapsed(timer);
467
468 match result {
469 Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
470 Ok(None) => continue,
471 Err(e) => return Poll::Ready(Some(Err(e))),
472 }
473 }
474 }
475 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
476 None => {
477 self.input_finished = true;
478
479 let timer = std::time::Instant::now();
480 if let Err(e) = self.process_remaining_absent_timestamps() {
482 return Poll::Ready(Some(Err(e)));
483 }
484 let result = self.flush_output_batch();
485 self.metric.elapsed_compute().add_elapsed(timer);
486 return Poll::Ready(result.transpose());
487 }
488 }
489 } else {
490 return Poll::Ready(None);
491 }
492 }
493 }
494}
495
496impl AbsentStream {
497 fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
498 let timestamp_array = batch.column(self.time_index_column_index);
500 let milli_ts_array = arrow::compute::cast(
501 timestamp_array,
502 &DataType::Timestamp(TimeUnit::Millisecond, None),
503 )?;
504 let timestamp_array = milli_ts_array
505 .as_any()
506 .downcast_ref::<TimestampMillisecondArray>()
507 .unwrap();
508
509 for &input_ts in timestamp_array.values() {
511 while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
513 self.output_timestamps.push(self.output_ts_cursor);
514 self.output_ts_cursor += self.step;
515 }
516
517 if self.output_ts_cursor == input_ts {
519 self.output_ts_cursor += self.step;
520 }
521 }
522
523 Ok(())
524 }
525
526 fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
527 while self.output_ts_cursor <= self.end {
529 self.output_timestamps.push(self.output_ts_cursor);
530 self.output_ts_cursor += self.step;
531 }
532 Ok(())
533 }
534
535 fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
536 if self.output_timestamps.is_empty() {
537 return Ok(None);
538 }
539
540 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
541 let num_rows = self.output_timestamps.len();
542 columns.push(Arc::new(TimestampMillisecondArray::from(
543 self.output_timestamps.clone(),
544 )) as _);
545 columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
546
547 for (_, value) in self.fake_labels.iter() {
548 columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
549 Some(value.clone()),
550 num_rows,
551 ))) as _);
552 }
553
554 let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
555
556 self.output_timestamps.clear();
557 Ok(Some(batch))
558 }
559}
560
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// Schema with a nullable millisecond `timestamp` column and a nullable `value` column;
    /// used for both the in-memory input and the plan output.
    fn timestamp_value_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `AbsentExec` without fake labels over a single in-memory partition holding
    /// `batches`.
    fn absent_exec_over(
        batches: Vec<RecordBatch>,
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
    ) -> AbsentExec {
        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], timestamp_value_schema(), None).unwrap(),
        ));
        let output_schema = timestamp_value_schema();

        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Executes partition 0 of `exec` and gathers all non-null timestamps of the first output
    /// column, in emission order.
    async fn collect_output_timestamps(exec: &AbsentExec) -> Vec<Millisecond> {
        let session_ctx = SessionContext::new();
        let mut stream = exec.execute(0, session_ctx.task_ctx()).unwrap();

        let mut collected = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            collected.extend(ts_array.iter().flatten());
        }
        collected
    }

    /// Input covers 0, 2000 and 4000 of the range 0..=5000 (step 1000); the gaps are reported.
    #[tokio::test]
    async fn test_absent_basic() {
        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        let batch = RecordBatch::try_new(
            timestamp_value_schema(),
            vec![timestamp_array, value_array],
        )
        .unwrap();

        let absent_exec = absent_exec_over(vec![batch], 0, 5000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }

    /// With no input rows at all, every step of the range is reported as absent.
    #[tokio::test]
    async fn test_absent_empty_input() {
        let absent_exec = absent_exec_over(vec![], 0, 2000, 1000);
        let output_timestamps = collect_output_timestamps(&absent_exec).await;

        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }
}