1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::{DataType, SchemaRef};
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{
29 EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore,
30};
31use datafusion::physical_plan::metrics::{
32 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
33};
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
36 SendableRecordBatchStream, Statistics,
37};
38use datafusion_expr::col;
39use datatypes::arrow::compute;
40use futures::{Stream, StreamExt, ready};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::series_divide::SeriesDivide;
47use crate::extension_plan::{
48 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
49};
50use crate::metrics::PROMQL_SERIES_COUNT;
51
/// Upper bound on the number of aligned output points that
/// `InstantManipulateStream::manipulate` will produce for a single input batch.
/// Requests above this limit are rejected with an execution error before the
/// output buffers are allocated.
const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000;
53
/// Logical plan node that aligns the samples of its input onto the evaluation
/// grid `start, start + interval, ..., end`, picking for every grid point the
/// latest sample within `lookback_delta`.
///
/// The node keeps its input schema unchanged; it is lowered to
/// [`InstantManipulateExec`] by `to_execution_plan`.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First timestamp of the evaluation range.
    start: Millisecond,
    /// Last timestamp of the evaluation range (inclusive).
    end: Millisecond,
    /// How far before a grid point a sample may still be used for it.
    lookback_delta: Millisecond,
    /// Step between consecutive grid points.
    interval: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Tag columns of the series. When empty after a rebuild, they are
    /// recovered from a `SeriesDivide` node in the input. A single `__tsid`
    /// tag enables the tsid-reuse fast path in the physical plan.
    tag_columns: Vec<String>,
    /// Optional value column; samples whose Float64 value is NaN are skipped.
    field_column: Option<String>,
    /// Child plan.
    input: LogicalPlan,
    /// Set only on a freshly deserialized node: raw column indices that still
    /// need to be resolved against the real input in `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
73
/// Column indices carried over from the serialized form of
/// [`InstantManipulate`], kept until the node is attached to its real input
/// and the indices can be resolved back into column names.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column in the input schema; `u64::MAX` means the
    /// node has no field column.
    pub field_index_idx: u64,
}
79
80impl UserDefinedLogicalNodeCore for InstantManipulate {
81 fn name(&self) -> &str {
82 Self::name()
83 }
84
85 fn inputs(&self) -> Vec<&LogicalPlan> {
86 vec![&self.input]
87 }
88
89 fn schema(&self) -> &DFSchemaRef {
90 self.input.schema()
91 }
92
93 fn expressions(&self) -> Vec<Expr> {
94 if self.unfix.is_some() {
95 return vec![];
96 }
97
98 let mut exprs = vec![col(&self.time_index_column)];
99 if let Some(field) = &self.field_column {
100 exprs.push(col(field));
101 }
102 exprs
103 }
104
105 fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
106 if self.unfix.is_some() {
107 return None;
108 }
109
110 let input_schema = self.input.schema();
111 if output_columns.is_empty() {
112 let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
113 return Some(vec![indices]);
114 }
115
116 let mut required = output_columns.to_vec();
117 required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
118 if let Some(field) = &self.field_column {
119 required.push(input_schema.index_of_column_by_name(None, field)?);
120 }
121
122 required.sort_unstable();
123 required.dedup();
124 Some(vec![required])
125 }
126
127 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
128 write!(
129 f,
130 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
131 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
132 )
133 }
134
135 fn with_exprs_and_inputs(
136 &self,
137 _exprs: Vec<Expr>,
138 inputs: Vec<LogicalPlan>,
139 ) -> DataFusionResult<Self> {
140 if inputs.len() != 1 {
141 return Err(DataFusionError::Internal(
142 "InstantManipulate should have exact one input".to_string(),
143 ));
144 }
145
146 let input: LogicalPlan = inputs.into_iter().next().unwrap();
147 let input_schema = input.schema();
148
149 if let Some(unfix) = &self.unfix {
150 let time_index_column = resolve_column_name(
152 unfix.time_index_idx,
153 input_schema,
154 "InstantManipulate",
155 "time index",
156 )?;
157
158 let field_column = if unfix.field_index_idx == u64::MAX {
159 None
160 } else {
161 Some(resolve_column_name(
162 unfix.field_index_idx,
163 input_schema,
164 "InstantManipulate",
165 "field",
166 )?)
167 };
168
169 Ok(Self {
170 start: self.start,
171 end: self.end,
172 lookback_delta: self.lookback_delta,
173 interval: self.interval,
174 time_index_column,
175 tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
176 field_column,
177 input,
178 unfix: None,
179 })
180 } else {
181 Ok(Self {
182 start: self.start,
183 end: self.end,
184 lookback_delta: self.lookback_delta,
185 interval: self.interval,
186 time_index_column: self.time_index_column.clone(),
187 tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
188 field_column: self.field_column.clone(),
189 input,
190 unfix: None,
191 })
192 }
193 }
194}
195
196impl InstantManipulate {
197 #[allow(clippy::too_many_arguments)]
198 pub fn new(
199 start: Millisecond,
200 end: Millisecond,
201 lookback_delta: Millisecond,
202 interval: Millisecond,
203 time_index_column: String,
204 tag_columns: Vec<String>,
205 field_column: Option<String>,
206 input: LogicalPlan,
207 ) -> Self {
208 Self {
209 start,
210 end,
211 lookback_delta,
212 interval,
213 time_index_column,
214 tag_columns,
215 field_column,
216 input,
217 unfix: None,
218 }
219 }
220
221 pub const fn name() -> &'static str {
222 "InstantManipulate"
223 }
224
225 fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec<String> {
226 if !tag_columns.is_empty() {
227 return tag_columns.to_vec();
228 }
229
230 Self::find_series_divide_tags(input).unwrap_or_default()
231 }
232
233 fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
234 if let LogicalPlan::Extension(Extension { node }) = plan
235 && let Some(series_divide) = node.as_any().downcast_ref::<SeriesDivide>()
236 {
237 return Some(series_divide.tags().to_vec());
238 }
239
240 plan.inputs()
241 .into_iter()
242 .find_map(Self::find_series_divide_tags)
243 }
244
245 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
246 let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
247
248 Arc::new(InstantManipulateExec {
249 start: self.start,
250 end: self.end,
251 lookback_delta: self.lookback_delta,
252 interval: self.interval,
253 time_index_column: self.time_index_column.clone(),
254 field_column: self.field_column.clone(),
255 reuse_tsid_column,
256 input: exec_input,
257 metric: ExecutionPlanMetricsSet::new(),
258 })
259 }
260
261 pub fn serialize(&self) -> Vec<u8> {
262 let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
263
264 let field_index_idx = self
265 .field_column
266 .as_ref()
267 .map(|name| serialize_column_index(self.input.schema(), name))
268 .unwrap_or(u64::MAX);
269
270 pb::InstantManipulate {
271 start: self.start,
272 end: self.end,
273 interval: self.interval,
274 lookback_delta: self.lookback_delta,
275 time_index_idx,
276 field_index_idx,
277 ..Default::default()
278 }
279 .encode_to_vec()
280 }
281
282 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
283 let pb_instant_manipulate =
284 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
285 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
286 produce_one_row: false,
287 schema: Arc::new(DFSchema::empty()),
288 });
289
290 let unfix = UnfixIndices {
291 time_index_idx: pb_instant_manipulate.time_index_idx,
292 field_index_idx: pb_instant_manipulate.field_index_idx,
293 };
294
295 Ok(Self {
296 start: pb_instant_manipulate.start,
297 end: pb_instant_manipulate.end,
298 lookback_delta: pb_instant_manipulate.lookback_delta,
299 interval: pb_instant_manipulate.interval,
300 time_index_column: String::new(),
301 tag_columns: Vec::new(),
302 field_column: None,
303 input: placeholder_plan,
304 unfix: Some(unfix),
305 })
306 }
307}
308
/// Physical counterpart of [`InstantManipulate`]: it executes its input and
/// wraps each partition's stream in an [`InstantManipulateStream`].
#[derive(Debug)]
pub struct InstantManipulateExec {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    /// Name of the time index column; looked up in the input schema at execution time.
    time_index_column: String,
    /// Optional value column used for NaN filtering.
    field_column: Option<String>,
    /// Requests the `__tsid` reuse fast path. It only takes effect when the
    /// input actually has a UInt64 `__tsid` column (checked in `execute`).
    reuse_tsid_column: bool,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}
322
323impl ExecutionPlan for InstantManipulateExec {
324 fn as_any(&self) -> &dyn Any {
325 self
326 }
327
328 fn schema(&self) -> SchemaRef {
329 self.input.schema()
330 }
331
332 fn properties(&self) -> &Arc<PlanProperties> {
333 self.input.properties()
334 }
335
336 fn required_input_distribution(&self) -> Vec<Distribution> {
337 self.input.required_input_distribution()
338 }
339
340 fn maintains_input_order(&self) -> Vec<bool> {
342 vec![false; self.children().len()]
343 }
344
345 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
346 vec![&self.input]
347 }
348
349 fn with_new_children(
350 self: Arc<Self>,
351 children: Vec<Arc<dyn ExecutionPlan>>,
352 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
353 assert!(!children.is_empty());
354 Ok(Arc::new(Self {
355 start: self.start,
356 end: self.end,
357 lookback_delta: self.lookback_delta,
358 interval: self.interval,
359 time_index_column: self.time_index_column.clone(),
360 field_column: self.field_column.clone(),
361 reuse_tsid_column: self.reuse_tsid_column,
362 input: children[0].clone(),
363 metric: self.metric.clone(),
364 }))
365 }
366
367 fn execute(
368 &self,
369 partition: usize,
370 context: Arc<TaskContext>,
371 ) -> DataFusionResult<SendableRecordBatchStream> {
372 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
373 let num_series = Count::new();
374 MetricBuilder::new(&self.metric)
375 .with_partition(partition)
376 .build(MetricValue::Count {
377 name: METRIC_NUM_SERIES.into(),
378 count: num_series.clone(),
379 });
380
381 let input = self.input.execute(partition, context)?;
382 let schema = input.schema();
383 let time_index = schema
384 .column_with_name(&self.time_index_column)
385 .expect("time index column not found")
386 .0;
387 let field_index = self
388 .field_column
389 .as_ref()
390 .and_then(|name| schema.column_with_name(name))
391 .map(|x| x.0);
392 let tsid_index = schema
393 .column_with_name("__tsid")
394 .filter(|(_, field)| field.data_type() == &DataType::UInt64)
395 .map(|(index, _)| index);
396 Ok(Box::pin(InstantManipulateStream {
397 start: self.start,
398 end: self.end,
399 lookback_delta: self.lookback_delta,
400 interval: self.interval,
401 time_index,
402 field_index,
403 tsid_index,
404 reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(),
405 schema,
406 input,
407 metric: baseline_metric,
408 num_series,
409 }))
410 }
411
412 fn metrics(&self) -> Option<MetricsSet> {
413 Some(self.metric.clone_inner())
414 }
415
416 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
417 let input_stats = self.input.partition_statistics(partition)?;
418
419 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
420 let estimated_total_bytes = input_stats
421 .total_byte_size
422 .get_value()
423 .zip(input_stats.num_rows.get_value())
424 .map(|(size, rows)| {
425 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
426 })
427 .unwrap_or(Precision::Absent);
428
429 Ok(Statistics {
430 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
431 total_byte_size: estimated_total_bytes,
432 column_statistics: Statistics::unknown_column(&self.schema()),
434 })
435 }
436
437 fn name(&self) -> &str {
438 "InstantManipulateExec"
439 }
440}
441
442impl DisplayAs for InstantManipulateExec {
443 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
444 match t {
445 DisplayFormatType::Default
446 | DisplayFormatType::Verbose
447 | DisplayFormatType::TreeRender => {
448 write!(
449 f,
450 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
451 self.start,
452 self.end,
453 self.lookback_delta,
454 self.interval,
455 self.time_index_column
456 )
457 }
458 }
459 }
460}
461
/// Stream created by [`InstantManipulateExec::execute`]. It realigns every
/// non-empty input batch onto the `[start, end]` grid with step `interval`.
pub struct InstantManipulateStream {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    // Column index of the time index in the input schema.
    time_index: usize,
    // Column index of the field column, if any. NaN filtering only applies
    // when that column is a Float64Array.
    field_index: Option<usize>,
    // Index of a UInt64 `__tsid` column, if the input has one.
    tsid_index: Option<usize>,
    // When true, the `__tsid` column is reused (sliced, or repeated from its
    // first value) instead of being gathered row by row.
    reuse_tsid_column: bool,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    // Number of non-empty batches processed; observed into
    // PROMQL_SERIES_COUNT when the input is exhausted.
    num_series: Count,
}
479
480impl RecordBatchStream for InstantManipulateStream {
481 fn schema(&self) -> SchemaRef {
482 self.schema.clone()
483 }
484}
485
486impl Stream for InstantManipulateStream {
487 type Item = DataFusionResult<RecordBatch>;
488
489 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
490 let poll = match ready!(self.input.poll_next_unpin(cx)) {
491 Some(Ok(batch)) => {
492 if batch.num_rows() == 0 {
493 return Poll::Pending;
494 }
495 let timer = std::time::Instant::now();
496 self.num_series.add(1);
497 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
498 self.metric.elapsed_compute().add_elapsed(timer);
499 Poll::Ready(Some(result))
500 }
501 None => {
502 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
503 Poll::Ready(None)
504 }
505 Some(Err(e)) => Poll::Ready(Some(Err(e))),
506 };
507 self.metric.record_poll(poll)
508 }
509}
510
511impl InstantManipulateStream {
512 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
518 let ts_column = input
519 .column(self.time_index)
520 .as_any()
521 .downcast_ref::<TimestampMillisecondArray>()
522 .ok_or_else(|| {
523 DataFusionError::Execution(
524 "Time index Column downcast to TimestampMillisecondArray failed".into(),
525 )
526 })?;
527
528 if ts_column.is_empty() {
530 return Ok(input);
531 }
532
533 let field_column = self
535 .field_index
536 .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
537
538 let first_ts = ts_column.value(0);
540 let last_ts = ts_column.value(ts_column.len() - 1);
541 let last_useful = if self.lookback_delta > 0 {
546 last_ts + self.lookback_delta - 1
547 } else {
548 last_ts
549 };
550
551 let max_start = first_ts.max(self.start);
552 let min_end = last_useful.min(self.end);
553
554 let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
555 let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
556
557 let estimated_points = if aligned_end >= aligned_start {
558 ((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize
559 } else {
560 0
561 };
562 if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS {
563 return Err(DataFusionError::Execution(format!(
564 "InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}"
565 )));
566 }
567 let mut take_indices = Vec::with_capacity(estimated_points);
568
569 let mut cursor = 0;
570
571 let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
572 let mut aligned_ts = Vec::with_capacity(estimated_points);
573
574 'next: for expected_ts in aligned_ts_iter {
576 while cursor < ts_column.len() {
578 let curr = ts_column.value(cursor);
579 match curr.cmp(&expected_ts) {
580 Ordering::Equal => {
581 if let Some(field_column) = &field_column
582 && field_column.value(cursor).is_nan()
583 {
584 } else {
586 take_indices.push(cursor as u64);
587 aligned_ts.push(expected_ts);
588 }
589 continue 'next;
590 }
591 Ordering::Greater => break,
592 Ordering::Less => {}
593 }
594 cursor += 1;
595 }
596 if cursor == ts_column.len() {
597 cursor -= 1;
598 if ts_column.value(cursor) + self.lookback_delta <= expected_ts {
600 break;
601 }
602 }
603
604 let curr_ts = ts_column.value(cursor);
606 if curr_ts + self.lookback_delta <= expected_ts {
607 continue;
608 }
609 if curr_ts > expected_ts {
610 if let Some(prev_cursor) = cursor.checked_sub(1) {
612 let prev_ts = ts_column.value(prev_cursor);
613 if prev_ts + self.lookback_delta > expected_ts {
614 if let Some(field_column) = &field_column
616 && field_column.value(prev_cursor).is_nan()
617 {
618 continue;
620 }
621 take_indices.push(prev_cursor as u64);
623 aligned_ts.push(expected_ts);
624 }
625 }
626 } else if let Some(field_column) = &field_column
627 && field_column.value(cursor).is_nan()
628 {
629 } else {
631 take_indices.push(cursor as u64);
633 aligned_ts.push(expected_ts);
634 }
635 }
636
637 self.take_record_batch_optional(input, take_indices, aligned_ts)
639 }
640
641 fn take_record_batch_optional(
643 &self,
644 record_batch: RecordBatch,
645 take_indices: Vec<u64>,
646 aligned_ts: Vec<Millisecond>,
647 ) -> DataFusionResult<RecordBatch> {
648 assert_eq!(take_indices.len(), aligned_ts.len());
649
650 let output_len = aligned_ts.len();
651 let mut indices_array = None;
652 let mut arrays = Vec::with_capacity(record_batch.num_columns());
653 let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc<dyn Array>;
654
655 for (index, array) in record_batch.columns().iter().enumerate() {
656 if index == self.time_index {
657 arrays.push(aligned_ts.clone());
658 continue;
659 }
660
661 if self.reuse_tsid_column && self.tsid_index == Some(index) {
662 arrays.push(reuse_constant_column(array, output_len)?);
663 continue;
664 }
665
666 let indices_array =
667 indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone()));
668 arrays.push(compute::take(array, indices_array, None)?);
669 }
670
671 let result = RecordBatch::try_new(record_batch.schema(), arrays)
672 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
673 Ok(result)
674 }
675}
676
677fn reuse_constant_column(array: &Arc<dyn Array>, len: usize) -> DataFusionResult<Arc<dyn Array>> {
678 if len <= array.len() {
679 return Ok(array.slice(0, len));
680 }
681
682 if array.is_empty() {
683 return Ok(array.slice(0, 0));
684 }
685
686 ScalarValue::try_from_array(array.as_ref(), 0)?.to_array_of_size(len)
687}
688
689#[cfg(test)]
690mod test {
691 use datafusion::arrow::datatypes::{DataType, Field, Schema};
692 use datafusion::common::ToDFSchema;
693 use datafusion::datasource::memory::MemorySourceConfig;
694 use datafusion::datasource::source::DataSourceExec;
695 use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
696 use datafusion::prelude::SessionContext;
697
698 use super::*;
699 use crate::extension_plan::test_util::{
700 TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
701 };
702
703 async fn do_normalize_test(
704 start: Millisecond,
705 end: Millisecond,
706 lookback_delta: Millisecond,
707 interval: Millisecond,
708 expected: String,
709 contains_nan: bool,
710 ) {
711 let memory_exec = if contains_nan {
712 Arc::new(prepare_test_data_with_nan())
713 } else {
714 Arc::new(prepare_test_data())
715 };
716 let normalize_exec = Arc::new(InstantManipulateExec {
717 start,
718 end,
719 lookback_delta,
720 interval,
721 time_index_column: TIME_INDEX_COLUMN.to_string(),
722 field_column: Some("value".to_string()),
723 reuse_tsid_column: false,
724 input: memory_exec,
725 metric: ExecutionPlanMetricsSet::new(),
726 });
727 let session_context = SessionContext::default();
728 let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
729 .await
730 .unwrap();
731 let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
732 .unwrap()
733 .to_string();
734
735 assert_eq!(result_literal, expected);
736 }
737
738 #[test]
739 fn pruning_should_keep_time_and_field_columns_for_exec() {
740 let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
741 let input = LogicalPlan::EmptyRelation(EmptyRelation {
742 produce_one_row: false,
743 schema: df_schema,
744 });
745 let plan = InstantManipulate::new(
746 0,
747 0,
748 0,
749 0,
750 TIME_INDEX_COLUMN.to_string(),
751 Vec::new(),
752 Some("value".to_string()),
753 input,
754 );
755
756 let output_columns = [2usize];
758 let required = plan.necessary_children_exprs(&output_columns).unwrap();
759 let required = &required[0];
760 assert_eq!(required.as_slice(), &[0, 1, 2]);
761 }
762
763 #[test]
764 fn rebuild_should_recover_tag_columns_from_series_divide_input() {
765 let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
766 let input = LogicalPlan::EmptyRelation(EmptyRelation {
767 produce_one_row: false,
768 schema: df_schema,
769 });
770 let series_divide = LogicalPlan::Extension(Extension {
771 node: Arc::new(SeriesDivide::new(
772 vec!["__tsid".to_string()],
773 TIME_INDEX_COLUMN.to_string(),
774 input,
775 )),
776 });
777 let bytes = InstantManipulate::new(
778 0,
779 0,
780 0,
781 0,
782 TIME_INDEX_COLUMN.to_string(),
783 vec!["__tsid".to_string()],
784 Some("value".to_string()),
785 series_divide.clone(),
786 )
787 .serialize();
788 let plan = InstantManipulate::deserialize(&bytes)
789 .unwrap()
790 .with_exprs_and_inputs(vec![], vec![series_divide])
791 .unwrap();
792
793 assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
794 }
795
796 #[test]
797 fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
798 let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
799 let input = LogicalPlan::EmptyRelation(EmptyRelation {
800 produce_one_row: false,
801 schema: df_schema,
802 });
803 let series_divide = LogicalPlan::Extension(Extension {
804 node: Arc::new(SeriesDivide::new(
805 vec!["__tsid".to_string()],
806 TIME_INDEX_COLUMN.to_string(),
807 input,
808 )),
809 });
810 let series_normalize = LogicalPlan::Extension(Extension {
811 node: Arc::new(crate::extension_plan::SeriesNormalize::new(
812 0,
813 TIME_INDEX_COLUMN,
814 false,
815 vec!["__tsid".to_string()],
816 series_divide,
817 )),
818 });
819 let bytes = InstantManipulate::new(
820 0,
821 0,
822 0,
823 0,
824 TIME_INDEX_COLUMN.to_string(),
825 vec!["__tsid".to_string()],
826 Some("value".to_string()),
827 series_normalize.clone(),
828 )
829 .serialize();
830 let plan = InstantManipulate::deserialize(&bytes)
831 .unwrap()
832 .with_exprs_and_inputs(vec![], vec![series_normalize])
833 .unwrap();
834
835 assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
836 }
837
838 #[test]
839 fn to_execution_plan_enables_tsid_fast_path() {
840 let schema = Arc::new(Schema::new(vec![
841 Field::new(
842 TIME_INDEX_COLUMN,
843 DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
844 false,
845 ),
846 Field::new("value", DataType::Float64, true),
847 ]));
848 let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
849 MemorySourceConfig::try_new(&[], schema, None).unwrap(),
850 )));
851
852 let exec = InstantManipulate::new(
853 0,
854 0,
855 0,
856 0,
857 TIME_INDEX_COLUMN.to_string(),
858 vec!["__tsid".to_string()],
859 Some("value".to_string()),
860 LogicalPlan::EmptyRelation(EmptyRelation {
861 produce_one_row: false,
862 schema: Arc::new(datafusion::common::DFSchema::empty()),
863 }),
864 )
865 .to_execution_plan(exec_input);
866
867 assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
868 }
869
870 #[tokio::test]
871 async fn tsid_fast_path_reuses_tsid_column_when_output_grows() {
872 let schema = Arc::new(Schema::new(vec![
873 Field::new(
874 TIME_INDEX_COLUMN,
875 DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
876 false,
877 ),
878 Field::new("value", DataType::Float64, true),
879 Field::new("host", DataType::Utf8, true),
880 Field::new("__tsid", DataType::UInt64, false),
881 ]));
882 let batch = RecordBatch::try_new(
883 schema.clone(),
884 vec![
885 Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
886 Arc::new(Float64Array::from(vec![1.0, 2.0])),
887 Arc::new(datafusion::arrow::array::StringArray::from(vec![
888 "foo", "foo",
889 ])),
890 Arc::new(UInt64Array::from(vec![42, 42])),
891 ],
892 )
893 .unwrap();
894 let input = Arc::new(DataSourceExec::new(Arc::new(
895 MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
896 )));
897 let normalize_exec = Arc::new(InstantManipulateExec {
898 start: 0,
899 end: 1_500,
900 lookback_delta: 1_000,
901 interval: 500,
902 time_index_column: TIME_INDEX_COLUMN.to_string(),
903 field_column: Some("value".to_string()),
904 reuse_tsid_column: true,
905 input,
906 metric: ExecutionPlanMetricsSet::new(),
907 });
908 let session_context = SessionContext::default();
909 let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
910 .await
911 .unwrap();
912 let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
913 .unwrap()
914 .to_string();
915
916 assert_eq!(
917 result_literal,
918 "+-------------------------+-------+------+--------+\
919 \n| timestamp | value | host | __tsid |\
920 \n+-------------------------+-------+------+--------+\
921 \n| 1970-01-01T00:00:00 | 1.0 | foo | 42 |\
922 \n| 1970-01-01T00:00:00.500 | 1.0 | foo | 42 |\
923 \n| 1970-01-01T00:00:01 | 2.0 | foo | 42 |\
924 \n| 1970-01-01T00:00:01.500 | 2.0 | foo | 42 |\
925 \n+-------------------------+-------+------+--------+"
926 );
927 }
928
929 #[tokio::test]
930 async fn tsid_fast_path_still_takes_additional_field_columns() {
931 let schema = Arc::new(Schema::new(vec![
932 Field::new(
933 TIME_INDEX_COLUMN,
934 DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
935 false,
936 ),
937 Field::new("value", DataType::Float64, true),
938 Field::new("value_2", DataType::Float64, true),
939 Field::new("host", DataType::Utf8, true),
940 Field::new("__tsid", DataType::UInt64, false),
941 ]));
942 let batch = RecordBatch::try_new(
943 schema.clone(),
944 vec![
945 Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
946 Arc::new(Float64Array::from(vec![1.0, 2.0])),
947 Arc::new(Float64Array::from(vec![10.0, 20.0])),
948 Arc::new(datafusion::arrow::array::StringArray::from(vec![
949 "foo", "foo",
950 ])),
951 Arc::new(UInt64Array::from(vec![42, 42])),
952 ],
953 )
954 .unwrap();
955 let input = Arc::new(DataSourceExec::new(Arc::new(
956 MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
957 )));
958 let normalize_exec = Arc::new(InstantManipulateExec {
959 start: 0,
960 end: 1_500,
961 lookback_delta: 1_000,
962 interval: 500,
963 time_index_column: TIME_INDEX_COLUMN.to_string(),
964 field_column: Some("value".to_string()),
965 reuse_tsid_column: true,
966 input,
967 metric: ExecutionPlanMetricsSet::new(),
968 });
969 let session_context = SessionContext::default();
970 let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
971 .await
972 .unwrap();
973 let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
974 .unwrap()
975 .to_string();
976
977 assert_eq!(
978 result_literal,
979 "+-------------------------+-------+---------+------+--------+\
980 \n| timestamp | value | value_2 | host | __tsid |\
981 \n+-------------------------+-------+---------+------+--------+\
982 \n| 1970-01-01T00:00:00 | 1.0 | 10.0 | foo | 42 |\
983 \n| 1970-01-01T00:00:00.500 | 1.0 | 10.0 | foo | 42 |\
984 \n| 1970-01-01T00:00:01 | 2.0 | 20.0 | foo | 42 |\
985 \n| 1970-01-01T00:00:01.500 | 2.0 | 20.0 | foo | 42 |\
986 \n+-------------------------+-------+---------+------+--------+"
987 );
988 }
989
990 #[tokio::test]
991 async fn manipulate_should_reject_too_many_output_points() {
992 let schema = Arc::new(Schema::new(vec![
993 Field::new(
994 TIME_INDEX_COLUMN,
995 DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
996 false,
997 ),
998 Field::new("value", DataType::Float64, true),
999 ]));
1000 let batch = RecordBatch::try_new(
1001 schema.clone(),
1002 vec![
1003 Arc::new(TimestampMillisecondArray::from(vec![0])),
1004 Arc::new(Float64Array::from(vec![1.0])),
1005 ],
1006 )
1007 .unwrap();
1008 let input = Arc::new(DataSourceExec::new(Arc::new(
1009 MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
1010 )));
1011 let too_many_points = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1;
1012 let normalize_exec = Arc::new(InstantManipulateExec {
1013 start: 0,
1014 end: too_many_points,
1015 lookback_delta: too_many_points + 1,
1016 interval: 1,
1017 time_index_column: TIME_INDEX_COLUMN.to_string(),
1018 field_column: Some("value".to_string()),
1019 reuse_tsid_column: false,
1020 input,
1021 metric: ExecutionPlanMetricsSet::new(),
1022 });
1023 let session_context = SessionContext::default();
1024 let err = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
1025 .await
1026 .unwrap_err();
1027
1028 assert!(
1029 err.to_string()
1030 .contains("InstantManipulate output points exceed limit")
1031 );
1032 }
1033
1034 #[tokio::test]
1035 async fn lookback_10s_interval_30s() {
1036 let expected = String::from(
1037 "+---------------------+-------+------+\
1038 \n| timestamp | value | path |\
1039 \n+---------------------+-------+------+\
1040 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1041 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1042 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1043 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1044 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1045 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1046 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1047 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1048 \n+---------------------+-------+------+",
1049 );
1050 do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
1051 }
1052
1053 #[tokio::test]
1054 async fn lookback_10s_interval_10s() {
1055 let expected = String::from(
1056 "+---------------------+-------+------+\
1057 \n| timestamp | value | path |\
1058 \n+---------------------+-------+------+\
1059 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1060 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1061 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1062 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1063 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1064 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1065 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1066 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
1067 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
1068 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1069 \n+---------------------+-------+------+",
1070 );
1071 do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
1072 }
1073
1074 #[tokio::test]
1075 async fn lookback_30s_interval_30s() {
1076 let expected = String::from(
1077 "+---------------------+-------+------+\
1078 \n| timestamp | value | path |\
1079 \n+---------------------+-------+------+\
1080 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1081 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1082 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1083 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1084 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1085 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1086 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1087 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
1088 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1089 \n+---------------------+-------+------+",
1090 );
1091 do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
1092 }
1093
1094 #[tokio::test]
1095 async fn lookback_30s_interval_10s() {
1096 let expected = String::from(
1097 "+---------------------+-------+------+\
1098 \n| timestamp | value | path |\
1099 \n+---------------------+-------+------+\
1100 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1101 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
1102 \n| 1970-01-01T00:00:20 | 1.0 | foo |\
1103 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1104 \n| 1970-01-01T00:00:40 | 1.0 | foo |\
1105 \n| 1970-01-01T00:00:50 | 1.0 | foo |\
1106 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1107 \n| 1970-01-01T00:01:10 | 1.0 | foo |\
1108 \n| 1970-01-01T00:01:20 | 1.0 | foo |\
1109 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1110 \n| 1970-01-01T00:01:40 | 1.0 | foo |\
1111 \n| 1970-01-01T00:01:50 | 1.0 | foo |\
1112 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1113 \n| 1970-01-01T00:02:10 | 1.0 | foo |\
1114 \n| 1970-01-01T00:02:20 | 1.0 | foo |\
1115 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1116 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
1117 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
1118 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1119 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
1120 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
1121 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
1122 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
1123 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
1124 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1125 \n+---------------------+-------+------+",
1126 );
1127 do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
1128 }
1129
1130 #[tokio::test]
1131 async fn lookback_60s_interval_10s() {
1132 let expected = String::from(
1133 "+---------------------+-------+------+\
1134 \n| timestamp | value | path |\
1135 \n+---------------------+-------+------+\
1136 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1137 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
1138 \n| 1970-01-01T00:00:20 | 1.0 | foo |\
1139 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1140 \n| 1970-01-01T00:00:40 | 1.0 | foo |\
1141 \n| 1970-01-01T00:00:50 | 1.0 | foo |\
1142 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1143 \n| 1970-01-01T00:01:10 | 1.0 | foo |\
1144 \n| 1970-01-01T00:01:20 | 1.0 | foo |\
1145 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1146 \n| 1970-01-01T00:01:40 | 1.0 | foo |\
1147 \n| 1970-01-01T00:01:50 | 1.0 | foo |\
1148 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1149 \n| 1970-01-01T00:02:10 | 1.0 | foo |\
1150 \n| 1970-01-01T00:02:20 | 1.0 | foo |\
1151 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
1152 \n| 1970-01-01T00:02:40 | 1.0 | foo |\
1153 \n| 1970-01-01T00:02:50 | 1.0 | foo |\
1154 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1155 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
1156 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
1157 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
1158 \n| 1970-01-01T00:03:40 | 1.0 | foo |\
1159 \n| 1970-01-01T00:03:50 | 1.0 | foo |\
1160 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1161 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
1162 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
1163 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
1164 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
1165 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
1166 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1167 \n+---------------------+-------+------+",
1168 );
1169 do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
1170 }
1171
1172 #[tokio::test]
1173 async fn lookback_60s_interval_30s() {
1174 let expected = String::from(
1175 "+---------------------+-------+------+\
1176 \n| timestamp | value | path |\
1177 \n+---------------------+-------+------+\
1178 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1179 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1180 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1181 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
1182 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1183 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
1184 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1185 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
1186 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1187 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
1188 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1189 \n+---------------------+-------+------+",
1190 );
1191 do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
1192 }
1193
1194 #[tokio::test]
1195 async fn small_range_lookback_0s_interval_1s() {
1196 let expected = String::from(
1197 "+---------------------+-------+------+\
1198 \n| timestamp | value | path |\
1199 \n+---------------------+-------+------+\
1200 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1201 \n| 1970-01-01T00:04:01 | 1.0 | foo |\
1202 \n+---------------------+-------+------+",
1203 );
1204 do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
1205 }
1206
1207 #[tokio::test]
1208 async fn small_range_lookback_10s_interval_10s() {
1209 let expected = String::from(
1210 "+---------------------+-------+------+\
1211 \n| timestamp | value | path |\
1212 \n+---------------------+-------+------+\
1213 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1214 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
1215 \n+---------------------+-------+------+",
1216 );
1217 do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
1218 }
1219
1220 #[tokio::test]
1221 async fn large_range_lookback_30s_interval_60s() {
1222 let expected = String::from(
1223 "+---------------------+-------+------+\
1224 \n| timestamp | value | path |\
1225 \n+---------------------+-------+------+\
1226 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
1227 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
1228 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
1229 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
1230 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1231 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1232 \n+---------------------+-------+------+",
1233 );
1234 do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
1235 }
1236
1237 #[tokio::test]
1238 async fn small_range_lookback_30s_interval_30s() {
1239 let expected = String::from(
1240 "+---------------------+-------+------+\
1241 \n| timestamp | value | path |\
1242 \n+---------------------+-------+------+\
1243 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
1244 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
1245 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
1246 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
1247 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
1248 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
1249 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
1250 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
1251 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
1252 \n+---------------------+-------+------+",
1253 );
1254 do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
1255 }
1256
1257 #[tokio::test]
1258 async fn lookback_10s_interval_10s_with_nan() {
1259 let expected = String::from(
1260 "+---------------------+-------+\
1261 \n| timestamp | value |\
1262 \n+---------------------+-------+\
1263 \n| 1970-01-01T00:00:00 | 0.0 |\
1264 \n| 1970-01-01T00:01:00 | 6.0 |\
1265 \n| 1970-01-01T00:02:00 | 12.0 |\
1266 \n+---------------------+-------+",
1267 );
1268 do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
1269 }
1270
1271 #[tokio::test]
1272 async fn lookback_10s_interval_10s_with_nan_unaligned() {
1273 let expected = String::from(
1274 "+-------------------------+-------+\
1275 \n| timestamp | value |\
1276 \n+-------------------------+-------+\
1277 \n| 1970-01-01T00:00:00.001 | 0.0 |\
1278 \n| 1970-01-01T00:01:00.001 | 6.0 |\
1279 \n| 1970-01-01T00:02:00.001 | 12.0 |\
1280 \n+-------------------------+-------+",
1281 );
1282 do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
1283 }
1284
1285 #[tokio::test]
1286 async fn ultra_large_range() {
1287 let expected = String::from(
1288 "+-------------------------+-------+\
1289 \n| timestamp | value |\
1290 \n+-------------------------+-------+\
1291 \n| 1970-01-01T00:00:00.001 | 0.0 |\
1292 \n| 1970-01-01T00:01:00.001 | 6.0 |\
1293 \n| 1970-01-01T00:02:00.001 | 12.0 |\
1294 \n+-------------------------+-------+",
1295 );
1296 do_normalize_test(
1297 -900_000_000_000_000 + 1,
1298 900_000_000_000_000,
1299 10_000,
1300 10_000,
1301 expected,
1302 true,
1303 )
1304 .await;
1305 }
1306}