1use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, cast_with_options, take_arrays, CastOptions};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::{Result as DataFusionResult, Statistics};
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::SessionState;
31use datafusion::execution::TaskContext;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36 SendableRecordBatchStream,
37};
38use datafusion::physical_planner::create_physical_sort_expr;
39use datafusion_common::hash_utils::create_hashes;
40use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
41use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION};
42use datafusion_expr::{
43 lit, Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore,
44};
45use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
46use datafusion_physical_expr::{
47 create_physical_expr, Distribution, EquivalenceProperties, LexOrdering, Partitioning,
48 PhysicalExpr, PhysicalSortExpr,
49};
50use datatypes::arrow::array::{
51 Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
52};
53use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
54use datatypes::arrow::record_batch::RecordBatch;
55use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
56use futures::{ready, Stream};
57use futures_util::StreamExt;
58use snafu::ensure;
59
60use crate::error::{RangeQuerySnafu, Result};
61
62type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
63
/// Strategy used to replace null values in the output of a range expression.
///
/// A strategy is parsed from text by [`Fill::try_from_str`] and applied to the values of
/// one series by [`Fill::apply_fill_strategy`].
#[derive(PartialEq, Eq, Debug, Hash, Clone)]
pub enum Fill {
    /// Leave null values as they are.
    Null,
    /// Replace a null with the value stored at the preceding index. A null at index 0 is kept.
    Prev,
    /// Replace nulls by linear interpolation over the timestamps.
    /// [`Fill::try_from_str`] only accepts this option for numeric data types.
    Linear,
    /// Replace every null with the given constant.
    Const(ScalarValue),
}
71
72impl Display for Fill {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Fill::Null => write!(f, "NULL"),
76 Fill::Prev => write!(f, "PREV"),
77 Fill::Linear => write!(f, "LINEAR"),
78 Fill::Const(x) => write!(f, "{}", x),
79 }
80 }
81}
82
83impl Fill {
84 pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
85 let s = value.to_uppercase();
86 match s.as_str() {
87 "" => Ok(None),
88 "NULL" => Ok(Some(Self::Null)),
89 "PREV" => Ok(Some(Self::Prev)),
90 "LINEAR" => {
91 if datatype.is_numeric() {
92 Ok(Some(Self::Linear))
93 } else {
94 Err(DataFusionError::Plan(format!(
95 "Use FILL LINEAR on Non-numeric DataType {}",
96 datatype
97 )))
98 }
99 }
100 _ => ScalarValue::try_from_string(s.clone(), datatype)
101 .map_err(|err| {
102 DataFusionError::Plan(format!(
103 "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
104 s, err
105 ))
106 })
107 .map(|x| Some(Fill::Const(x))),
108 }
109 }
110
111 pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
114 if matches!(self, Fill::Null) {
116 return Ok(());
117 }
118 let len = data.len();
119 if *self == Fill::Linear {
120 return Self::fill_linear(ts, data);
121 }
122 for i in 0..len {
123 if data[i].is_null() {
124 match self {
125 Fill::Prev => {
126 if i != 0 {
127 data[i] = data[i - 1].clone()
128 }
129 }
130 Fill::Linear | Fill::Null => unreachable!(),
134 Fill::Const(v) => data[i] = v.clone(),
135 }
136 }
137 }
138 Ok(())
139 }
140
141 fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
142 let not_null_num = data
143 .iter()
144 .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
145 if not_null_num < 2 {
147 return Ok(());
148 }
149 let mut index = 0;
150 let mut head: Option<usize> = None;
151 let mut tail: Option<usize> = None;
152 while index < data.len() {
153 let start = data[index..]
156 .iter()
157 .position(ScalarValue::is_null)
158 .unwrap_or(data.len() - index)
159 + index;
160 if start == data.len() {
161 break;
162 }
163 let end = data[start..]
164 .iter()
165 .position(|r| !r.is_null())
166 .unwrap_or(data.len() - start)
167 + start;
168 index = end + 1;
169 if start == 0 {
171 head = Some(end);
172 } else if end == data.len() {
173 tail = Some(start);
174 } else {
175 linear_interpolation(ts, data, start - 1, end, start, end)?;
176 }
177 }
178 if let Some(end) = head {
180 linear_interpolation(ts, data, end, end + 1, 0, end)?;
181 }
182 if let Some(start) = tail {
184 linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
185 }
186 Ok(())
187 }
188}
189
190fn linear_interpolation(
192 ts: &[i64],
193 data: &mut [ScalarValue],
194 i1: usize,
195 i2: usize,
196 start: usize,
197 end: usize,
198) -> DfResult<()> {
199 let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
200 let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
201 (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
202 (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
203 (*y0 as f64, *y1 as f64, true)
204 }
205 _ => {
206 return Err(DataFusionError::Execution(
207 "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
208 ));
209 }
210 };
211 if x1 == x0 {
213 return Err(DataFusionError::Execution(
214 "RangePlan: Linear interpolation using the same coordinate points".to_string(),
215 ));
216 }
217 for i in start..end {
218 let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
219 data[i] = if is_float32 {
220 ScalarValue::Float32(Some(val as f32))
221 } else {
222 ScalarValue::Float64(Some(val))
223 }
224 }
225 Ok(())
226}
227
/// One range expression of a [`RangeSelect`] plan.
///
/// Equality, ordering and hashing of a `RangeFn` only look at `name`
/// (see the manual trait implementations below).
#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
    /// Name of the expression; used as the output field name and as the `Display` text.
    pub name: String,
    /// Data type of the output field.
    pub data_type: DataType,
    /// The expression to evaluate; the physical planner expects an aggregate function,
    /// optionally wrapped in an alias.
    pub expr: Expr,
    /// Length of the range; must not be zero milliseconds.
    pub range: Duration,
    /// Optional fill strategy applied to null results.
    pub fill: Option<Fill>,
    /// When true, the physical plan casts the aggregate result to `data_type`.
    pub need_cast: bool,
}
242
243impl PartialEq for RangeFn {
244 fn eq(&self, other: &Self) -> bool {
245 self.name == other.name
246 }
247}
248
249impl PartialOrd for RangeFn {
250 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
251 Some(self.cmp(other))
252 }
253}
254
255impl Ord for RangeFn {
256 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
257 self.name.cmp(&other.name)
258 }
259}
260
261impl std::hash::Hash for RangeFn {
262 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
263 self.name.hash(state);
264 }
265}
266
267impl Display for RangeFn {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 write!(f, "{}", self.name)
270 }
271}
272
/// Logical plan node of a range query.
///
/// Instances are built with [`RangeSelect::try_new`], which derives the schemas from the
/// other fields.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeSelect {
    /// The single input plan.
    pub input: Arc<LogicalPlan>,
    /// Range expressions to evaluate.
    pub range_expr: Vec<RangeFn>,
    /// Alignment step; must not be zero milliseconds.
    pub align: Duration,
    /// Origin of the alignment; printed in milliseconds by `fmt_for_explain`.
    pub align_to: i64,
    /// Name of the time index field, taken from `time_expr`.
    pub time_index: String,
    /// Expression that yields the time index.
    pub time_expr: Expr,
    /// Expressions whose values identify a series (printed as `align_by`).
    pub by: Vec<Expr>,
    /// Output schema: `schema_before_project` with `schema_project` applied, if any.
    pub schema: DFSchemaRef,
    /// Schema made of the fields of the `by` expressions.
    pub by_schema: DFSchemaRef,
    /// Indices into `schema_before_project` that form the output columns.
    /// `None` when a projection expression could not be resolved to a column.
    pub schema_project: Option<Vec<usize>>,
    /// Fields in the order: range expressions, time index, `by` expressions.
    pub schema_before_project: DFSchemaRef,
}
295
296impl PartialOrd for RangeSelect {
297 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
298 match self.input.partial_cmp(&other.input) {
300 Some(Ordering::Equal) => {}
301 ord => return ord,
302 }
303 match self.range_expr.partial_cmp(&other.range_expr) {
304 Some(Ordering::Equal) => {}
305 ord => return ord,
306 }
307 match self.align.partial_cmp(&other.align) {
308 Some(Ordering::Equal) => {}
309 ord => return ord,
310 }
311 match self.align_to.partial_cmp(&other.align_to) {
312 Some(Ordering::Equal) => {}
313 ord => return ord,
314 }
315 match self.time_index.partial_cmp(&other.time_index) {
316 Some(Ordering::Equal) => {}
317 ord => return ord,
318 }
319 match self.time_expr.partial_cmp(&other.time_expr) {
320 Some(Ordering::Equal) => {}
321 ord => return ord,
322 }
323 match self.by.partial_cmp(&other.by) {
324 Some(Ordering::Equal) => {}
325 ord => return ord,
326 }
327 self.schema_project.partial_cmp(&other.schema_project)
328 }
329}
330
331impl RangeSelect {
332 pub fn try_new(
333 input: Arc<LogicalPlan>,
334 range_expr: Vec<RangeFn>,
335 align: Duration,
336 align_to: i64,
337 time_index: Expr,
338 by: Vec<Expr>,
339 projection_expr: &[Expr],
340 ) -> Result<Self> {
341 ensure!(
342 align.as_millis() != 0,
343 RangeQuerySnafu {
344 msg: "Can't use 0 as align in Range Query"
345 }
346 );
347 for expr in &range_expr {
348 ensure!(
349 expr.range.as_millis() != 0,
350 RangeQuerySnafu {
351 msg: format!(
352 "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
353 expr.name
354 )
355 }
356 );
357 }
358 let mut fields = range_expr
359 .iter()
360 .map(
361 |RangeFn {
362 name,
363 data_type,
364 fill,
365 ..
366 }| {
367 let field = Field::new(
368 name,
369 data_type.clone(),
370 !matches!(fill, Some(Fill::Const(..))),
372 );
373 Ok((None, Arc::new(field)))
374 },
375 )
376 .collect::<DfResult<Vec<_>>>()?;
377 let ts_field = time_index.to_field(input.schema().as_ref())?;
379 let time_index_name = ts_field.1.name().clone();
380 fields.push(ts_field);
381 let by_fields = exprlist_to_fields(&by, &input)?;
383 fields.extend(by_fields.clone());
384 let schema_before_project = Arc::new(DFSchema::new_with_metadata(
385 fields,
386 input.schema().metadata().clone(),
387 )?);
388 let by_schema = Arc::new(DFSchema::new_with_metadata(
389 by_fields,
390 input.schema().metadata().clone(),
391 )?);
392 let schema_project = projection_expr
396 .iter()
397 .map(|project_expr| {
398 if let Expr::Column(column) = project_expr {
399 schema_before_project
400 .index_of_column_by_name(column.relation.as_ref(), &column.name)
401 .ok_or(())
402 } else {
403 let (qualifier, field) = project_expr
404 .to_field(input.schema().as_ref())
405 .map_err(|_| ())?;
406 schema_before_project
407 .index_of_column_by_name(qualifier.as_ref(), field.name())
408 .ok_or(())
409 }
410 })
411 .collect::<std::result::Result<Vec<usize>, ()>>()
412 .ok();
413 let schema = if let Some(project) = &schema_project {
414 let project_field = project
415 .iter()
416 .map(|i| {
417 let f = schema_before_project.qualified_field(*i);
418 (f.0.cloned(), Arc::new(f.1.clone()))
419 })
420 .collect();
421 Arc::new(DFSchema::new_with_metadata(
422 project_field,
423 input.schema().metadata().clone(),
424 )?)
425 } else {
426 schema_before_project.clone()
427 };
428 Ok(Self {
429 input,
430 range_expr,
431 align,
432 align_to,
433 time_index: time_index_name,
434 time_expr: time_index,
435 schema,
436 by_schema,
437 by,
438 schema_project,
439 schema_before_project,
440 })
441 }
442}
443
444impl UserDefinedLogicalNodeCore for RangeSelect {
445 fn name(&self) -> &str {
446 "RangeSelect"
447 }
448
449 fn inputs(&self) -> Vec<&LogicalPlan> {
450 vec![&self.input]
451 }
452
453 fn schema(&self) -> &DFSchemaRef {
454 &self.schema
455 }
456
457 fn expressions(&self) -> Vec<Expr> {
458 self.range_expr
459 .iter()
460 .map(|expr| expr.expr.clone())
461 .chain([self.time_expr.clone()])
462 .chain(self.by.clone())
463 .collect()
464 }
465
466 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
467 write!(
468 f,
469 "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
470 self.range_expr
471 .iter()
472 .map(ToString::to_string)
473 .collect::<Vec<_>>()
474 .join(", "),
475 self.align.as_millis(),
476 self.align_to,
477 self.by
478 .iter()
479 .map(ToString::to_string)
480 .collect::<Vec<_>>()
481 .join(", "),
482 self.time_index
483 )
484 }
485
486 fn with_exprs_and_inputs(
487 &self,
488 exprs: Vec<Expr>,
489 inputs: Vec<LogicalPlan>,
490 ) -> DataFusionResult<Self> {
491 if inputs.is_empty() {
492 return Err(DataFusionError::Plan(
493 "RangeSelect: inputs is empty".to_string(),
494 ));
495 }
496 if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
497 return Err(DataFusionError::Plan(
498 "RangeSelect: exprs length not match".to_string(),
499 ));
500 }
501
502 let range_expr = exprs
503 .iter()
504 .zip(self.range_expr.iter())
505 .map(|(e, range)| RangeFn {
506 name: range.name.clone(),
507 data_type: range.data_type.clone(),
508 expr: e.clone(),
509 range: range.range,
510 fill: range.fill.clone(),
511 need_cast: range.need_cast,
512 })
513 .collect();
514 let time_expr = exprs[self.range_expr.len()].clone();
515 let by = exprs[self.range_expr.len() + 1..].to_vec();
516 Ok(Self {
517 align: self.align,
518 align_to: self.align_to,
519 range_expr,
520 input: Arc::new(inputs[0].clone()),
521 time_index: self.time_index.clone(),
522 time_expr,
523 schema: self.schema.clone(),
524 by,
525 by_schema: self.by_schema.clone(),
526 schema_project: self.schema_project.clone(),
527 schema_before_project: self.schema_before_project.clone(),
528 })
529 }
530}
531
impl RangeSelect {
    /// Converts logical expressions into physical expressions against `df_schema`.
    ///
    /// When `is_count_aggr` is true, a wildcard expression is replaced by the
    /// `COUNT_STAR_EXPANSION` literal before conversion; every other expression is
    /// converted unchanged.
    fn create_physical_expr_list(
        &self,
        is_count_aggr: bool,
        exprs: &[Expr],
        df_schema: &Arc<DFSchema>,
        session_state: &SessionState,
    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
        exprs
            .iter()
            .map(|e| match e {
                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
                    &lit(COUNT_STAR_EXPANSION),
                    df_schema.as_ref(),
                    session_state.execution_props(),
                ),
                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Builds the physical [`RangeSelectExec`] for this logical node.
    ///
    /// `logical_input` supplies the schema used to plan expressions and `exec_input` is the
    /// already-planned child. Every range expression must be an aggregate function, optionally
    /// wrapped in an alias; anything else yields a plan error.
    pub fn to_execution_plan(
        &self,
        logical_input: &LogicalPlan,
        exec_input: Arc<dyn ExecutionPlan>,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Rebuild plain arrow fields (name, type, nullability) from the DataFusion schemas.
        let fields: Vec<_> = self
            .schema_before_project
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let by_fields: Vec<_> = self
            .by_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_dfschema = logical_input.schema();
        let input_schema = exec_input.schema();
        let range_exec: Vec<RangeFnExec> = self
            .range_expr
            .iter()
            .map(|range_fn| {
                let name = range_fn.expr.schema_name().to_string();
                // Look through an alias to reach the aggregate call itself.
                let range_expr = match &range_fn.expr {
                    Expr::Alias(expr) => expr.expr.as_ref(),
                    others => others,
                };

                let expr = match &range_expr {
                    Expr::AggregateFunction(aggr)
                        if (aggr.func.name() == "last_value"
                            || aggr.func.name() == "first_value") =>
                    {
                        let order_by = if let Some(exprs) = &aggr.order_by {
                            exprs
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            // Without an explicit ORDER BY, `first_value`/`last_value` are
                            // ordered by the time index, ascending with nulls last.
                            let time_index = create_physical_expr(
                                &self.time_expr,
                                input_dfschema.as_ref(),
                                session_state.execution_props(),
                            )?;
                            vec![PhysicalSortExpr {
                                expr: time_index,
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: false,
                                },
                            }]
                        };
                        let arg = self.create_physical_expr_list(
                            false,
                            &aggr.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), arg)
                            .schema(input_schema.clone())
                            .order_by(LexOrdering::new(order_by))
                            .alias(name)
                            .build()
                    }
                    Expr::AggregateFunction(aggr) => {
                        let order_by = if let Some(exprs) = &aggr.order_by {
                            exprs
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            vec![]
                        };
                        let distinct = aggr.distinct;
                        // For `count`, a wildcard argument is expanded to a literal.
                        let input_phy_exprs = self.create_physical_expr_list(
                            aggr.func.name() == "count",
                            &aggr.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
                            .schema(input_schema.clone())
                            .order_by(LexOrdering::new(order_by))
                            .with_distinct(distinct)
                            .alias(name)
                            .build()
                    }
                    _ => Err(DataFusionError::Plan(format!(
                        "Unexpected Expr: {} in RangeSelect",
                        range_fn.expr
                    ))),
                }?;
                Ok(RangeFnExec {
                    expr: Arc::new(expr),
                    range: range_fn.range.as_millis() as Millisecond,
                    fill: range_fn.fill.clone(),
                    // Carry the target type only when a cast was requested.
                    need_cast: if range_fn.need_cast {
                        Some(range_fn.data_type.clone())
                    } else {
                        None
                    },
                })
            })
            .collect::<DfResult<Vec<_>>>()?;
        let schema_before_project = Arc::new(Schema::new(fields));
        let schema = if let Some(project) = &self.schema_project {
            Arc::new(schema_before_project.project(project)?)
        } else {
            schema_before_project.clone()
        };
        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
        // The plan is declared with one output partition, incremental emission and bounded input.
        let cache = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Ok(Arc::new(RangeSelectExec {
            input: exec_input,
            range_exec,
            align: self.align.as_millis() as Millisecond,
            align_to: self.align_to,
            by,
            time_index: self.time_index.clone(),
            schema,
            by_schema: Arc::new(Schema::new(by_fields)),
            metric: ExecutionPlanMetricsSet::new(),
            schema_before_project,
            schema_project: self.schema_project.clone(),
            cache,
        }))
    }
}
711
/// Physical counterpart of a [`RangeFn`].
#[derive(Debug, Clone)]
struct RangeFnExec {
    /// The aggregate function evaluated over each range.
    expr: Arc<AggregateFunctionExpr>,
    /// Length of the range in milliseconds.
    range: Millisecond,
    /// Optional fill strategy applied to null results.
    fill: Option<Fill>,
    /// Target type the aggregate results are cast to, or `None` when no cast is needed.
    need_cast: Option<DataType>,
}
720
721impl RangeFnExec {
722 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
726 let mut exprs = self.expr.expressions();
727 if let Some(ordering) = self.expr.order_bys() {
728 exprs.extend(ordering.iter().map(|sort| sort.expr.clone()));
729 }
730 exprs
731 }
732}
733
734impl Display for RangeFnExec {
735 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736 if let Some(fill) = &self.fill {
737 write!(
738 f,
739 "{} RANGE {}s FILL {}",
740 self.expr.name(),
741 self.range / 1000,
742 fill
743 )
744 } else {
745 write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746 }
747 }
748}
749
/// Physical plan node that evaluates range expressions over its input.
#[derive(Debug)]
pub struct RangeSelectExec {
    /// The single child plan.
    input: Arc<dyn ExecutionPlan>,
    /// Range expressions to evaluate.
    range_exec: Vec<RangeFnExec>,
    /// Alignment step in milliseconds.
    align: Millisecond,
    /// Origin of the alignment; printed in milliseconds by `fmt_as`.
    align_to: i64,
    /// Name of the time index column, looked up in the input schema on `execute`.
    time_index: String,
    /// Physical expressions whose values identify a series.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Output schema (after the optional projection).
    schema: SchemaRef,
    /// Schema of the `by` columns.
    by_schema: SchemaRef,
    /// Metrics collected while executing.
    metric: ExecutionPlanMetricsSet,
    /// Column indices applied to `schema_before_project` to form the output, if any.
    schema_project: Option<Vec<usize>>,
    /// Schema of the batch built before projection.
    schema_before_project: SchemaRef,
    /// Plan properties returned by `properties`.
    cache: PlanProperties,
}
765
766impl DisplayAs for RangeSelectExec {
767 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768 match t {
769 DisplayFormatType::Default | DisplayFormatType::Verbose => {
770 write!(f, "RangeSelectExec: ")?;
771 let range_expr_strs: Vec<String> =
772 self.range_exec.iter().map(RangeFnExec::to_string).collect();
773 let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
774 write!(
775 f,
776 "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
777 range_expr_strs.join(", "),
778 self.align,
779 self.align_to,
780 by.join(", "),
781 self.time_index,
782 )?;
783 }
784 }
785 Ok(())
786 }
787}
788
789impl ExecutionPlan for RangeSelectExec {
790 fn as_any(&self) -> &dyn std::any::Any {
791 self
792 }
793
794 fn schema(&self) -> SchemaRef {
795 self.schema.clone()
796 }
797
798 fn required_input_distribution(&self) -> Vec<Distribution> {
799 vec![Distribution::SinglePartition]
800 }
801
802 fn properties(&self) -> &PlanProperties {
803 &self.cache
804 }
805
806 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
807 vec![&self.input]
808 }
809
810 fn with_new_children(
811 self: Arc<Self>,
812 children: Vec<Arc<dyn ExecutionPlan>>,
813 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
814 assert!(!children.is_empty());
815 Ok(Arc::new(Self {
816 input: children[0].clone(),
817 range_exec: self.range_exec.clone(),
818 time_index: self.time_index.clone(),
819 by: self.by.clone(),
820 align: self.align,
821 align_to: self.align_to,
822 schema: self.schema.clone(),
823 by_schema: self.by_schema.clone(),
824 metric: self.metric.clone(),
825 schema_before_project: self.schema_before_project.clone(),
826 schema_project: self.schema_project.clone(),
827 cache: self.cache.clone(),
828 }))
829 }
830
831 fn execute(
832 &self,
833 partition: usize,
834 context: Arc<TaskContext>,
835 ) -> DfResult<DfSendableRecordBatchStream> {
836 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
837 let input = self.input.execute(partition, context)?;
838 let schema = input.schema();
839 let time_index = schema
840 .column_with_name(&self.time_index)
841 .ok_or(DataFusionError::Execution(
842 "time index column not found".into(),
843 ))?
844 .0;
845 let row_converter = RowConverter::new(
846 self.by_schema
847 .fields()
848 .iter()
849 .map(|f| SortField::new(f.data_type().clone()))
850 .collect(),
851 )?;
852 Ok(Box::pin(RangeSelectStream {
853 schema: self.schema.clone(),
854 range_exec: self.range_exec.clone(),
855 input,
856 random_state: RandomState::new(),
857 time_index,
858 align: self.align,
859 align_to: self.align_to,
860 by: self.by.clone(),
861 series_map: HashMap::new(),
862 exec_state: ExecutionState::ReadingInput,
863 num_not_null_rows: 0,
864 row_converter,
865 modify_map: HashMap::new(),
866 metric: baseline_metric,
867 schema_project: self.schema_project.clone(),
868 schema_before_project: self.schema_before_project.clone(),
869 }))
870 }
871
872 fn metrics(&self) -> Option<MetricsSet> {
873 Some(self.metric.clone_inner())
874 }
875
876 fn statistics(&self) -> DataFusionResult<Statistics> {
877 Ok(Statistics::new_unknown(self.schema.as_ref()))
878 }
879
880 fn name(&self) -> &str {
881 "RanegSelectExec"
882 }
883}
884
/// Stream that consumes the whole input, aggregates it per series and aligned timestamp,
/// and then emits the result as a single record batch.
struct RangeSelectStream {
    /// Output schema (after the optional projection).
    schema: SchemaRef,
    /// Range expressions to evaluate.
    range_exec: Vec<RangeFnExec>,
    /// Input stream.
    input: SendableRecordBatchStream,
    /// Column index of the time index in the input batches.
    time_index: usize,
    /// Alignment step in milliseconds.
    align: Millisecond,
    /// Origin of the alignment.
    align_to: i64,
    /// Physical expressions whose values identify a series.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current phase of the stream.
    exec_state: ExecutionState,
    /// Converts the `by` columns to rows and back to columns.
    row_converter: RowConverter,
    /// Hasher state used to hash the `by` columns.
    random_state: RandomState,
    /// Aggregation state per series, keyed by the hash of the series' `by` values.
    series_map: HashMap<u64, SeriesState>,
    /// Scratch map from `(series hash, aligned timestamp)` to the batch rows that fall into
    /// that slot; cleared and refilled by `produce_align_time`.
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// Number of `(series, aligned timestamp)` slots created so far; used as a capacity hint
    /// when generating output.
    num_not_null_rows: usize,
    /// Baseline metrics of this partition.
    metric: BaselineMetrics,
    /// Column indices applied to the batch built with `schema_before_project`, if any.
    schema_project: Option<Vec<usize>>,
    /// Schema of the batch built before projection.
    schema_before_project: SchemaRef,
}
913
/// Aggregation state of one series.
#[derive(Debug)]
struct SeriesState {
    /// The series' `by` values in row format.
    row: OwnedRow,
    /// Accumulators per aligned timestamp, one accumulator per range expression.
    /// The map is ordered by timestamp.
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
922
923fn produce_align_time(
929 align_to: i64,
930 range: Millisecond,
931 align: Millisecond,
932 ts_column: &TimestampMillisecondArray,
933 by_columns_hash: &[u64],
934 modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
935) {
936 modify_map.clear();
937 for (row, hash) in by_columns_hash.iter().enumerate() {
939 let ts = ts_column.value(row);
940 let ith_slot = (ts - align_to).div_floor(align);
941 let mut align_ts = ith_slot * align + align_to;
942 while align_ts <= ts && ts < align_ts + range {
943 modify_map
944 .entry((*hash, align_ts))
945 .or_default()
946 .push(row as u32);
947 align_ts -= align;
948 }
949 }
950}
951
952fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
953 let array = ScalarValue::iter_to_array(values.to_vec())?;
954 let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
955 for (i, value) in values.iter_mut().enumerate() {
956 *value = ScalarValue::try_from_array(&cast_array, i)?;
957 }
958 Ok(())
959}
960
impl RangeSelectStream {
    /// Evaluates each expression against `batch` and returns the results as arrays with
    /// `batch.num_rows()` rows.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the per-series accumulators.
    ///
    /// Rows are grouped by the hash of their `by` values and by aligned timestamp; each group
    /// is fed to the accumulator of the range expression being processed.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        let mut ts_column = batch.column(self.time_index).clone();
        // Normalize the time index to millisecond timestamps before reading it.
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // Compute which rows belong to which (series, aligned timestamp) slot for the
            // range of this expression.
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // Flatten all row lists into one index array so a single `take` gathers the rows
            // of every slot; `offsets` marks where each slot's rows start and end.
            let mut modify_rows = UInt32Builder::with_capacity(0);
            // (hash, aligned timestamp, first row of the slot) per slot, in iteration order.
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    // The first row seen for a hash provides the series' `by` values.
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        Entry::Vacant(e) => {
                            // A new slot gets accumulators for every range expression, not
                            // only for the one currently being updated.
                            self.num_not_null_rows += 1;
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Builds the output batch from the accumulated state.
    ///
    /// Returns an empty batch when no series was seen. Otherwise the batch is assembled with
    /// `schema_before_project` (range expression columns, time index, `by` columns) and then
    /// projected with `schema_project` if that is set.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        // One value list per range expression; all series are appended back to back.
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        // Start of the current series inside each list of `all_scalar`.
        let mut start_index = 0;
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // Values used for slots without data: the result of a freshly created accumulator
        // of each range expression.
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            if align_ts_accumulator.is_empty() {
                continue;
            }
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            // If any range expression has a fill strategy, emit every aligned timestamp
            // between the first and the last slot; otherwise only the slots that have data.
            let align_ts = if need_fill_output {
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                // Only the values of the current series are cast and filled.
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            // Repeat the series' `by` row once per emitted timestamp.
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // The time index field follows the range expression columns in
        // `schema_before_project`, so `columns.len()` is its index here.
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }
}
1155
/// Phase of a [`RangeSelectStream`].
enum ExecutionState {
    /// Input batches are still being consumed and aggregated.
    ReadingInput,
    /// The input is exhausted; the next poll builds the output batch.
    ProducingOutput,
    /// The stream is finished and yields `None`.
    Done,
}
1161
1162impl RecordBatchStream for RangeSelectStream {
1163 fn schema(&self) -> SchemaRef {
1164 self.schema.clone()
1165 }
1166}
1167
1168impl Stream for RangeSelectStream {
1169 type Item = DataFusionResult<RecordBatch>;
1170
1171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1172 loop {
1173 match self.exec_state {
1174 ExecutionState::ReadingInput => {
1175 match ready!(self.input.poll_next_unpin(cx)) {
1176 Some(Ok(batch)) => {
1178 if let Err(e) = self.update_range_context(batch) {
1179 common_telemetry::debug!(
1180 "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}", self.schema, e
1181 );
1182 return Poll::Ready(Some(Err(e)));
1183 }
1184 }
1185 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1187 None => {
1189 self.exec_state = ExecutionState::ProducingOutput;
1190 }
1191 }
1192 }
1193 ExecutionState::ProducingOutput => {
1194 let result = self.generate_output();
1195 return match result {
1196 Ok(batch) => {
1198 self.exec_state = ExecutionState::Done;
1199 Poll::Ready(Some(Ok(batch)))
1200 }
1201 Err(error) => Poll::Ready(Some(Err(error))),
1203 };
1204 }
1205 ExecutionState::Done => return Poll::Ready(None),
1206 }
1207 }
1208 }
1209}
1210
1211#[cfg(test)]
1212mod test {
1213 macro_rules! nullable_array {
1214 ($builder:ident,) => {
1215 };
1216 ($array_type:ident ; $($tail:tt)*) => {
1217 paste::item! {
1218 {
1219 let mut builder = arrow::array::[<$array_type Builder>]::new();
1220 nullable_array!(builder, $($tail)*);
1221 builder.finish()
1222 }
1223 }
1224 };
1225 ($builder:ident, null) => {
1226 $builder.append_null();
1227 };
1228 ($builder:ident, null, $($tail:tt)*) => {
1229 $builder.append_null();
1230 nullable_array!($builder, $($tail)*);
1231 };
1232 ($builder:ident, $value:literal) => {
1233 $builder.append_value($value);
1234 };
1235 ($builder:ident, $value:literal, $($tail:tt)*) => {
1236 $builder.append_value($value);
1237 nullable_array!($builder, $($tail)*);
1238 };
1239 }
1240
1241 use std::sync::Arc;
1242
1243 use arrow_schema::SortOptions;
1244 use datafusion::arrow::datatypes::{
1245 ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
1246 };
1247 use datafusion::functions_aggregate::min_max;
1248 use datafusion::physical_plan::memory::MemoryExec;
1249 use datafusion::physical_plan::sorts::sort::SortExec;
1250 use datafusion::prelude::SessionContext;
1251 use datafusion_physical_expr::expressions::Column;
1252 use datafusion_physical_expr::PhysicalSortExpr;
1253 use datatypes::arrow::array::TimestampMillisecondArray;
1254 use datatypes::arrow_array::StringArray;
1255
1256 use super::*;
1257
    /// Name of the time index column in the test tables built by this module.
    const TIME_INDEX_COLUMN: &str = "timestamp";
1259
1260 fn prepare_test_data(is_float: bool, is_gap: bool) -> MemoryExec {
1261 let schema = Arc::new(Schema::new(vec![
1262 Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1263 Field::new(
1264 "value",
1265 if is_float {
1266 DataType::Float64
1267 } else {
1268 DataType::Int64
1269 },
1270 true,
1271 ),
1272 Field::new("host", DataType::Utf8, true),
1273 ]));
1274 let timestamp_column: Arc<dyn Array> = if !is_gap {
1275 Arc::new(TimestampMillisecondArray::from(vec![
1276 0, 5_000, 10_000, 15_000, 20_000, 0, 5_000, 10_000, 15_000, 20_000, ])) as _
1279 } else {
1280 Arc::new(TimestampMillisecondArray::from(vec![
1281 0, 15_000, 0, 15_000, ])) as _
1284 };
1285 let mut host = vec!["host1"; timestamp_column.len() / 2];
1286 host.extend(vec!["host2"; timestamp_column.len() / 2]);
1287 let mut value_column: Arc<dyn Array> = if is_gap {
1288 Arc::new(nullable_array!(Int64;
1289 0, 6, 6, 12 )) as _
1292 } else {
1293 Arc::new(nullable_array!(Int64;
1294 0, null, 1, null, 2, 3, null, 4, null, 5 )) as _
1297 };
1298 if is_float {
1299 value_column =
1300 cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
1301 .unwrap();
1302 }
1303 let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
1304 let data = RecordBatch::try_new(
1305 schema.clone(),
1306 vec![timestamp_column, value_column, host_column],
1307 )
1308 .unwrap();
1309
1310 MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
1311 }
1312
1313 async fn do_range_select_test(
1314 range1: Millisecond,
1315 range2: Millisecond,
1316 align: Millisecond,
1317 fill: Option<Fill>,
1318 is_float: bool,
1319 is_gap: bool,
1320 expected: String,
1321 ) {
1322 let data_type = if is_float {
1323 DataType::Float64
1324 } else {
1325 DataType::Int64
1326 };
1327 let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
1328 (Some(DataType::Float64), DataType::Float64)
1330 } else {
1331 (None, data_type.clone())
1332 };
1333 let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
1334 let schema = Arc::new(Schema::new(vec![
1335 Field::new("MIN(value)", schema_data_type.clone(), true),
1336 Field::new("MAX(value)", schema_data_type, true),
1337 Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1338 Field::new("host", DataType::Utf8, true),
1339 ]));
1340 let cache = PlanProperties::new(
1341 EquivalenceProperties::new(schema.clone()),
1342 Partitioning::UnknownPartitioning(1),
1343 EmissionType::Incremental,
1344 Boundedness::Bounded,
1345 );
1346 let input_schema = memory_exec.schema().clone();
1347 let range_select_exec = Arc::new(RangeSelectExec {
1348 input: memory_exec,
1349 range_exec: vec![
1350 RangeFnExec {
1351 expr: Arc::new(
1352 AggregateExprBuilder::new(
1353 min_max::min_udaf(),
1354 vec![Arc::new(Column::new("value", 1))],
1355 )
1356 .schema(input_schema.clone())
1357 .alias("MIN(value)")
1358 .build()
1359 .unwrap(),
1360 ),
1361 range: range1,
1362 fill: fill.clone(),
1363 need_cast: need_cast.clone(),
1364 },
1365 RangeFnExec {
1366 expr: Arc::new(
1367 AggregateExprBuilder::new(
1368 min_max::max_udaf(),
1369 vec![Arc::new(Column::new("value", 1))],
1370 )
1371 .schema(input_schema.clone())
1372 .alias("MAX(value)")
1373 .build()
1374 .unwrap(),
1375 ),
1376 range: range2,
1377 fill,
1378 need_cast,
1379 },
1380 ],
1381 align,
1382 align_to: 0,
1383 by: vec![Arc::new(Column::new("host", 2))],
1384 time_index: TIME_INDEX_COLUMN.to_string(),
1385 schema: schema.clone(),
1386 schema_before_project: schema.clone(),
1387 schema_project: None,
1388 by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
1389 metric: ExecutionPlanMetricsSet::new(),
1390 cache,
1391 });
1392 let sort_exec = SortExec::new(
1393 LexOrdering::new(vec![
1394 PhysicalSortExpr {
1395 expr: Arc::new(Column::new("host", 3)),
1396 options: SortOptions {
1397 descending: false,
1398 nulls_first: true,
1399 },
1400 },
1401 PhysicalSortExpr {
1402 expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
1403 options: SortOptions {
1404 descending: false,
1405 nulls_first: true,
1406 },
1407 },
1408 ]),
1409 range_select_exec,
1410 );
1411 let session_context = SessionContext::default();
1412 let result =
1413 datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
1414 .await
1415 .unwrap();
1416
1417 let result_literal = arrow::util::pretty::pretty_format_batches(&result)
1418 .unwrap()
1419 .to_string();
1420
1421 assert_eq!(result_literal, expected);
1422 }
1423
1424 #[tokio::test]
1425 async fn range_10s_align_1000s() {
1426 let expected = String::from(
1427 "+------------+------------+---------------------+-------+\
1428 \n| MIN(value) | MAX(value) | timestamp | host |\
1429 \n+------------+------------+---------------------+-------+\
1430 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1431 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1432 \n+------------+------------+---------------------+-------+",
1433 );
1434 do_range_select_test(
1435 10_000,
1436 10_000,
1437 1_000_000,
1438 Some(Fill::Null),
1439 true,
1440 false,
1441 expected,
1442 )
1443 .await;
1444 }
1445
1446 #[tokio::test]
1447 async fn range_fill_null() {
1448 let expected = String::from(
1449 "+------------+------------+---------------------+-------+\
1450 \n| MIN(value) | MAX(value) | timestamp | host |\
1451 \n+------------+------------+---------------------+-------+\
1452 \n| 0.0 | | 1969-12-31T23:59:55 | host1 |\
1453 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1454 \n| 1.0 | | 1970-01-01T00:00:05 | host1 |\
1455 \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
1456 \n| 2.0 | | 1970-01-01T00:00:15 | host1 |\
1457 \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
1458 \n| 3.0 | | 1969-12-31T23:59:55 | host2 |\
1459 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1460 \n| 4.0 | | 1970-01-01T00:00:05 | host2 |\
1461 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
1462 \n| 5.0 | | 1970-01-01T00:00:15 | host2 |\
1463 \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
1464 \n+------------+------------+---------------------+-------+",
1465 );
1466 do_range_select_test(
1467 10_000,
1468 5_000,
1469 5_000,
1470 Some(Fill::Null),
1471 true,
1472 false,
1473 expected,
1474 )
1475 .await;
1476 }
1477
1478 #[tokio::test]
1479 async fn range_fill_prev() {
1480 let expected = String::from(
1481 "+------------+------------+---------------------+-------+\
1482 \n| MIN(value) | MAX(value) | timestamp | host |\
1483 \n+------------+------------+---------------------+-------+\
1484 \n| 0.0 | | 1969-12-31T23:59:55 | host1 |\
1485 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1486 \n| 1.0 | 0.0 | 1970-01-01T00:00:05 | host1 |\
1487 \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
1488 \n| 2.0 | 1.0 | 1970-01-01T00:00:15 | host1 |\
1489 \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
1490 \n| 3.0 | | 1969-12-31T23:59:55 | host2 |\
1491 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1492 \n| 4.0 | 3.0 | 1970-01-01T00:00:05 | host2 |\
1493 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
1494 \n| 5.0 | 4.0 | 1970-01-01T00:00:15 | host2 |\
1495 \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
1496 \n+------------+------------+---------------------+-------+",
1497 );
1498 do_range_select_test(
1499 10_000,
1500 5_000,
1501 5_000,
1502 Some(Fill::Prev),
1503 true,
1504 false,
1505 expected,
1506 )
1507 .await;
1508 }
1509
1510 #[tokio::test]
1511 async fn range_fill_linear() {
1512 let expected = String::from(
1513 "+------------+------------+---------------------+-------+\
1514 \n| MIN(value) | MAX(value) | timestamp | host |\
1515 \n+------------+------------+---------------------+-------+\
1516 \n| 0.0 | -0.5 | 1969-12-31T23:59:55 | host1 |\
1517 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1518 \n| 1.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\
1519 \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
1520 \n| 2.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
1521 \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
1522 \n| 3.0 | 2.5 | 1969-12-31T23:59:55 | host2 |\
1523 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1524 \n| 4.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
1525 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
1526 \n| 5.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
1527 \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
1528 \n+------------+------------+---------------------+-------+",
1529 );
1530 do_range_select_test(
1531 10_000,
1532 5_000,
1533 5_000,
1534 Some(Fill::Linear),
1535 true,
1536 false,
1537 expected,
1538 )
1539 .await;
1540 }
1541
1542 #[tokio::test]
1543 async fn range_fill_integer_linear() {
1544 let expected = String::from(
1545 "+------------+------------+---------------------+-------+\
1546 \n| MIN(value) | MAX(value) | timestamp | host |\
1547 \n+------------+------------+---------------------+-------+\
1548 \n| 0.0 | -0.5 | 1969-12-31T23:59:55 | host1 |\
1549 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1550 \n| 1.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\
1551 \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
1552 \n| 2.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
1553 \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
1554 \n| 3.0 | 2.5 | 1969-12-31T23:59:55 | host2 |\
1555 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1556 \n| 4.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
1557 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
1558 \n| 5.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
1559 \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
1560 \n+------------+------------+---------------------+-------+",
1561 );
1562 do_range_select_test(
1563 10_000,
1564 5_000,
1565 5_000,
1566 Some(Fill::Linear),
1567 false,
1568 false,
1569 expected,
1570 )
1571 .await;
1572 }
1573
1574 #[tokio::test]
1575 async fn range_fill_const() {
1576 let expected = String::from(
1577 "+------------+------------+---------------------+-------+\
1578 \n| MIN(value) | MAX(value) | timestamp | host |\
1579 \n+------------+------------+---------------------+-------+\
1580 \n| 0.0 | 6.6 | 1969-12-31T23:59:55 | host1 |\
1581 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1582 \n| 1.0 | 6.6 | 1970-01-01T00:00:05 | host1 |\
1583 \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
1584 \n| 2.0 | 6.6 | 1970-01-01T00:00:15 | host1 |\
1585 \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
1586 \n| 3.0 | 6.6 | 1969-12-31T23:59:55 | host2 |\
1587 \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
1588 \n| 4.0 | 6.6 | 1970-01-01T00:00:05 | host2 |\
1589 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
1590 \n| 5.0 | 6.6 | 1970-01-01T00:00:15 | host2 |\
1591 \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
1592 \n+------------+------------+---------------------+-------+",
1593 );
1594 do_range_select_test(
1595 10_000,
1596 5_000,
1597 5_000,
1598 Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
1599 true,
1600 false,
1601 expected,
1602 )
1603 .await;
1604 }
1605
1606 #[tokio::test]
1607 async fn range_fill_gap() {
1608 let expected = String::from(
1609 "+------------+------------+---------------------+-------+\
1610 \n| MIN(value) | MAX(value) | timestamp | host |\
1611 \n+------------+------------+---------------------+-------+\
1612 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1613 \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
1614 \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
1615 \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
1616 \n+------------+------------+---------------------+-------+",
1617 );
1618 do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
1619 let expected = String::from(
1620 "+------------+------------+---------------------+-------+\
1621 \n| MIN(value) | MAX(value) | timestamp | host |\
1622 \n+------------+------------+---------------------+-------+\
1623 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1624 \n| | | 1970-01-01T00:00:05 | host1 |\
1625 \n| | | 1970-01-01T00:00:10 | host1 |\
1626 \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
1627 \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
1628 \n| | | 1970-01-01T00:00:05 | host2 |\
1629 \n| | | 1970-01-01T00:00:10 | host2 |\
1630 \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
1631 \n+------------+------------+---------------------+-------+",
1632 );
1633 do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
1634 let expected = String::from(
1635 "+------------+------------+---------------------+-------+\
1636 \n| MIN(value) | MAX(value) | timestamp | host |\
1637 \n+------------+------------+---------------------+-------+\
1638 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1639 \n| 0.0 | 0.0 | 1970-01-01T00:00:05 | host1 |\
1640 \n| 0.0 | 0.0 | 1970-01-01T00:00:10 | host1 |\
1641 \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
1642 \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
1643 \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host2 |\
1644 \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host2 |\
1645 \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
1646 \n+------------+------------+---------------------+-------+",
1647 );
1648 do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
1649 let expected = String::from(
1650 "+------------+------------+---------------------+-------+\
1651 \n| MIN(value) | MAX(value) | timestamp | host |\
1652 \n+------------+------------+---------------------+-------+\
1653 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1654 \n| 2.0 | 2.0 | 1970-01-01T00:00:05 | host1 |\
1655 \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host1 |\
1656 \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
1657 \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
1658 \n| 8.0 | 8.0 | 1970-01-01T00:00:05 | host2 |\
1659 \n| 10.0 | 10.0 | 1970-01-01T00:00:10 | host2 |\
1660 \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
1661 \n+------------+------------+---------------------+-------+",
1662 );
1663 do_range_select_test(
1664 5_000,
1665 5_000,
1666 5_000,
1667 Some(Fill::Linear),
1668 true,
1669 true,
1670 expected,
1671 )
1672 .await;
1673 let expected = String::from(
1674 "+------------+------------+---------------------+-------+\
1675 \n| MIN(value) | MAX(value) | timestamp | host |\
1676 \n+------------+------------+---------------------+-------+\
1677 \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
1678 \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host1 |\
1679 \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host1 |\
1680 \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
1681 \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
1682 \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host2 |\
1683 \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host2 |\
1684 \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
1685 \n+------------+------------+---------------------+-------+",
1686 );
1687 do_range_select_test(
1688 5_000,
1689 5_000,
1690 5_000,
1691 Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
1692 true,
1693 true,
1694 expected,
1695 )
1696 .await;
1697 }
1698
1699 #[test]
1700 fn fill_test() {
1701 assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
1702 assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Some(Fill::Linear));
1703 assert_eq!(
1704 Fill::try_from_str("Linear", &DataType::Boolean)
1705 .unwrap_err()
1706 .to_string(),
1707 "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
1708 );
1709 assert_eq!(
1710 Fill::try_from_str("WHAT", &DataType::UInt8)
1711 .unwrap_err()
1712 .to_string(),
1713 "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
1714 );
1715 assert_eq!(
1716 Fill::try_from_str("8.0", &DataType::UInt8)
1717 .unwrap_err()
1718 .to_string(),
1719 "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
1720 );
1721 assert!(
1722 Fill::try_from_str("8", &DataType::UInt8).unwrap()
1723 == Some(Fill::Const(ScalarValue::UInt8(Some(8))))
1724 );
1725 let mut test1 = vec![
1726 ScalarValue::UInt8(Some(8)),
1727 ScalarValue::UInt8(None),
1728 ScalarValue::UInt8(Some(9)),
1729 ];
1730 Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
1731 assert_eq!(test1[1], ScalarValue::UInt8(None));
1732 Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
1733 assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
1734 test1[1] = ScalarValue::UInt8(None);
1735 Fill::Const(ScalarValue::UInt8(Some(10)))
1736 .apply_fill_strategy(&[], &mut test1)
1737 .unwrap();
1738 assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
1739 }
1740
1741 #[test]
1742 fn test_fill_linear() {
1743 let ts = vec![1, 2, 3, 4, 5];
1744 let mut test = vec![
1745 ScalarValue::Float32(Some(1.0)),
1746 ScalarValue::Float32(None),
1747 ScalarValue::Float32(Some(3.0)),
1748 ScalarValue::Float32(None),
1749 ScalarValue::Float32(Some(5.0)),
1750 ];
1751 Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1752 let mut test1 = vec![
1753 ScalarValue::Float32(None),
1754 ScalarValue::Float32(Some(2.0)),
1755 ScalarValue::Float32(None),
1756 ScalarValue::Float32(Some(4.0)),
1757 ScalarValue::Float32(None),
1758 ];
1759 Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1760 assert_eq!(test, test1);
1761 let ts = vec![
1763 1, 3, 8, 30, 88, 108, 128, ];
1771 let mut test = vec![
1772 ScalarValue::Float64(None),
1773 ScalarValue::Float64(Some(1.0)),
1774 ScalarValue::Float64(Some(11.0)),
1775 ScalarValue::Float64(None),
1776 ScalarValue::Float64(Some(10.0)),
1777 ScalarValue::Float64(Some(5.0)),
1778 ScalarValue::Float64(None),
1779 ];
1780 Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1781 let data: Vec<_> = test
1782 .into_iter()
1783 .map(|x| {
1784 let ScalarValue::Float64(Some(f)) = x else {
1785 unreachable!()
1786 };
1787 f
1788 })
1789 .collect();
1790 assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
1791 let ts = vec![1];
1793 let test = vec![ScalarValue::Float32(None)];
1794 let mut test1 = test.clone();
1795 Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1796 assert_eq!(test, test1);
1797 }
1798}