1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{ready, Stream, StreamExt};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
45use crate::metrics::PROMQL_SERIES_COUNT;
46
47#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
53pub struct InstantManipulate {
54 start: Millisecond,
55 end: Millisecond,
56 lookback_delta: Millisecond,
57 interval: Millisecond,
58 time_index_column: String,
59 field_column: Option<String>,
61 input: LogicalPlan,
62}
63
64impl UserDefinedLogicalNodeCore for InstantManipulate {
65 fn name(&self) -> &str {
66 Self::name()
67 }
68
69 fn inputs(&self) -> Vec<&LogicalPlan> {
70 vec![&self.input]
71 }
72
73 fn schema(&self) -> &DFSchemaRef {
74 self.input.schema()
75 }
76
77 fn expressions(&self) -> Vec<Expr> {
78 vec![]
79 }
80
81 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82 write!(
83 f,
84 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
85 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
86 )
87 }
88
89 fn with_exprs_and_inputs(
90 &self,
91 _exprs: Vec<Expr>,
92 inputs: Vec<LogicalPlan>,
93 ) -> DataFusionResult<Self> {
94 if inputs.len() != 1 {
95 return Err(DataFusionError::Internal(
96 "InstantManipulate should have exact one input".to_string(),
97 ));
98 }
99
100 Ok(Self {
101 start: self.start,
102 end: self.end,
103 lookback_delta: self.lookback_delta,
104 interval: self.interval,
105 time_index_column: self.time_index_column.clone(),
106 field_column: self.field_column.clone(),
107 input: inputs.into_iter().next().unwrap(),
108 })
109 }
110}
111
112impl InstantManipulate {
113 pub fn new(
114 start: Millisecond,
115 end: Millisecond,
116 lookback_delta: Millisecond,
117 interval: Millisecond,
118 time_index_column: String,
119 field_column: Option<String>,
120 input: LogicalPlan,
121 ) -> Self {
122 Self {
123 start,
124 end,
125 lookback_delta,
126 interval,
127 time_index_column,
128 field_column,
129 input,
130 }
131 }
132
133 pub const fn name() -> &'static str {
134 "InstantManipulate"
135 }
136
137 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
138 Arc::new(InstantManipulateExec {
139 start: self.start,
140 end: self.end,
141 lookback_delta: self.lookback_delta,
142 interval: self.interval,
143 time_index_column: self.time_index_column.clone(),
144 field_column: self.field_column.clone(),
145 input: exec_input,
146 metric: ExecutionPlanMetricsSet::new(),
147 })
148 }
149
150 pub fn serialize(&self) -> Vec<u8> {
151 pb::InstantManipulate {
152 start: self.start,
153 end: self.end,
154 interval: self.interval,
155 lookback_delta: self.lookback_delta,
156 time_index: self.time_index_column.clone(),
157 field_index: self.field_column.clone().unwrap_or_default(),
158 }
159 .encode_to_vec()
160 }
161
162 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
163 let pb_instant_manipulate =
164 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
165 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
166 produce_one_row: false,
167 schema: Arc::new(DFSchema::empty()),
168 });
169 let field_column = if pb_instant_manipulate.field_index.is_empty() {
170 None
171 } else {
172 Some(pb_instant_manipulate.field_index)
173 };
174 Ok(Self {
175 start: pb_instant_manipulate.start,
176 end: pb_instant_manipulate.end,
177 lookback_delta: pb_instant_manipulate.lookback_delta,
178 interval: pb_instant_manipulate.interval,
179 time_index_column: pb_instant_manipulate.time_index,
180 field_column,
181 input: placeholder_plan,
182 })
183 }
184}
185
186#[derive(Debug)]
187pub struct InstantManipulateExec {
188 start: Millisecond,
189 end: Millisecond,
190 lookback_delta: Millisecond,
191 interval: Millisecond,
192 time_index_column: String,
193 field_column: Option<String>,
194
195 input: Arc<dyn ExecutionPlan>,
196 metric: ExecutionPlanMetricsSet,
197}
198
199impl ExecutionPlan for InstantManipulateExec {
200 fn as_any(&self) -> &dyn Any {
201 self
202 }
203
204 fn schema(&self) -> SchemaRef {
205 self.input.schema()
206 }
207
208 fn properties(&self) -> &PlanProperties {
209 self.input.properties()
210 }
211
212 fn required_input_distribution(&self) -> Vec<Distribution> {
213 self.input.required_input_distribution()
214 }
215
216 fn maintains_input_order(&self) -> Vec<bool> {
218 vec![false; self.children().len()]
219 }
220
221 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
222 vec![&self.input]
223 }
224
225 fn with_new_children(
226 self: Arc<Self>,
227 children: Vec<Arc<dyn ExecutionPlan>>,
228 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
229 assert!(!children.is_empty());
230 Ok(Arc::new(Self {
231 start: self.start,
232 end: self.end,
233 lookback_delta: self.lookback_delta,
234 interval: self.interval,
235 time_index_column: self.time_index_column.clone(),
236 field_column: self.field_column.clone(),
237 input: children[0].clone(),
238 metric: self.metric.clone(),
239 }))
240 }
241
242 fn execute(
243 &self,
244 partition: usize,
245 context: Arc<TaskContext>,
246 ) -> DataFusionResult<SendableRecordBatchStream> {
247 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
248 let metrics_builder = MetricBuilder::new(&self.metric);
249 let num_series = Count::new();
250 metrics_builder
251 .with_partition(partition)
252 .build(MetricValue::Count {
253 name: METRIC_NUM_SERIES.into(),
254 count: num_series.clone(),
255 });
256
257 let input = self.input.execute(partition, context)?;
258 let schema = input.schema();
259 let time_index = schema
260 .column_with_name(&self.time_index_column)
261 .expect("time index column not found")
262 .0;
263 let field_index = self
264 .field_column
265 .as_ref()
266 .and_then(|name| schema.column_with_name(name))
267 .map(|x| x.0);
268 Ok(Box::pin(InstantManipulateStream {
269 start: self.start,
270 end: self.end,
271 lookback_delta: self.lookback_delta,
272 interval: self.interval,
273 time_index,
274 field_index,
275 schema,
276 input,
277 metric: baseline_metric,
278 num_series,
279 }))
280 }
281
282 fn metrics(&self) -> Option<MetricsSet> {
283 Some(self.metric.clone_inner())
284 }
285
286 fn statistics(&self) -> DataFusionResult<Statistics> {
287 let input_stats = self.input.statistics()?;
288
289 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
290 let estimated_total_bytes = input_stats
291 .total_byte_size
292 .get_value()
293 .zip(input_stats.num_rows.get_value())
294 .map(|(size, rows)| {
295 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
296 })
297 .unwrap_or(Precision::Absent);
298
299 Ok(Statistics {
300 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
301 total_byte_size: estimated_total_bytes,
302 column_statistics: vec![
304 ColumnStatistics::new_unknown();
305 self.schema().flattened_fields().len()
306 ],
307 })
308 }
309
310 fn name(&self) -> &str {
311 "InstantManipulateExec"
312 }
313}
314
315impl DisplayAs for InstantManipulateExec {
316 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
317 match t {
318 DisplayFormatType::Default | DisplayFormatType::Verbose => {
319 write!(
320 f,
321 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
322 self.start,self.end, self.lookback_delta, self.interval, self.time_index_column
323 )
324 }
325 }
326 }
327}
328
329pub struct InstantManipulateStream {
330 start: Millisecond,
331 end: Millisecond,
332 lookback_delta: Millisecond,
333 interval: Millisecond,
334 time_index: usize,
336 field_index: Option<usize>,
337
338 schema: SchemaRef,
339 input: SendableRecordBatchStream,
340 metric: BaselineMetrics,
341 num_series: Count,
343}
344
345impl RecordBatchStream for InstantManipulateStream {
346 fn schema(&self) -> SchemaRef {
347 self.schema.clone()
348 }
349}
350
351impl Stream for InstantManipulateStream {
352 type Item = DataFusionResult<RecordBatch>;
353
354 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
355 let poll = match ready!(self.input.poll_next_unpin(cx)) {
356 Some(Ok(batch)) => {
357 if batch.num_rows() == 0 {
358 return Poll::Pending;
359 }
360 let timer = std::time::Instant::now();
361 self.num_series.add(1);
362 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
363 self.metric.elapsed_compute().add_elapsed(timer);
364 Poll::Ready(Some(result))
365 }
366 None => {
367 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
368 Poll::Ready(None)
369 }
370 Some(Err(e)) => Poll::Ready(Some(Err(e))),
371 };
372 self.metric.record_poll(poll)
373 }
374}
375
376impl InstantManipulateStream {
377 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
380 let mut take_indices = vec![];
381
382 let ts_column = input
383 .column(self.time_index)
384 .as_any()
385 .downcast_ref::<TimestampMillisecondArray>()
386 .ok_or_else(|| {
387 DataFusionError::Execution(
388 "Time index Column downcast to TimestampMillisecondArray failed".into(),
389 )
390 })?;
391
392 let field_column = self
394 .field_index
395 .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
396
397 let mut cursor = 0;
398
399 let aligned_ts_iter = (self.start..=self.end).step_by(self.interval as usize);
400 let mut aligned_ts = vec![];
401
402 'next: for expected_ts in aligned_ts_iter {
404 while cursor < ts_column.len() {
406 let curr = ts_column.value(cursor);
407 match curr.cmp(&expected_ts) {
408 Ordering::Equal => {
409 if let Some(field_column) = &field_column
410 && field_column.value(cursor).is_nan()
411 {
412 } else {
414 take_indices.push(cursor as u64);
415 aligned_ts.push(expected_ts);
416 }
417 continue 'next;
418 }
419 Ordering::Greater => break,
420 Ordering::Less => {}
421 }
422 cursor += 1;
423 }
424 if cursor == ts_column.len() {
425 cursor -= 1;
426 if ts_column.value(cursor) + self.lookback_delta < expected_ts {
428 break;
429 }
430 }
431
432 let curr_ts = ts_column.value(cursor);
434 if curr_ts + self.lookback_delta < expected_ts {
435 continue;
436 }
437 if curr_ts > expected_ts {
438 if let Some(prev_cursor) = cursor.checked_sub(1) {
440 let prev_ts = ts_column.value(prev_cursor);
441 if prev_ts + self.lookback_delta >= expected_ts {
442 if let Some(field_column) = &field_column
444 && field_column.value(prev_cursor).is_nan()
445 {
446 continue;
448 }
449 take_indices.push(prev_cursor as u64);
451 aligned_ts.push(expected_ts);
452 }
453 }
454 } else if let Some(field_column) = &field_column
455 && field_column.value(cursor).is_nan()
456 {
457 } else {
459 take_indices.push(cursor as u64);
461 aligned_ts.push(expected_ts);
462 }
463 }
464
465 self.take_record_batch_optional(input, take_indices, aligned_ts)
467 }
468
469 fn take_record_batch_optional(
471 &self,
472 record_batch: RecordBatch,
473 take_indices: Vec<u64>,
474 aligned_ts: Vec<Millisecond>,
475 ) -> DataFusionResult<RecordBatch> {
476 assert_eq!(take_indices.len(), aligned_ts.len());
477
478 let indices_array = UInt64Array::from(take_indices);
479 let mut arrays = record_batch
480 .columns()
481 .iter()
482 .map(|array| compute::take(array, &indices_array, None))
483 .collect::<ArrowResult<Vec<_>>>()?;
484 arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
485
486 let result = RecordBatch::try_new(record_batch.schema(), arrays)
487 .map_err(|e| DataFusionError::ArrowError(e, None))?;
488 Ok(result)
489 }
490}
491
492#[cfg(test)]
493mod test {
494 use datafusion::prelude::SessionContext;
495
496 use super::*;
497 use crate::extension_plan::test_util::{
498 prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN,
499 };
500
501 async fn do_normalize_test(
502 start: Millisecond,
503 end: Millisecond,
504 lookback_delta: Millisecond,
505 interval: Millisecond,
506 expected: String,
507 contains_nan: bool,
508 ) {
509 let memory_exec = if contains_nan {
510 Arc::new(prepare_test_data_with_nan())
511 } else {
512 Arc::new(prepare_test_data())
513 };
514 let normalize_exec = Arc::new(InstantManipulateExec {
515 start,
516 end,
517 lookback_delta,
518 interval,
519 time_index_column: TIME_INDEX_COLUMN.to_string(),
520 field_column: Some("value".to_string()),
521 input: memory_exec,
522 metric: ExecutionPlanMetricsSet::new(),
523 });
524 let session_context = SessionContext::default();
525 let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
526 .await
527 .unwrap();
528 let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
529 .unwrap()
530 .to_string();
531
532 assert_eq!(result_literal, expected);
533 }
534
535 #[tokio::test]
536 async fn lookback_10s_interval_30s() {
537 let expected = String::from(
538 "+---------------------+-------+------+\
539 \n| timestamp | value | path |\
540 \n+---------------------+-------+------+\
541 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
542 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
543 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
544 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
545 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
546 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
547 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
548 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
549 \n+---------------------+-------+------+",
550 );
551 do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
552 }
553
554 #[tokio::test]
555 async fn lookback_10s_interval_10s() {
556 let expected = String::from(
557 "+---------------------+-------+------+\
558 \n| timestamp | value | path |\
559 \n+---------------------+-------+------+\
560 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
561 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
562 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
563 \n| 1970-01-01T00:00:40 | 1.0 | foo |\
564 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
565 \n| 1970-01-01T00:01:10 | 1.0 | foo |\
566 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
567 \n| 1970-01-01T00:01:40 | 1.0 | foo |\
568 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
569 \n| 1970-01-01T00:02:10 | 1.0 | foo |\
570 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
571 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
572 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
573 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
574 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
575 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
576 \n+---------------------+-------+------+",
577 );
578 do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
579 }
580
581 #[tokio::test]
582 async fn lookback_30s_interval_30s() {
583 let expected = String::from(
584 "+---------------------+-------+------+\
585 \n| timestamp | value | path |\
586 \n+---------------------+-------+------+\
587 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
588 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
589 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
590 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
591 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
592 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
593 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
594 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
595 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
596 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
597 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
598 \n+---------------------+-------+------+",
599 );
600 do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
601 }
602
603 #[tokio::test]
604 async fn lookback_30s_interval_10s() {
605 let expected = String::from(
606 "+---------------------+-------+------+\
607 \n| timestamp | value | path |\
608 \n+---------------------+-------+------+\
609 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
610 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
611 \n| 1970-01-01T00:00:20 | 1.0 | foo |\
612 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
613 \n| 1970-01-01T00:00:40 | 1.0 | foo |\
614 \n| 1970-01-01T00:00:50 | 1.0 | foo |\
615 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
616 \n| 1970-01-01T00:01:10 | 1.0 | foo |\
617 \n| 1970-01-01T00:01:20 | 1.0 | foo |\
618 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
619 \n| 1970-01-01T00:01:40 | 1.0 | foo |\
620 \n| 1970-01-01T00:01:50 | 1.0 | foo |\
621 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
622 \n| 1970-01-01T00:02:10 | 1.0 | foo |\
623 \n| 1970-01-01T00:02:20 | 1.0 | foo |\
624 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
625 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
626 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
627 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
628 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
629 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
630 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
631 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
632 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
633 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
634 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
635 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
636 \n+---------------------+-------+------+",
637 );
638 do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
639 }
640
641 #[tokio::test]
642 async fn lookback_60s_interval_10s() {
643 let expected = String::from(
644 "+---------------------+-------+------+\
645 \n| timestamp | value | path |\
646 \n+---------------------+-------+------+\
647 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
648 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
649 \n| 1970-01-01T00:00:20 | 1.0 | foo |\
650 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
651 \n| 1970-01-01T00:00:40 | 1.0 | foo |\
652 \n| 1970-01-01T00:00:50 | 1.0 | foo |\
653 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
654 \n| 1970-01-01T00:01:10 | 1.0 | foo |\
655 \n| 1970-01-01T00:01:20 | 1.0 | foo |\
656 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
657 \n| 1970-01-01T00:01:40 | 1.0 | foo |\
658 \n| 1970-01-01T00:01:50 | 1.0 | foo |\
659 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
660 \n| 1970-01-01T00:02:10 | 1.0 | foo |\
661 \n| 1970-01-01T00:02:20 | 1.0 | foo |\
662 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
663 \n| 1970-01-01T00:02:40 | 1.0 | foo |\
664 \n| 1970-01-01T00:02:50 | 1.0 | foo |\
665 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
666 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
667 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
668 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
669 \n| 1970-01-01T00:03:40 | 1.0 | foo |\
670 \n| 1970-01-01T00:03:50 | 1.0 | foo |\
671 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
672 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
673 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
674 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
675 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
676 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
677 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
678 \n+---------------------+-------+------+",
679 );
680 do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
681 }
682
683 #[tokio::test]
684 async fn lookback_60s_interval_30s() {
685 let expected = String::from(
686 "+---------------------+-------+------+\
687 \n| timestamp | value | path |\
688 \n+---------------------+-------+------+\
689 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
690 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
691 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
692 \n| 1970-01-01T00:01:30 | 1.0 | foo |\
693 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
694 \n| 1970-01-01T00:02:30 | 1.0 | foo |\
695 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
696 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
697 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
698 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
699 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
700 \n+---------------------+-------+------+",
701 );
702 do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
703 }
704
705 #[tokio::test]
706 async fn small_range_lookback_0s_interval_1s() {
707 let expected = String::from(
708 "+---------------------+-------+------+\
709 \n| timestamp | value | path |\
710 \n+---------------------+-------+------+\
711 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
712 \n| 1970-01-01T00:04:01 | 1.0 | foo |\
713 \n+---------------------+-------+------+",
714 );
715 do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
716 }
717
718 #[tokio::test]
719 async fn small_range_lookback_10s_interval_10s() {
720 let expected = String::from(
721 "+---------------------+-------+------+\
722 \n| timestamp | value | path |\
723 \n+---------------------+-------+------+\
724 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
725 \n| 1970-01-01T00:00:10 | 1.0 | foo |\
726 \n| 1970-01-01T00:00:30 | 1.0 | foo |\
727 \n+---------------------+-------+------+",
728 );
729 do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
730 }
731
732 #[tokio::test]
733 async fn large_range_lookback_30s_interval_60s() {
734 let expected = String::from(
735 "+---------------------+-------+------+\
736 \n| timestamp | value | path |\
737 \n+---------------------+-------+------+\
738 \n| 1970-01-01T00:00:00 | 1.0 | foo |\
739 \n| 1970-01-01T00:01:00 | 1.0 | foo |\
740 \n| 1970-01-01T00:02:00 | 1.0 | foo |\
741 \n| 1970-01-01T00:03:00 | 1.0 | foo |\
742 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
743 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
744 \n+---------------------+-------+------+",
745 );
746 do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
747 }
748
749 #[tokio::test]
750 async fn small_range_lookback_30s_interval_30s() {
751 let expected = String::from(
752 "+---------------------+-------+------+\
753 \n| timestamp | value | path |\
754 \n+---------------------+-------+------+\
755 \n| 1970-01-01T00:03:10 | 1.0 | foo |\
756 \n| 1970-01-01T00:03:20 | 1.0 | foo |\
757 \n| 1970-01-01T00:03:30 | 1.0 | foo |\
758 \n| 1970-01-01T00:04:00 | 1.0 | foo |\
759 \n| 1970-01-01T00:04:10 | 1.0 | foo |\
760 \n| 1970-01-01T00:04:20 | 1.0 | foo |\
761 \n| 1970-01-01T00:04:30 | 1.0 | foo |\
762 \n| 1970-01-01T00:04:40 | 1.0 | foo |\
763 \n| 1970-01-01T00:04:50 | 1.0 | foo |\
764 \n| 1970-01-01T00:05:00 | 1.0 | foo |\
765 \n+---------------------+-------+------+",
766 );
767 do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
768 }
769
770 #[tokio::test]
771 async fn lookback_10s_interval_10s_with_nan() {
772 let expected = String::from(
773 "+---------------------+-------+\
774 \n| timestamp | value |\
775 \n+---------------------+-------+\
776 \n| 1970-01-01T00:00:00 | 0.0 |\
777 \n| 1970-01-01T00:00:10 | 0.0 |\
778 \n| 1970-01-01T00:01:00 | 6.0 |\
779 \n| 1970-01-01T00:01:10 | 6.0 |\
780 \n| 1970-01-01T00:02:00 | 12.0 |\
781 \n| 1970-01-01T00:02:10 | 12.0 |\
782 \n+---------------------+-------+",
783 );
784 do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
785 }
786
787 #[tokio::test]
788 async fn lookback_10s_interval_10s_with_nan_unaligned() {
789 let expected = String::from(
790 "+-------------------------+-------+\
791 \n| timestamp | value |\
792 \n+-------------------------+-------+\
793 \n| 1970-01-01T00:00:00.001 | 0.0 |\
794 \n| 1970-01-01T00:01:00.001 | 6.0 |\
795 \n| 1970-01-01T00:02:00.001 | 12.0 |\
796 \n+-------------------------+-------+",
797 );
798 do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
799 }
800
801 #[tokio::test]
802 async fn ultra_large_range() {
803 let expected = String::from(
804 "+-------------------------+-------+\
805 \n| timestamp | value |\
806 \n+-------------------------+-------+\
807 \n| 1970-01-01T00:00:00.001 | 0.0 |\
808 \n| 1970-01-01T00:01:00.001 | 6.0 |\
809 \n| 1970-01-01T00:02:00.001 | 12.0 |\
810 \n+-------------------------+-------+",
811 );
812 do_normalize_test(1, 900_000_000_000_000, 10_000, 10_000, expected, true).await;
813 }
814}