1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{
45 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
46};
47use crate::metrics::PROMQL_SERIES_COUNT;
48
/// Logical plan node that re-samples its input onto the evenly spaced
/// timestamps `start, start + interval, ...` up to `end`.
///
/// For each of those timestamps the physical operator picks one input row
/// whose timestamp is not later than it and at most `lookback_delta` older,
/// and emits that row with its time index replaced by the aligned timestamp.
///
/// All time quantities are expressed as [`Millisecond`] values.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First timestamp of the output grid.
    start: Millisecond,
    /// Upper bound (inclusive) of the output grid.
    end: Millisecond,
    /// How much older than an aligned timestamp a sample may be and still be
    /// used for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column in `input`.
    time_index_column: String,
    /// Optional value column; when present, rows whose value in this column is
    /// NaN are not emitted by the physical operator.
    field_column: Option<String>,
    /// The child plan.
    input: LogicalPlan,
    /// Column indices produced by `deserialize` that still have to be turned
    /// into column names. `with_exprs_and_inputs` resolves them against the
    /// real input schema and resets this to `None`.
    unfix: Option<UnfixIndices>,
}
66
/// Column positions carried by a freshly deserialized [`InstantManipulate`]
/// until they can be resolved to names against an actual input schema.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column in the input schema; `u64::MAX` encodes
    /// "no field column".
    pub field_index_idx: u64,
}
72
73impl UserDefinedLogicalNodeCore for InstantManipulate {
74 fn name(&self) -> &str {
75 Self::name()
76 }
77
78 fn inputs(&self) -> Vec<&LogicalPlan> {
79 vec![&self.input]
80 }
81
82 fn schema(&self) -> &DFSchemaRef {
83 self.input.schema()
84 }
85
86 fn expressions(&self) -> Vec<Expr> {
87 vec![]
88 }
89
90 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
91 write!(
92 f,
93 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
94 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
95 )
96 }
97
98 fn with_exprs_and_inputs(
99 &self,
100 _exprs: Vec<Expr>,
101 inputs: Vec<LogicalPlan>,
102 ) -> DataFusionResult<Self> {
103 if inputs.len() != 1 {
104 return Err(DataFusionError::Internal(
105 "InstantManipulate should have exact one input".to_string(),
106 ));
107 }
108
109 let input: LogicalPlan = inputs.into_iter().next().unwrap();
110 let input_schema = input.schema();
111
112 if let Some(unfix) = &self.unfix {
113 let time_index_column = resolve_column_name(
115 unfix.time_index_idx,
116 input_schema,
117 "InstantManipulate",
118 "time index",
119 )?;
120
121 let field_column = if unfix.field_index_idx == u64::MAX {
122 None
123 } else {
124 Some(resolve_column_name(
125 unfix.field_index_idx,
126 input_schema,
127 "InstantManipulate",
128 "field",
129 )?)
130 };
131
132 Ok(Self {
133 start: self.start,
134 end: self.end,
135 lookback_delta: self.lookback_delta,
136 interval: self.interval,
137 time_index_column,
138 field_column,
139 input,
140 unfix: None,
141 })
142 } else {
143 Ok(Self {
144 start: self.start,
145 end: self.end,
146 lookback_delta: self.lookback_delta,
147 interval: self.interval,
148 time_index_column: self.time_index_column.clone(),
149 field_column: self.field_column.clone(),
150 input,
151 unfix: None,
152 })
153 }
154 }
155}
156
157impl InstantManipulate {
158 pub fn new(
159 start: Millisecond,
160 end: Millisecond,
161 lookback_delta: Millisecond,
162 interval: Millisecond,
163 time_index_column: String,
164 field_column: Option<String>,
165 input: LogicalPlan,
166 ) -> Self {
167 Self {
168 start,
169 end,
170 lookback_delta,
171 interval,
172 time_index_column,
173 field_column,
174 input,
175 unfix: None,
176 }
177 }
178
179 pub const fn name() -> &'static str {
180 "InstantManipulate"
181 }
182
183 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
184 Arc::new(InstantManipulateExec {
185 start: self.start,
186 end: self.end,
187 lookback_delta: self.lookback_delta,
188 interval: self.interval,
189 time_index_column: self.time_index_column.clone(),
190 field_column: self.field_column.clone(),
191 input: exec_input,
192 metric: ExecutionPlanMetricsSet::new(),
193 })
194 }
195
196 pub fn serialize(&self) -> Vec<u8> {
197 let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
198
199 let field_index_idx = self
200 .field_column
201 .as_ref()
202 .map(|name| serialize_column_index(self.input.schema(), name))
203 .unwrap_or(u64::MAX);
204
205 pb::InstantManipulate {
206 start: self.start,
207 end: self.end,
208 interval: self.interval,
209 lookback_delta: self.lookback_delta,
210 time_index_idx,
211 field_index_idx,
212 ..Default::default()
213 }
214 .encode_to_vec()
215 }
216
217 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
218 let pb_instant_manipulate =
219 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
220 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
221 produce_one_row: false,
222 schema: Arc::new(DFSchema::empty()),
223 });
224
225 let unfix = UnfixIndices {
226 time_index_idx: pb_instant_manipulate.time_index_idx,
227 field_index_idx: pb_instant_manipulate.field_index_idx,
228 };
229
230 Ok(Self {
231 start: pb_instant_manipulate.start,
232 end: pb_instant_manipulate.end,
233 lookback_delta: pb_instant_manipulate.lookback_delta,
234 interval: pb_instant_manipulate.interval,
235 time_index_column: String::new(),
236 field_column: None,
237 input: placeholder_plan,
238 unfix: Some(unfix),
239 })
240 }
241}
242
/// Physical operator behind [`InstantManipulate`].
///
/// It carries the same parameters as the logical node and, per partition,
/// wraps the input stream in an [`InstantManipulateStream`].
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First timestamp of the output grid.
    start: Millisecond,
    /// Upper bound (inclusive) of the output grid.
    end: Millisecond,
    /// How much older than an aligned timestamp a sample may be and still be
    /// used for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column, looked up in the input schema on `execute`.
    time_index_column: String,
    /// Optional value column whose NaN rows are dropped.
    field_column: Option<String>,

    /// The child operator.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics registry for this operator.
    metric: ExecutionPlanMetricsSet,
}
255
256impl ExecutionPlan for InstantManipulateExec {
257 fn as_any(&self) -> &dyn Any {
258 self
259 }
260
261 fn schema(&self) -> SchemaRef {
262 self.input.schema()
263 }
264
265 fn properties(&self) -> &PlanProperties {
266 self.input.properties()
267 }
268
269 fn required_input_distribution(&self) -> Vec<Distribution> {
270 self.input.required_input_distribution()
271 }
272
273 fn maintains_input_order(&self) -> Vec<bool> {
275 vec![false; self.children().len()]
276 }
277
278 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
279 vec![&self.input]
280 }
281
282 fn with_new_children(
283 self: Arc<Self>,
284 children: Vec<Arc<dyn ExecutionPlan>>,
285 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
286 assert!(!children.is_empty());
287 Ok(Arc::new(Self {
288 start: self.start,
289 end: self.end,
290 lookback_delta: self.lookback_delta,
291 interval: self.interval,
292 time_index_column: self.time_index_column.clone(),
293 field_column: self.field_column.clone(),
294 input: children[0].clone(),
295 metric: self.metric.clone(),
296 }))
297 }
298
299 fn execute(
300 &self,
301 partition: usize,
302 context: Arc<TaskContext>,
303 ) -> DataFusionResult<SendableRecordBatchStream> {
304 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
305 let metrics_builder = MetricBuilder::new(&self.metric);
306 let num_series = Count::new();
307 metrics_builder
308 .with_partition(partition)
309 .build(MetricValue::Count {
310 name: METRIC_NUM_SERIES.into(),
311 count: num_series.clone(),
312 });
313
314 let input = self.input.execute(partition, context)?;
315 let schema = input.schema();
316 let time_index = schema
317 .column_with_name(&self.time_index_column)
318 .expect("time index column not found")
319 .0;
320 let field_index = self
321 .field_column
322 .as_ref()
323 .and_then(|name| schema.column_with_name(name))
324 .map(|x| x.0);
325 Ok(Box::pin(InstantManipulateStream {
326 start: self.start,
327 end: self.end,
328 lookback_delta: self.lookback_delta,
329 interval: self.interval,
330 time_index,
331 field_index,
332 schema,
333 input,
334 metric: baseline_metric,
335 num_series,
336 }))
337 }
338
339 fn metrics(&self) -> Option<MetricsSet> {
340 Some(self.metric.clone_inner())
341 }
342
343 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
344 let input_stats = self.input.partition_statistics(partition)?;
345
346 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
347 let estimated_total_bytes = input_stats
348 .total_byte_size
349 .get_value()
350 .zip(input_stats.num_rows.get_value())
351 .map(|(size, rows)| {
352 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
353 })
354 .unwrap_or(Precision::Absent);
355
356 Ok(Statistics {
357 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
358 total_byte_size: estimated_total_bytes,
359 column_statistics: vec![
361 ColumnStatistics::new_unknown();
362 self.schema().flattened_fields().len()
363 ],
364 })
365 }
366
367 fn name(&self) -> &str {
368 "InstantManipulateExec"
369 }
370}
371
372impl DisplayAs for InstantManipulateExec {
373 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
374 match t {
375 DisplayFormatType::Default
376 | DisplayFormatType::Verbose
377 | DisplayFormatType::TreeRender => {
378 write!(
379 f,
380 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
381 self.start,
382 self.end,
383 self.lookback_delta,
384 self.interval,
385 self.time_index_column
386 )
387 }
388 }
389 }
390}
391
/// Stream that applies [`InstantManipulateStream::manipulate`] to every
/// non-empty batch of the wrapped input stream.
pub struct InstantManipulateStream {
    /// First timestamp of the output grid.
    start: Millisecond,
    /// Upper bound (inclusive) of the output grid.
    end: Millisecond,
    /// How much older than an aligned timestamp a sample may be and still be
    /// used for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive aligned timestamps.
    interval: Millisecond,
    /// Position of the time index column within each batch.
    time_index: usize,
    /// Position of the value column used for the NaN check, if any.
    field_index: Option<usize>,

    /// Schema reported by this stream (taken from the input stream).
    schema: SchemaRef,
    /// The wrapped input stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) for this partition.
    metric: BaselineMetrics,
    /// Incremented once per non-empty input batch; presumably each batch holds
    /// one series — confirm against the upstream operator.
    num_series: Count,
}
407
408impl RecordBatchStream for InstantManipulateStream {
409 fn schema(&self) -> SchemaRef {
410 self.schema.clone()
411 }
412}
413
414impl Stream for InstantManipulateStream {
415 type Item = DataFusionResult<RecordBatch>;
416
417 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
418 let poll = match ready!(self.input.poll_next_unpin(cx)) {
419 Some(Ok(batch)) => {
420 if batch.num_rows() == 0 {
421 return Poll::Pending;
422 }
423 let timer = std::time::Instant::now();
424 self.num_series.add(1);
425 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
426 self.metric.elapsed_compute().add_elapsed(timer);
427 Poll::Ready(Some(result))
428 }
429 None => {
430 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
431 Poll::Ready(None)
432 }
433 Some(Err(e)) => Poll::Ready(Some(Err(e))),
434 };
435 self.metric.record_poll(poll)
436 }
437}
438
439impl InstantManipulateStream {
440 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
443 let mut take_indices = vec![];
444
445 let ts_column = input
446 .column(self.time_index)
447 .as_any()
448 .downcast_ref::<TimestampMillisecondArray>()
449 .ok_or_else(|| {
450 DataFusionError::Execution(
451 "Time index Column downcast to TimestampMillisecondArray failed".into(),
452 )
453 })?;
454
455 if ts_column.is_empty() {
457 return Ok(input);
458 }
459
460 let field_column = self
462 .field_index
463 .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
464
465 let first_ts = ts_column.value(0);
467 let last_ts = ts_column.value(ts_column.len() - 1);
468 let last_useful = last_ts + self.lookback_delta;
469
470 let max_start = first_ts.max(self.start);
471 let min_end = last_useful.min(self.end);
472
473 let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
474 let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
475
476 let mut cursor = 0;
477
478 let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
479 let mut aligned_ts = vec![];
480
481 'next: for expected_ts in aligned_ts_iter {
483 while cursor < ts_column.len() {
485 let curr = ts_column.value(cursor);
486 match curr.cmp(&expected_ts) {
487 Ordering::Equal => {
488 if let Some(field_column) = &field_column
489 && field_column.value(cursor).is_nan()
490 {
491 } else {
493 take_indices.push(cursor as u64);
494 aligned_ts.push(expected_ts);
495 }
496 continue 'next;
497 }
498 Ordering::Greater => break,
499 Ordering::Less => {}
500 }
501 cursor += 1;
502 }
503 if cursor == ts_column.len() {
504 cursor -= 1;
505 if ts_column.value(cursor) + self.lookback_delta < expected_ts {
507 break;
508 }
509 }
510
511 let curr_ts = ts_column.value(cursor);
513 if curr_ts + self.lookback_delta < expected_ts {
514 continue;
515 }
516 if curr_ts > expected_ts {
517 if let Some(prev_cursor) = cursor.checked_sub(1) {
519 let prev_ts = ts_column.value(prev_cursor);
520 if prev_ts + self.lookback_delta >= expected_ts {
521 if let Some(field_column) = &field_column
523 && field_column.value(prev_cursor).is_nan()
524 {
525 continue;
527 }
528 take_indices.push(prev_cursor as u64);
530 aligned_ts.push(expected_ts);
531 }
532 }
533 } else if let Some(field_column) = &field_column
534 && field_column.value(cursor).is_nan()
535 {
536 } else {
538 take_indices.push(cursor as u64);
540 aligned_ts.push(expected_ts);
541 }
542 }
543
544 self.take_record_batch_optional(input, take_indices, aligned_ts)
546 }
547
548 fn take_record_batch_optional(
550 &self,
551 record_batch: RecordBatch,
552 take_indices: Vec<u64>,
553 aligned_ts: Vec<Millisecond>,
554 ) -> DataFusionResult<RecordBatch> {
555 assert_eq!(take_indices.len(), aligned_ts.len());
556
557 let indices_array = UInt64Array::from(take_indices);
558 let mut arrays = record_batch
559 .columns()
560 .iter()
561 .map(|array| compute::take(array, &indices_array, None))
562 .collect::<ArrowResult<Vec<_>>>()?;
563 arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
564
565 let result = RecordBatch::try_new(record_batch.schema(), arrays)
566 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
567 Ok(result)
568 }
569}
570
#[cfg(test)]
mod test {
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
    };

    /// Runs `InstantManipulateExec` over the shared test data (the NaN variant
    /// when `contains_nan` is set) with `"value"` as the field column, and
    /// compares the pretty-printed result with `expected`.
    ///
    /// `expected` must be padded exactly as the pretty printer pads its cells;
    /// the comparison is a plain string equality.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp           | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0   |\
            \n| 1970-01-01T00:00:10 | 0.0   |\
            \n| 1970-01-01T00:01:00 | 6.0   |\
            \n| 1970-01-01T00:01:10 | 6.0   |\
            \n| 1970-01-01T00:02:00 | 12.0  |\
            \n| 1970-01-01T00:02:10 | 12.0  |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn ultra_large_range() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}