1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{ready, Stream, StreamExt};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
45use crate::metrics::PROMQL_SERIES_COUNT;
46
/// Logical plan node that aligns the samples of its input onto a fixed
/// timestamp grid.
///
/// The node is registered with DataFusion through
/// [`UserDefinedLogicalNodeCore`] and is lowered to an
/// [`InstantManipulateExec`] by [`InstantManipulate::to_execution_plan`].
/// Its output schema is the schema of `input`.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// Origin of the output timestamp grid; no output timestamp is smaller.
    start: Millisecond,
    /// Upper bound of the output timestamp grid (inclusive).
    end: Millisecond,
    /// How far behind an output timestamp a sample may be and still be used
    /// for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps.
    interval: Millisecond,
    /// Name of the time index column, resolved against the input schema when
    /// the physical plan is executed.
    time_index_column: String,
    /// Optional name of the value column that is inspected for NaN samples.
    field_column: Option<String>,
    /// The single child plan.
    input: LogicalPlan,
}
63
64impl UserDefinedLogicalNodeCore for InstantManipulate {
65 fn name(&self) -> &str {
66 Self::name()
67 }
68
69 fn inputs(&self) -> Vec<&LogicalPlan> {
70 vec![&self.input]
71 }
72
73 fn schema(&self) -> &DFSchemaRef {
74 self.input.schema()
75 }
76
77 fn expressions(&self) -> Vec<Expr> {
78 vec![]
79 }
80
81 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82 write!(
83 f,
84 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
85 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
86 )
87 }
88
89 fn with_exprs_and_inputs(
90 &self,
91 _exprs: Vec<Expr>,
92 inputs: Vec<LogicalPlan>,
93 ) -> DataFusionResult<Self> {
94 if inputs.len() != 1 {
95 return Err(DataFusionError::Internal(
96 "InstantManipulate should have exact one input".to_string(),
97 ));
98 }
99
100 Ok(Self {
101 start: self.start,
102 end: self.end,
103 lookback_delta: self.lookback_delta,
104 interval: self.interval,
105 time_index_column: self.time_index_column.clone(),
106 field_column: self.field_column.clone(),
107 input: inputs.into_iter().next().unwrap(),
108 })
109 }
110}
111
112impl InstantManipulate {
113 pub fn new(
114 start: Millisecond,
115 end: Millisecond,
116 lookback_delta: Millisecond,
117 interval: Millisecond,
118 time_index_column: String,
119 field_column: Option<String>,
120 input: LogicalPlan,
121 ) -> Self {
122 Self {
123 start,
124 end,
125 lookback_delta,
126 interval,
127 time_index_column,
128 field_column,
129 input,
130 }
131 }
132
133 pub const fn name() -> &'static str {
134 "InstantManipulate"
135 }
136
137 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
138 Arc::new(InstantManipulateExec {
139 start: self.start,
140 end: self.end,
141 lookback_delta: self.lookback_delta,
142 interval: self.interval,
143 time_index_column: self.time_index_column.clone(),
144 field_column: self.field_column.clone(),
145 input: exec_input,
146 metric: ExecutionPlanMetricsSet::new(),
147 })
148 }
149
150 pub fn serialize(&self) -> Vec<u8> {
151 pb::InstantManipulate {
152 start: self.start,
153 end: self.end,
154 interval: self.interval,
155 lookback_delta: self.lookback_delta,
156 time_index: self.time_index_column.clone(),
157 field_index: self.field_column.clone().unwrap_or_default(),
158 }
159 .encode_to_vec()
160 }
161
162 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
163 let pb_instant_manipulate =
164 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
165 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
166 produce_one_row: false,
167 schema: Arc::new(DFSchema::empty()),
168 });
169 let field_column = if pb_instant_manipulate.field_index.is_empty() {
170 None
171 } else {
172 Some(pb_instant_manipulate.field_index)
173 };
174 Ok(Self {
175 start: pb_instant_manipulate.start,
176 end: pb_instant_manipulate.end,
177 lookback_delta: pb_instant_manipulate.lookback_delta,
178 interval: pb_instant_manipulate.interval,
179 time_index_column: pb_instant_manipulate.time_index,
180 field_column,
181 input: placeholder_plan,
182 })
183 }
184}
185
/// Physical plan counterpart of [`InstantManipulate`].
///
/// It forwards the schema and plan properties of `input` and, when executed,
/// wraps each input partition in an [`InstantManipulateStream`].
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// Origin of the output timestamp grid.
    start: Millisecond,
    /// Upper bound of the output timestamp grid (inclusive).
    end: Millisecond,
    /// How far behind an output timestamp a sample may be and still be used
    /// for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps.
    interval: Millisecond,
    /// Name of the time index column; resolved to a column index in `execute`.
    time_index_column: String,
    /// Optional name of the value column; resolved to a column index in
    /// `execute` when present in the input schema.
    field_column: Option<String>,

    /// The single child plan.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics registry shared by all partitions of this plan node.
    metric: ExecutionPlanMetricsSet,
}
198
199impl ExecutionPlan for InstantManipulateExec {
200 fn as_any(&self) -> &dyn Any {
201 self
202 }
203
204 fn schema(&self) -> SchemaRef {
205 self.input.schema()
206 }
207
208 fn properties(&self) -> &PlanProperties {
209 self.input.properties()
210 }
211
212 fn required_input_distribution(&self) -> Vec<Distribution> {
213 self.input.required_input_distribution()
214 }
215
216 fn maintains_input_order(&self) -> Vec<bool> {
218 vec![false; self.children().len()]
219 }
220
221 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
222 vec![&self.input]
223 }
224
225 fn with_new_children(
226 self: Arc<Self>,
227 children: Vec<Arc<dyn ExecutionPlan>>,
228 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
229 assert!(!children.is_empty());
230 Ok(Arc::new(Self {
231 start: self.start,
232 end: self.end,
233 lookback_delta: self.lookback_delta,
234 interval: self.interval,
235 time_index_column: self.time_index_column.clone(),
236 field_column: self.field_column.clone(),
237 input: children[0].clone(),
238 metric: self.metric.clone(),
239 }))
240 }
241
242 fn execute(
243 &self,
244 partition: usize,
245 context: Arc<TaskContext>,
246 ) -> DataFusionResult<SendableRecordBatchStream> {
247 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
248 let metrics_builder = MetricBuilder::new(&self.metric);
249 let num_series = Count::new();
250 metrics_builder
251 .with_partition(partition)
252 .build(MetricValue::Count {
253 name: METRIC_NUM_SERIES.into(),
254 count: num_series.clone(),
255 });
256
257 let input = self.input.execute(partition, context)?;
258 let schema = input.schema();
259 let time_index = schema
260 .column_with_name(&self.time_index_column)
261 .expect("time index column not found")
262 .0;
263 let field_index = self
264 .field_column
265 .as_ref()
266 .and_then(|name| schema.column_with_name(name))
267 .map(|x| x.0);
268 Ok(Box::pin(InstantManipulateStream {
269 start: self.start,
270 end: self.end,
271 lookback_delta: self.lookback_delta,
272 interval: self.interval,
273 time_index,
274 field_index,
275 schema,
276 input,
277 metric: baseline_metric,
278 num_series,
279 }))
280 }
281
282 fn metrics(&self) -> Option<MetricsSet> {
283 Some(self.metric.clone_inner())
284 }
285
286 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
287 let input_stats = self.input.partition_statistics(partition)?;
288
289 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
290 let estimated_total_bytes = input_stats
291 .total_byte_size
292 .get_value()
293 .zip(input_stats.num_rows.get_value())
294 .map(|(size, rows)| {
295 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
296 })
297 .unwrap_or(Precision::Absent);
298
299 Ok(Statistics {
300 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
301 total_byte_size: estimated_total_bytes,
302 column_statistics: vec![
304 ColumnStatistics::new_unknown();
305 self.schema().flattened_fields().len()
306 ],
307 })
308 }
309
310 fn name(&self) -> &str {
311 "InstantManipulateExec"
312 }
313}
314
315impl DisplayAs for InstantManipulateExec {
316 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
317 match t {
318 DisplayFormatType::Default
319 | DisplayFormatType::Verbose
320 | DisplayFormatType::TreeRender => {
321 write!(
322 f,
323 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
324 self.start,self.end, self.lookback_delta, self.interval, self.time_index_column
325 )
326 }
327 }
328 }
329}
330
/// Stream that applies [`InstantManipulateStream::manipulate`] to every
/// non-empty batch of its input.
pub struct InstantManipulateStream {
    /// Origin of the output timestamp grid.
    start: Millisecond,
    /// Upper bound of the output timestamp grid (inclusive).
    end: Millisecond,
    /// How far behind an output timestamp a sample may be and still be used
    /// for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps.
    interval: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Position of the value column checked for NaN, if any.
    field_index: Option<usize>,

    /// Schema reported by this stream (taken from the input stream).
    schema: SchemaRef,
    /// The wrapped input stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) for this partition.
    metric: BaselineMetrics,
    /// Incremented once per processed non-empty batch and observed into
    /// `PROMQL_SERIES_COUNT` when the input ends.
    // NOTE(review): the name suggests each input batch holds one series —
    // confirm against the producer of the input.
    num_series: Count,
}
346
347impl RecordBatchStream for InstantManipulateStream {
348 fn schema(&self) -> SchemaRef {
349 self.schema.clone()
350 }
351}
352
353impl Stream for InstantManipulateStream {
354 type Item = DataFusionResult<RecordBatch>;
355
356 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
357 let poll = match ready!(self.input.poll_next_unpin(cx)) {
358 Some(Ok(batch)) => {
359 if batch.num_rows() == 0 {
360 return Poll::Pending;
361 }
362 let timer = std::time::Instant::now();
363 self.num_series.add(1);
364 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
365 self.metric.elapsed_compute().add_elapsed(timer);
366 Poll::Ready(Some(result))
367 }
368 None => {
369 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
370 Poll::Ready(None)
371 }
372 Some(Err(e)) => Poll::Ready(Some(Err(e))),
373 };
374 self.metric.record_poll(poll)
375 }
376}
377
impl InstantManipulateStream {
    /// Re-samples one batch onto the timestamp grid
    /// `start, start + interval, ...` bounded by `end`.
    ///
    /// For every grid timestamp covered by the batch, the row whose timestamp
    /// equals it is selected; otherwise the closest earlier row is selected if
    /// it is at most `lookback_delta` older. A selected row whose field value
    /// is NaN is dropped instead of emitted. The time index column of the
    /// result holds the grid timestamps, not the original sample timestamps.
    ///
    /// An empty batch is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an execution error when the time index column is not a
    /// `TimestampMillisecondArray`, and forwards errors from building the
    /// output batch.
    ///
    /// # Panics
    ///
    /// `interval` must be non-zero: the grid bounds are computed by integer
    /// division by `interval`.
    // NOTE(review): the scan reads index 0 as the earliest and the last index
    // as the latest timestamp, i.e. it assumes the time index column is sorted
    // ascending — confirm with the producer of the input.
    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
        // Row positions to take from `input`, parallel to `aligned_ts`.
        let mut take_indices = vec![];

        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;

        if ts_column.is_empty() {
            // Nothing to align; hand the batch back as-is.
            return Ok(input);
        }

        // NaN filtering is only active when a field column is configured and
        // it is a Float64 column; otherwise this is `None`.
        let field_column = self
            .field_index
            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());

        let first_ts = ts_column.value(0);
        // The last sample can still serve grid timestamps up to
        // `lookback_delta` after it.
        let last_ts = ts_column.value(ts_column.len() - 1);
        let last_useful = last_ts + self.lookback_delta;

        // Intersect the query range with the range this batch can serve.
        let max_start = first_ts.max(self.start);
        let min_end = last_useful.min(self.end);

        // Round `max_start` down to a grid point (counted from `start`) and
        // bound the iteration near `min_end` (counted back from `end`).
        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;

        // Position in `ts_column`; it only moves forward across grid
        // timestamps, except for the one-step clamp at the end of the column.
        let mut cursor = 0;

        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
        // Grid timestamps that received a row, parallel to `take_indices`.
        let mut aligned_ts = vec![];

        'next: for expected_ts in aligned_ts_iter {
            // Step 1: advance to the first sample that is not older than
            // `expected_ts`, stopping early on an exact match.
            while cursor < ts_column.len() {
                let curr = ts_column.value(cursor);
                match curr.cmp(&expected_ts) {
                    Ordering::Equal => {
                        if let Some(field_column) = &field_column
                            && field_column.value(cursor).is_nan()
                        {
                            // Exact match but the value is NaN: emit nothing
                            // for this grid timestamp.
                        } else {
                            // Exact match: use this row.
                            take_indices.push(cursor as u64);
                            aligned_ts.push(expected_ts);
                        }
                        continue 'next;
                    }
                    Ordering::Greater => break,
                    Ordering::Less => {}
                }
                cursor += 1;
            }
            if cursor == ts_column.len() {
                // Ran past the end: fall back to the last sample.
                cursor -= 1;
                if ts_column.value(cursor) + self.lookback_delta < expected_ts {
                    // The last sample is already out of lookback range, and so
                    // it will be for every later grid timestamp.
                    break;
                }
            }

            // Step 2: decide which sample, if any, serves `expected_ts`.
            let curr_ts = ts_column.value(cursor);
            if curr_ts + self.lookback_delta < expected_ts {
                // The sample under the cursor is too old.
                continue;
            }
            if curr_ts > expected_ts {
                // The sample under the cursor is in the future; look at the
                // one just before it.
                if let Some(prev_cursor) = cursor.checked_sub(1) {
                    let prev_ts = ts_column.value(prev_cursor);
                    if prev_ts + self.lookback_delta >= expected_ts {
                        // The previous sample is within the lookback window.
                        if let Some(field_column) = &field_column
                            && field_column.value(prev_cursor).is_nan()
                        {
                            // The most recent sample is NaN: emit nothing
                            // rather than searching further back.
                            continue;
                        }
                        // Use the previous sample.
                        take_indices.push(prev_cursor as u64);
                        aligned_ts.push(expected_ts);
                    }
                }
            } else if let Some(field_column) = &field_column
                && field_column.value(cursor).is_nan()
            {
                // The most recent sample is NaN: emit nothing.
            } else {
                // The sample under the cursor is within the lookback window.
                take_indices.push(cursor as u64);
                aligned_ts.push(expected_ts);
            }
        }

        // Materialize the selected rows and stamp them with the grid
        // timestamps.
        self.take_record_batch_optional(input, take_indices, aligned_ts)
    }

    /// Builds the output batch: takes the rows at `take_indices` from every
    /// column of `record_batch`, then replaces the time index column with
    /// `aligned_ts`.
    ///
    /// # Errors
    ///
    /// Forwards errors from the Arrow `take` kernel and from assembling the
    /// resulting batch.
    ///
    /// # Panics
    ///
    /// Panics when `take_indices` and `aligned_ts` differ in length.
    fn take_record_batch_optional(
        &self,
        record_batch: RecordBatch,
        take_indices: Vec<u64>,
        aligned_ts: Vec<Millisecond>,
    ) -> DataFusionResult<RecordBatch> {
        assert_eq!(take_indices.len(), aligned_ts.len());

        let indices_array = UInt64Array::from(take_indices);
        let mut arrays = record_batch
            .columns()
            .iter()
            .map(|array| compute::take(array, &indices_array, None))
            .collect::<ArrowResult<Vec<_>>>()?;
        // The taken time index still holds the sample timestamps; overwrite it
        // with the grid timestamps.
        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));

        let result = RecordBatch::try_new(record_batch.schema(), arrays)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        Ok(result)
    }
}
509
#[cfg(test)]
mod test {
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN,
    };

    /// Runs an [`InstantManipulateExec`] over the shared test data (the NaN
    /// variant when `contains_nan` is set) and compares the pretty-printed
    /// result with `expected`.
    ///
    /// `expected` must match the pretty printer output exactly, including
    /// the padding of every cell to the column width.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp           | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0   |\
            \n| 1970-01-01T00:00:10 | 0.0   |\
            \n| 1970-01-01T00:01:00 | 6.0   |\
            \n| 1970-01-01T00:01:10 | 6.0   |\
            \n| 1970-01-01T00:02:00 | 12.0  |\
            \n| 1970-01-01T00:02:10 | 12.0  |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn ultra_large_range() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}