1use std::any::Any;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use datafusion::arrow::array::Array;
23use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_expr::{
27 EquivalenceProperties, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
28};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
34 RecordBatchStream, SendableRecordBatchStream,
35};
36use datafusion_common::DFSchema;
37use datafusion_expr::EmptyRelation;
38use datatypes::arrow;
39use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
40use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::arrow_array::StringArray;
43use datatypes::compute::SortOptions;
44use futures::{ready, Stream, StreamExt};
45use greptime_proto::substrait_extension as pb;
46use prost::Message;
47use snafu::ResultExt;
48
49use crate::error::DeserializeSnafu;
50use crate::extension_plan::Millisecond;
51
/// Number of buffered absent timestamps that makes the stream emit a batch
/// early, while the input is still being consumed.
const ABSENT_BATCH_SIZE: usize = 8 * 1024;
54
55#[derive(Debug, PartialEq, Eq, Hash)]
56pub struct Absent {
57 start: Millisecond,
58 end: Millisecond,
59 step: Millisecond,
60 time_index_column: String,
61 value_column: String,
62 fake_labels: Vec<(String, String)>,
63 input: LogicalPlan,
64 output_schema: DFSchemaRef,
65}
66
67impl PartialOrd for Absent {
68 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
69 (
71 self.start,
72 self.end,
73 self.step,
74 &self.time_index_column,
75 &self.value_column,
76 &self.fake_labels,
77 )
78 .partial_cmp(&(
79 other.start,
80 other.end,
81 other.step,
82 &other.time_index_column,
83 &other.value_column,
84 &other.fake_labels,
85 ))
86 }
87}
88
89impl UserDefinedLogicalNodeCore for Absent {
90 fn name(&self) -> &str {
91 Self::name()
92 }
93
94 fn inputs(&self) -> Vec<&LogicalPlan> {
95 vec![&self.input]
96 }
97
98 fn schema(&self) -> &DFSchemaRef {
99 &self.output_schema
100 }
101
102 fn expressions(&self) -> Vec<Expr> {
103 vec![]
104 }
105
106 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
107 write!(
108 f,
109 "PromAbsent: start={}, end={}, step={}",
110 self.start, self.end, self.step
111 )
112 }
113
114 fn with_exprs_and_inputs(
115 &self,
116 _exprs: Vec<Expr>,
117 inputs: Vec<LogicalPlan>,
118 ) -> DataFusionResult<Self> {
119 if inputs.is_empty() {
120 return Err(datafusion::error::DataFusionError::Internal(
121 "Absent must have at least one input".to_string(),
122 ));
123 }
124
125 Ok(Self {
126 start: self.start,
127 end: self.end,
128 step: self.step,
129 time_index_column: self.time_index_column.clone(),
130 value_column: self.value_column.clone(),
131 fake_labels: self.fake_labels.clone(),
132 input: inputs[0].clone(),
133 output_schema: self.output_schema.clone(),
134 })
135 }
136}
137
138impl Absent {
139 pub fn try_new(
140 start: Millisecond,
141 end: Millisecond,
142 step: Millisecond,
143 time_index_column: String,
144 value_column: String,
145 fake_labels: Vec<(String, String)>,
146 input: LogicalPlan,
147 ) -> DataFusionResult<Self> {
148 let mut fields = vec![
149 Field::new(
150 &time_index_column,
151 DataType::Timestamp(TimeUnit::Millisecond, None),
152 true,
153 ),
154 Field::new(&value_column, DataType::Float64, true),
155 ];
156
157 let mut fake_labels = fake_labels
159 .into_iter()
160 .collect::<HashMap<String, String>>()
161 .into_iter()
162 .collect::<Vec<_>>();
163 fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
164 for (name, _) in fake_labels.iter() {
165 fields.push(Field::new(name, DataType::Utf8, true));
166 }
167
168 let output_schema = Arc::new(DFSchema::from_unqualified_fields(
169 fields.into(),
170 HashMap::new(),
171 )?);
172
173 Ok(Self {
174 start,
175 end,
176 step,
177 time_index_column,
178 value_column,
179 fake_labels,
180 input,
181 output_schema,
182 })
183 }
184
185 pub const fn name() -> &'static str {
186 "prom_absent"
187 }
188
189 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
190 let output_schema = Arc::new(self.output_schema.as_arrow().clone());
191 let properties = PlanProperties::new(
192 EquivalenceProperties::new(output_schema.clone()),
193 Partitioning::UnknownPartitioning(1),
194 EmissionType::Incremental,
195 Boundedness::Bounded,
196 );
197 Arc::new(AbsentExec {
198 start: self.start,
199 end: self.end,
200 step: self.step,
201 time_index_column: self.time_index_column.clone(),
202 value_column: self.value_column.clone(),
203 fake_labels: self.fake_labels.clone(),
204 output_schema: output_schema.clone(),
205 input: exec_input,
206 properties,
207 metric: ExecutionPlanMetricsSet::new(),
208 })
209 }
210
211 pub fn serialize(&self) -> Vec<u8> {
212 pb::Absent {
213 start: self.start,
214 end: self.end,
215 step: self.step,
216 time_index_column: self.time_index_column.clone(),
217 value_column: self.value_column.clone(),
218 fake_labels: self
219 .fake_labels
220 .iter()
221 .map(|(name, value)| pb::LabelPair {
222 key: name.clone(),
223 value: value.clone(),
224 })
225 .collect(),
226 }
227 .encode_to_vec()
228 }
229
230 pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
231 let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
232 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
233 produce_one_row: false,
234 schema: Arc::new(DFSchema::empty()),
235 });
236 Self::try_new(
237 pb_absent.start,
238 pb_absent.end,
239 pb_absent.step,
240 pb_absent.time_index_column,
241 pb_absent.value_column,
242 pb_absent
243 .fake_labels
244 .iter()
245 .map(|label| (label.key.clone(), label.value.clone()))
246 .collect(),
247 placeholder_plan,
248 )
249 }
250}
251
252#[derive(Debug)]
253pub struct AbsentExec {
254 start: Millisecond,
255 end: Millisecond,
256 step: Millisecond,
257 time_index_column: String,
258 value_column: String,
259 fake_labels: Vec<(String, String)>,
260 output_schema: SchemaRef,
261 input: Arc<dyn ExecutionPlan>,
262 properties: PlanProperties,
263 metric: ExecutionPlanMetricsSet,
264}
265
266impl ExecutionPlan for AbsentExec {
267 fn as_any(&self) -> &dyn Any {
268 self
269 }
270
271 fn schema(&self) -> SchemaRef {
272 self.output_schema.clone()
273 }
274
275 fn properties(&self) -> &PlanProperties {
276 &self.properties
277 }
278
279 fn required_input_distribution(&self) -> Vec<Distribution> {
280 vec![Distribution::SinglePartition]
281 }
282
283 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
284 let requirement = LexRequirement::from([PhysicalSortRequirement {
285 expr: Arc::new(
286 ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
287 ),
288 options: Some(SortOptions {
289 descending: false,
290 nulls_first: false,
291 }),
292 }]);
293 vec![Some(OrderingRequirements::new(requirement))]
294 }
295
296 fn maintains_input_order(&self) -> Vec<bool> {
297 vec![false]
298 }
299
300 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
301 vec![&self.input]
302 }
303
304 fn with_new_children(
305 self: Arc<Self>,
306 children: Vec<Arc<dyn ExecutionPlan>>,
307 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
308 assert!(!children.is_empty());
309 Ok(Arc::new(Self {
310 start: self.start,
311 end: self.end,
312 step: self.step,
313 time_index_column: self.time_index_column.clone(),
314 value_column: self.value_column.clone(),
315 fake_labels: self.fake_labels.clone(),
316 output_schema: self.output_schema.clone(),
317 input: children[0].clone(),
318 properties: self.properties.clone(),
319 metric: self.metric.clone(),
320 }))
321 }
322
323 fn execute(
324 &self,
325 partition: usize,
326 context: Arc<TaskContext>,
327 ) -> DataFusionResult<SendableRecordBatchStream> {
328 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
329 let input = self.input.execute(partition, context)?;
330
331 Ok(Box::pin(AbsentStream {
332 end: self.end,
333 step: self.step,
334 time_index_column_index: self
335 .input
336 .schema()
337 .column_with_name(&self.time_index_column)
338 .unwrap() .0,
340 output_schema: self.output_schema.clone(),
341 fake_labels: self.fake_labels.clone(),
342 input,
343 metric: baseline_metric,
344 output_timestamps: Vec::new(),
346 output_ts_cursor: self.start,
348 input_finished: false,
349 }))
350 }
351
352 fn metrics(&self) -> Option<MetricsSet> {
353 Some(self.metric.clone_inner())
354 }
355
356 fn name(&self) -> &str {
357 "AbsentExec"
358 }
359}
360
361impl DisplayAs for AbsentExec {
362 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
363 match t {
364 DisplayFormatType::Default
365 | DisplayFormatType::Verbose
366 | DisplayFormatType::TreeRender => {
367 write!(
368 f,
369 "PromAbsentExec: start={}, end={}, step={}",
370 self.start, self.end, self.step
371 )
372 }
373 }
374 }
375}
376
377pub struct AbsentStream {
378 end: Millisecond,
379 step: Millisecond,
380 time_index_column_index: usize,
381 output_schema: SchemaRef,
382 fake_labels: Vec<(String, String)>,
383 input: SendableRecordBatchStream,
384 metric: BaselineMetrics,
385 output_timestamps: Vec<Millisecond>,
387 output_ts_cursor: Millisecond,
389 input_finished: bool,
390}
391
392impl RecordBatchStream for AbsentStream {
393 fn schema(&self) -> SchemaRef {
394 self.output_schema.clone()
395 }
396}
397
398impl Stream for AbsentStream {
399 type Item = DataFusionResult<RecordBatch>;
400
401 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
402 loop {
403 if !self.input_finished {
404 match ready!(self.input.poll_next_unpin(cx)) {
405 Some(Ok(batch)) => {
406 let timer = std::time::Instant::now();
407 if let Err(e) = self.process_input_batch(&batch) {
408 return Poll::Ready(Some(Err(e)));
409 }
410 self.metric.elapsed_compute().add_elapsed(timer);
411
412 if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
414 let timer = std::time::Instant::now();
415 let result = self.flush_output_batch();
416 self.metric.elapsed_compute().add_elapsed(timer);
417
418 match result {
419 Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
420 Ok(None) => continue,
421 Err(e) => return Poll::Ready(Some(Err(e))),
422 }
423 }
424 }
425 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
426 None => {
427 self.input_finished = true;
428
429 let timer = std::time::Instant::now();
430 if let Err(e) = self.process_remaining_absent_timestamps() {
432 return Poll::Ready(Some(Err(e)));
433 }
434 let result = self.flush_output_batch();
435 self.metric.elapsed_compute().add_elapsed(timer);
436 return Poll::Ready(result.transpose());
437 }
438 }
439 } else {
440 return Poll::Ready(None);
441 }
442 }
443 }
444}
445
446impl AbsentStream {
447 fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
448 let timestamp_array = batch.column(self.time_index_column_index);
450 let milli_ts_array = arrow::compute::cast(
451 timestamp_array,
452 &DataType::Timestamp(TimeUnit::Millisecond, None),
453 )?;
454 let timestamp_array = milli_ts_array
455 .as_any()
456 .downcast_ref::<TimestampMillisecondArray>()
457 .unwrap();
458
459 for &input_ts in timestamp_array.values() {
461 while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
463 self.output_timestamps.push(self.output_ts_cursor);
464 self.output_ts_cursor += self.step;
465 }
466
467 if self.output_ts_cursor == input_ts {
469 self.output_ts_cursor += self.step;
470 }
471 }
472
473 Ok(())
474 }
475
476 fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
477 while self.output_ts_cursor <= self.end {
479 self.output_timestamps.push(self.output_ts_cursor);
480 self.output_ts_cursor += self.step;
481 }
482 Ok(())
483 }
484
485 fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
486 if self.output_timestamps.is_empty() {
487 return Ok(None);
488 }
489
490 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
491 let num_rows = self.output_timestamps.len();
492 columns.push(Arc::new(TimestampMillisecondArray::from(
493 self.output_timestamps.clone(),
494 )) as _);
495 columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
496
497 for (_, value) in self.fake_labels.iter() {
498 columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
499 Some(value.clone()),
500 num_rows,
501 ))) as _);
502 }
503
504 let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
505
506 self.output_timestamps.clear();
507 Ok(Some(batch))
508 }
509}
510
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::catalog::memory::DataSourceExec;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    /// `(timestamp, value)` schema used both for the test input and for the
    /// `AbsentExec` output.
    fn ts_value_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `AbsentExec` without fake labels over in-memory `batches`.
    fn absent_exec_over(
        batches: Vec<RecordBatch>,
        start: Millisecond,
        end: Millisecond,
        step: Millisecond,
    ) -> AbsentExec {
        let memory_exec = DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[batches], ts_value_schema(), None).unwrap(),
        ));
        let output_schema = ts_value_schema();

        AbsentExec {
            start,
            end,
            step,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Runs partition 0 of `exec` and gathers every non-null output timestamp.
    async fn collect_timestamps(exec: &AbsentExec) -> Vec<i64> {
        let task_ctx = SessionContext::new().task_ctx();
        let mut stream = exec.execute(0, task_ctx).unwrap();

        let mut timestamps = Vec::new();
        while let Some(batch) = stream.next().await {
            let batch = batch.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            timestamps.extend(ts_array.iter().flatten());
        }
        timestamps
    }

    #[tokio::test]
    async fn test_absent_basic() {
        let batch = RecordBatch::try_new(
            ts_value_schema(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap();

        let exec = absent_exec_over(vec![batch], 0, 5000, 1000);

        assert_eq!(collect_timestamps(&exec).await, vec![1000, 3000, 5000]);
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        let exec = absent_exec_over(vec![], 0, 2000, 1000);

        assert_eq!(collect_timestamps(&exec).await, vec![0, 1000, 2000]);
    }
}