1use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, cast_with_options, take_arrays, CastOptions};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::Result as DataFusionResult;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::SessionState;
31use datafusion::execution::TaskContext;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36 SendableRecordBatchStream,
37};
38use datafusion_common::hash_utils::create_hashes;
39use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
40use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION};
41use datafusion_expr::{
42 lit, Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore,
43};
44use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
45use datafusion_physical_expr::{
46 create_physical_expr, create_physical_sort_expr, Distribution, EquivalenceProperties,
47 Partitioning, PhysicalExpr, PhysicalSortExpr,
48};
49use datatypes::arrow::array::{
50 Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
51};
52use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
53use datatypes::arrow::record_batch::RecordBatch;
54use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
55use futures::{ready, Stream};
56use futures_util::StreamExt;
57use snafu::ensure;
58
59use crate::error::{RangeQuerySnafu, Result};
60
/// Native integer type of an Arrow millisecond timestamp. This module uses it for
/// timestamps as well as for the `align` and `range` durations (both in milliseconds).
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
62
/// Strategy for replacing NULL values in a range query's per-series output
/// (see `Fill::apply_fill_strategy`).
#[derive(PartialEq, Eq, Debug, Hash, Clone)]
pub enum Fill {
    /// Leave NULLs as they are.
    Null,
    /// Replace each NULL with the preceding (already filled) value of the series, so a
    /// value is carried forward across consecutive NULLs. A leading NULL stays NULL.
    Prev,
    /// Fill NULLs by linear interpolation/extrapolation over the timestamps.
    /// Only accepted for numeric data types by `Fill::try_from_str`.
    Linear,
    /// Replace every NULL with this constant.
    Const(ScalarValue),
}
70
71impl Display for Fill {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 Fill::Null => write!(f, "NULL"),
75 Fill::Prev => write!(f, "PREV"),
76 Fill::Linear => write!(f, "LINEAR"),
77 Fill::Const(x) => write!(f, "{}", x),
78 }
79 }
80}
81
82impl Fill {
83 pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
84 let s = value.to_uppercase();
85 match s.as_str() {
86 "" => Ok(None),
87 "NULL" => Ok(Some(Self::Null)),
88 "PREV" => Ok(Some(Self::Prev)),
89 "LINEAR" => {
90 if datatype.is_numeric() {
91 Ok(Some(Self::Linear))
92 } else {
93 Err(DataFusionError::Plan(format!(
94 "Use FILL LINEAR on Non-numeric DataType {}",
95 datatype
96 )))
97 }
98 }
99 _ => ScalarValue::try_from_string(s.clone(), datatype)
100 .map_err(|err| {
101 DataFusionError::Plan(format!(
102 "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
103 s, err
104 ))
105 })
106 .map(|x| Some(Fill::Const(x))),
107 }
108 }
109
110 pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
113 if matches!(self, Fill::Null) {
115 return Ok(());
116 }
117 let len = data.len();
118 if *self == Fill::Linear {
119 return Self::fill_linear(ts, data);
120 }
121 for i in 0..len {
122 if data[i].is_null() {
123 match self {
124 Fill::Prev => {
125 if i != 0 {
126 data[i] = data[i - 1].clone()
127 }
128 }
129 Fill::Linear | Fill::Null => unreachable!(),
133 Fill::Const(v) => data[i] = v.clone(),
134 }
135 }
136 }
137 Ok(())
138 }
139
140 fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
141 let not_null_num = data
142 .iter()
143 .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
144 if not_null_num < 2 {
146 return Ok(());
147 }
148 let mut index = 0;
149 let mut head: Option<usize> = None;
150 let mut tail: Option<usize> = None;
151 while index < data.len() {
152 let start = data[index..]
155 .iter()
156 .position(ScalarValue::is_null)
157 .unwrap_or(data.len() - index)
158 + index;
159 if start == data.len() {
160 break;
161 }
162 let end = data[start..]
163 .iter()
164 .position(|r| !r.is_null())
165 .unwrap_or(data.len() - start)
166 + start;
167 index = end + 1;
168 if start == 0 {
170 head = Some(end);
171 } else if end == data.len() {
172 tail = Some(start);
173 } else {
174 linear_interpolation(ts, data, start - 1, end, start, end)?;
175 }
176 }
177 if let Some(end) = head {
179 linear_interpolation(ts, data, end, end + 1, 0, end)?;
180 }
181 if let Some(start) = tail {
183 linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
184 }
185 Ok(())
186 }
187}
188
189fn linear_interpolation(
191 ts: &[i64],
192 data: &mut [ScalarValue],
193 i1: usize,
194 i2: usize,
195 start: usize,
196 end: usize,
197) -> DfResult<()> {
198 let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
199 let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
200 (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
201 (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
202 (*y0 as f64, *y1 as f64, true)
203 }
204 _ => {
205 return Err(DataFusionError::Execution(
206 "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
207 ));
208 }
209 };
210 if x1 == x0 {
212 return Err(DataFusionError::Execution(
213 "RangePlan: Linear interpolation using the same coordinate points".to_string(),
214 ));
215 }
216 for i in start..end {
217 let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
218 data[i] = if is_float32 {
219 ScalarValue::Float32(Some(val as f32))
220 } else {
221 ScalarValue::Float64(Some(val))
222 }
223 }
224 Ok(())
225}
226
/// One range aggregate of a `RangeSelect` (rendered in plans as `<expr> RANGE <n>s [FILL ..]`).
///
/// Equality, ordering and hashing are implemented by hand below and consider only `name`.
#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
    /// Output column name. This is also the identity used by `Eq`/`Ord`/`Hash`.
    pub name: String,
    /// Output column type. When `need_cast` is set, aggregate results are cast to it.
    pub data_type: DataType,
    /// The aggregate expression, possibly wrapped in an alias.
    pub expr: Expr,
    /// Window length of the aggregate. Must be non-zero in milliseconds.
    pub range: Duration,
    /// Optional NULL fill strategy. With `Fill::Const` the output field is non-nullable.
    pub fill: Option<Fill>,
    /// Whether aggregate results must be cast to `data_type` before filling.
    pub need_cast: bool,
}
241
242impl PartialEq for RangeFn {
243 fn eq(&self, other: &Self) -> bool {
244 self.name == other.name
245 }
246}
247
248impl PartialOrd for RangeFn {
249 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
250 Some(self.cmp(other))
251 }
252}
253
254impl Ord for RangeFn {
255 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
256 self.name.cmp(&other.name)
257 }
258}
259
260impl std::hash::Hash for RangeFn {
261 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
262 self.name.hash(state);
263 }
264}
265
266impl Display for RangeFn {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 write!(f, "{}", self.name)
269 }
270}
271
/// Logical plan node for a range query. It aggregates `range_expr` over time windows
/// aligned to `align`/`align_to`, grouped by `by`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeSelect {
    /// Input logical plan.
    pub input: Arc<LogicalPlan>,
    /// Range aggregates. Each produces one output column.
    pub range_expr: Vec<RangeFn>,
    /// Step between consecutive aligned timestamps.
    pub align: Duration,
    /// Offset (ms) of the aligned timestamps: windows start at `align_to + k * align`.
    pub align_to: i64,
    /// Name of the time index field.
    pub time_index: String,
    /// Expression that produces the time index.
    pub time_expr: Expr,
    /// Grouping expressions.
    pub by: Vec<Expr>,
    /// Output schema (after applying `schema_project`, if any).
    pub schema: DFSchemaRef,
    /// Schema of the `by` expressions alone.
    pub by_schema: DFSchemaRef,
    /// Field indices into `schema_before_project` forming the output, or `None` when the
    /// projection could not be resolved and no projection is applied.
    pub schema_project: Option<Vec<usize>>,
    /// Unprojected schema: range fields, then the time index, then the `by` fields.
    pub schema_before_project: DFSchemaRef,
}
294
295impl PartialOrd for RangeSelect {
296 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
297 match self.input.partial_cmp(&other.input) {
299 Some(Ordering::Equal) => {}
300 ord => return ord,
301 }
302 match self.range_expr.partial_cmp(&other.range_expr) {
303 Some(Ordering::Equal) => {}
304 ord => return ord,
305 }
306 match self.align.partial_cmp(&other.align) {
307 Some(Ordering::Equal) => {}
308 ord => return ord,
309 }
310 match self.align_to.partial_cmp(&other.align_to) {
311 Some(Ordering::Equal) => {}
312 ord => return ord,
313 }
314 match self.time_index.partial_cmp(&other.time_index) {
315 Some(Ordering::Equal) => {}
316 ord => return ord,
317 }
318 match self.time_expr.partial_cmp(&other.time_expr) {
319 Some(Ordering::Equal) => {}
320 ord => return ord,
321 }
322 match self.by.partial_cmp(&other.by) {
323 Some(Ordering::Equal) => {}
324 ord => return ord,
325 }
326 self.schema_project.partial_cmp(&other.schema_project)
327 }
328}
329
impl RangeSelect {
    /// Builds a `RangeSelect` node and derives its schemas.
    ///
    /// `schema_before_project` holds one field per range expression (named by the
    /// `RangeFn`), then the time index field, then one field per `by` expression. If every
    /// entry of `projection_expr` resolves to a field of that schema, `schema` is the
    /// projection onto those fields and `schema_project` records their indices. Otherwise
    /// `schema_project` is `None` and `schema` is `schema_before_project`.
    ///
    /// # Errors
    /// A range-query error if `align` or any range is 0 ms. Planning errors from deriving
    /// the time index / `by` fields or building the schemas are also returned.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        range_expr: Vec<RangeFn>,
        align: Duration,
        align_to: i64,
        time_index: Expr,
        by: Vec<Expr>,
        projection_expr: &[Expr],
    ) -> Result<Self> {
        ensure!(
            align.as_millis() != 0,
            RangeQuerySnafu {
                msg: "Can't use 0 as align in Range Query"
            }
        );
        for expr in &range_expr {
            ensure!(
                expr.range.as_millis() != 0,
                RangeQuerySnafu {
                    msg: format!(
                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
                        expr.name
                    )
                }
            );
        }
        // One output field per range expression (unqualified).
        let mut fields = range_expr
            .iter()
            .map(
                |RangeFn {
                     name,
                     data_type,
                     fill,
                     ..
                 }| {
                    let field = Field::new(
                        name,
                        data_type.clone(),
                        // Only a constant fill replaces every NULL, so only then is the
                        // output column non-nullable.
                        !matches!(fill, Some(Fill::Const(..))),
                    );
                    Ok((None, Arc::new(field)))
                },
            )
            .collect::<DfResult<Vec<_>>>()?;
        // The time index field follows the range fields.
        let ts_field = time_index.to_field(input.schema().as_ref())?;
        let time_index_name = ts_field.1.name().clone();
        fields.push(ts_field);
        // Then the `by` fields, which also form `by_schema` on their own.
        let by_fields = exprlist_to_fields(&by, &input)?;
        fields.extend(by_fields.clone());
        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
            fields,
            input.schema().metadata().clone(),
        )?);
        let by_schema = Arc::new(DFSchema::new_with_metadata(
            by_fields,
            input.schema().metadata().clone(),
        )?);
        // Resolve each projection expression to a field index of `schema_before_project`.
        // Columns are matched by (qualifier, name). Other expressions are matched by the
        // field they produce against the input schema. Any failure makes the whole
        // projection `None`.
        let schema_project = projection_expr
            .iter()
            .map(|project_expr| {
                if let Expr::Column(column) = project_expr {
                    schema_before_project
                        .index_of_column_by_name(column.relation.as_ref(), &column.name)
                        .ok_or(())
                } else {
                    let (qualifier, field) = project_expr
                        .to_field(input.schema().as_ref())
                        .map_err(|_| ())?;
                    schema_before_project
                        .index_of_column_by_name(qualifier.as_ref(), field.name())
                        .ok_or(())
                }
            })
            .collect::<std::result::Result<Vec<usize>, ()>>()
            .ok();
        let schema = if let Some(project) = &schema_project {
            let project_field = project
                .iter()
                .map(|i| {
                    let f = schema_before_project.qualified_field(*i);
                    (f.0.cloned(), Arc::new(f.1.clone()))
                })
                .collect();
            Arc::new(DFSchema::new_with_metadata(
                project_field,
                input.schema().metadata().clone(),
            )?)
        } else {
            schema_before_project.clone()
        };
        Ok(Self {
            input,
            range_expr,
            align,
            align_to,
            time_index: time_index_name,
            time_expr: time_index,
            schema,
            by_schema,
            by,
            schema_project,
            schema_before_project,
        })
    }
}
442
443impl UserDefinedLogicalNodeCore for RangeSelect {
444 fn name(&self) -> &str {
445 "RangeSelect"
446 }
447
448 fn inputs(&self) -> Vec<&LogicalPlan> {
449 vec![&self.input]
450 }
451
452 fn schema(&self) -> &DFSchemaRef {
453 &self.schema
454 }
455
456 fn expressions(&self) -> Vec<Expr> {
457 self.range_expr
458 .iter()
459 .map(|expr| expr.expr.clone())
460 .chain([self.time_expr.clone()])
461 .chain(self.by.clone())
462 .collect()
463 }
464
465 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
466 write!(
467 f,
468 "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
469 self.range_expr
470 .iter()
471 .map(ToString::to_string)
472 .collect::<Vec<_>>()
473 .join(", "),
474 self.align.as_millis(),
475 self.align_to,
476 self.by
477 .iter()
478 .map(ToString::to_string)
479 .collect::<Vec<_>>()
480 .join(", "),
481 self.time_index
482 )
483 }
484
485 fn with_exprs_and_inputs(
486 &self,
487 exprs: Vec<Expr>,
488 inputs: Vec<LogicalPlan>,
489 ) -> DataFusionResult<Self> {
490 if inputs.is_empty() {
491 return Err(DataFusionError::Plan(
492 "RangeSelect: inputs is empty".to_string(),
493 ));
494 }
495 if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
496 return Err(DataFusionError::Plan(
497 "RangeSelect: exprs length not match".to_string(),
498 ));
499 }
500
501 let range_expr = exprs
502 .iter()
503 .zip(self.range_expr.iter())
504 .map(|(e, range)| RangeFn {
505 name: range.name.clone(),
506 data_type: range.data_type.clone(),
507 expr: e.clone(),
508 range: range.range,
509 fill: range.fill.clone(),
510 need_cast: range.need_cast,
511 })
512 .collect();
513 let time_expr = exprs[self.range_expr.len()].clone();
514 let by = exprs[self.range_expr.len() + 1..].to_vec();
515 Ok(Self {
516 align: self.align,
517 align_to: self.align_to,
518 range_expr,
519 input: Arc::new(inputs[0].clone()),
520 time_index: self.time_index.clone(),
521 time_expr,
522 schema: self.schema.clone(),
523 by,
524 by_schema: self.by_schema.clone(),
525 schema_project: self.schema_project.clone(),
526 schema_before_project: self.schema_before_project.clone(),
527 })
528 }
529}
530
impl RangeSelect {
    /// Plans each logical expression into a physical expression against `df_schema`.
    ///
    /// When `is_count_aggr` is true, a wildcard argument (`count(*)`) is planned as the
    /// literal `COUNT_STAR_EXPANSION`. Otherwise each expression is planned as-is.
    fn create_physical_expr_list(
        &self,
        is_count_aggr: bool,
        exprs: &[Expr],
        df_schema: &Arc<DFSchema>,
        session_state: &SessionState,
    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
        exprs
            .iter()
            .map(|e| match e {
                // `Expr::Wildcard` is deprecated in DataFusion, hence the `expect`.
                #[expect(deprecated)]
                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
                    &lit(COUNT_STAR_EXPANSION),
                    df_schema.as_ref(),
                    session_state.execution_props(),
                ),
                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Lowers this logical node to a `RangeSelectExec` over `exec_input`.
    ///
    /// Each range expression (after stripping an alias) must be an aggregate function
    /// call. It becomes an `AggregateFunctionExpr` named after the expression's schema
    /// name. `first_value` / `last_value` without an explicit ORDER BY are ordered by the
    /// time index ascending, nulls last. All expressions are planned against the schema
    /// of `logical_input`.
    ///
    /// # Errors
    /// `DataFusionError::Plan` for a range expression that is not an aggregate function.
    /// Physical planning errors are propagated.
    pub fn to_execution_plan(
        &self,
        logical_input: &LogicalPlan,
        exec_input: Arc<dyn ExecutionPlan>,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Unqualified Arrow copies of the logical schemas.
        let fields: Vec<_> = self
            .schema_before_project
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let by_fields: Vec<_> = self
            .by_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_dfschema = logical_input.schema();
        let input_schema = exec_input.schema();
        let range_exec: Vec<RangeFnExec> = self
            .range_expr
            .iter()
            .map(|range_fn| {
                let name = range_fn.expr.schema_name().to_string();
                // Plan the aggregate underneath an alias, if there is one.
                let range_expr = match &range_fn.expr {
                    Expr::Alias(expr) => expr.expr.as_ref(),
                    others => others,
                };

                let expr = match &range_expr {
                    Expr::AggregateFunction(aggr)
                        if (aggr.func.name() == "last_value"
                            || aggr.func.name() == "first_value") =>
                    {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            // No explicit ORDER BY: order by the time index, ascending,
                            // nulls last.
                            let time_index = create_physical_expr(
                                &self.time_expr,
                                input_dfschema.as_ref(),
                                session_state.execution_props(),
                            )?;
                            vec![PhysicalSortExpr {
                                expr: time_index,
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: false,
                                },
                            }]
                        };
                        let arg = self.create_physical_expr_list(
                            false,
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        // NOTE(review): only `args` and `order_by` of `aggr.params` are used
                        // here (e.g. `distinct` is not forwarded, unlike the generic branch
                        // below). Confirm this is intended.
                        AggregateExprBuilder::new(aggr.func.clone(), arg)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .alias(name)
                            .build()
                    }
                    Expr::AggregateFunction(aggr) => {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            vec![]
                        };
                        let distinct = aggr.params.distinct;
                        // Only `count` gets the `count(*)` wildcard handling.
                        let input_phy_exprs = self.create_physical_expr_list(
                            aggr.func.name() == "count",
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .with_distinct(distinct)
                            .alias(name)
                            .build()
                    }
                    _ => Err(DataFusionError::Plan(format!(
                        "Unexpected Expr: {} in RangeSelect",
                        range_fn.expr
                    ))),
                }?;
                Ok(RangeFnExec {
                    expr: Arc::new(expr),
                    range: range_fn.range.as_millis() as Millisecond,
                    fill: range_fn.fill.clone(),
                    need_cast: if range_fn.need_cast {
                        Some(range_fn.data_type.clone())
                    } else {
                        None
                    },
                })
            })
            .collect::<DfResult<Vec<_>>>()?;
        let schema_before_project = Arc::new(Schema::new(fields));
        let schema = if let Some(project) = &self.schema_project {
            Arc::new(schema_before_project.project(project)?)
        } else {
            schema_before_project.clone()
        };
        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
        // NOTE(review): `RangeSelectStream` emits its only batch after consuming all input,
        // which looks like `EmissionType::Final` rather than `Incremental`. Confirm.
        let cache = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Ok(Arc::new(RangeSelectExec {
            input: exec_input,
            range_exec,
            align: self.align.as_millis() as Millisecond,
            align_to: self.align_to,
            by,
            time_index: self.time_index.clone(),
            schema,
            by_schema: Arc::new(Schema::new(by_fields)),
            metric: ExecutionPlanMetricsSet::new(),
            schema_before_project,
            schema_project: self.schema_project.clone(),
            cache,
        }))
    }
}
713
/// Physical counterpart of a `RangeFn`.
#[derive(Debug, Clone)]
struct RangeFnExec {
    /// Planned aggregate. Accumulators for every window are created from it.
    expr: Arc<AggregateFunctionExpr>,
    /// Window length in milliseconds.
    range: Millisecond,
    /// Optional NULL fill strategy applied per series to the output.
    fill: Option<Fill>,
    /// Target type for the aggregate results, or `None` when no cast is needed.
    need_cast: Option<DataType>,
}
722
723impl RangeFnExec {
724 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
728 let mut exprs = self.expr.expressions();
729 exprs.extend(self.expr.order_bys().iter().map(|sort| sort.expr.clone()));
730 exprs
731 }
732}
733
734impl Display for RangeFnExec {
735 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736 if let Some(fill) = &self.fill {
737 write!(
738 f,
739 "{} RANGE {}s FILL {}",
740 self.expr.name(),
741 self.range / 1000,
742 fill
743 )
744 } else {
745 write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746 }
747 }
748}
749
/// Physical plan node executing a range query (built by `RangeSelect::to_execution_plan`).
#[derive(Debug)]
pub struct RangeSelectExec {
    /// Input physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// One entry per range aggregate, in output column order.
    range_exec: Vec<RangeFnExec>,
    /// Step between aligned timestamps, in milliseconds.
    align: Millisecond,
    /// Offset (ms) of the aligned timestamps.
    align_to: i64,
    /// Name of the time index column, resolved in the input schema at execution time.
    time_index: String,
    /// Physical grouping expressions.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Output schema (after projection).
    schema: SchemaRef,
    /// Schema of the `by` columns. Its types drive the row converter.
    by_schema: SchemaRef,
    /// Metrics shared with the streams created by `execute`.
    metric: ExecutionPlanMetricsSet,
    /// Optional column indices into `schema_before_project` for the output.
    schema_project: Option<Vec<usize>>,
    /// Unprojected output schema: range columns, time index, `by` columns.
    schema_before_project: SchemaRef,
    /// Cached plan properties.
    cache: PlanProperties,
}
765
766impl DisplayAs for RangeSelectExec {
767 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768 match t {
769 DisplayFormatType::Default
770 | DisplayFormatType::Verbose
771 | DisplayFormatType::TreeRender => {
772 write!(f, "RangeSelectExec: ")?;
773 let range_expr_strs: Vec<String> =
774 self.range_exec.iter().map(RangeFnExec::to_string).collect();
775 let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
776 write!(
777 f,
778 "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
779 range_expr_strs.join(", "),
780 self.align,
781 self.align_to,
782 by.join(", "),
783 self.time_index,
784 )?;
785 }
786 }
787 Ok(())
788 }
789}
790
791impl ExecutionPlan for RangeSelectExec {
792 fn as_any(&self) -> &dyn std::any::Any {
793 self
794 }
795
796 fn schema(&self) -> SchemaRef {
797 self.schema.clone()
798 }
799
800 fn required_input_distribution(&self) -> Vec<Distribution> {
801 vec![Distribution::SinglePartition]
802 }
803
804 fn properties(&self) -> &PlanProperties {
805 &self.cache
806 }
807
808 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
809 vec![&self.input]
810 }
811
812 fn with_new_children(
813 self: Arc<Self>,
814 children: Vec<Arc<dyn ExecutionPlan>>,
815 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
816 assert!(!children.is_empty());
817 Ok(Arc::new(Self {
818 input: children[0].clone(),
819 range_exec: self.range_exec.clone(),
820 time_index: self.time_index.clone(),
821 by: self.by.clone(),
822 align: self.align,
823 align_to: self.align_to,
824 schema: self.schema.clone(),
825 by_schema: self.by_schema.clone(),
826 metric: self.metric.clone(),
827 schema_before_project: self.schema_before_project.clone(),
828 schema_project: self.schema_project.clone(),
829 cache: self.cache.clone(),
830 }))
831 }
832
833 fn execute(
834 &self,
835 partition: usize,
836 context: Arc<TaskContext>,
837 ) -> DfResult<DfSendableRecordBatchStream> {
838 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
839 let input = self.input.execute(partition, context)?;
840 let schema = input.schema();
841 let time_index = schema
842 .column_with_name(&self.time_index)
843 .ok_or(DataFusionError::Execution(
844 "time index column not found".into(),
845 ))?
846 .0;
847 let row_converter = RowConverter::new(
848 self.by_schema
849 .fields()
850 .iter()
851 .map(|f| SortField::new(f.data_type().clone()))
852 .collect(),
853 )?;
854 Ok(Box::pin(RangeSelectStream {
855 schema: self.schema.clone(),
856 range_exec: self.range_exec.clone(),
857 input,
858 random_state: RandomState::new(),
859 time_index,
860 align: self.align,
861 align_to: self.align_to,
862 by: self.by.clone(),
863 series_map: HashMap::new(),
864 exec_state: ExecutionState::ReadingInput,
865 num_not_null_rows: 0,
866 row_converter,
867 modify_map: HashMap::new(),
868 metric: baseline_metric,
869 schema_project: self.schema_project.clone(),
870 schema_before_project: self.schema_before_project.clone(),
871 }))
872 }
873
874 fn metrics(&self) -> Option<MetricsSet> {
875 Some(self.metric.clone_inner())
876 }
877
878 fn name(&self) -> &str {
879 "RanegSelectExec"
880 }
881}
882
/// Stream that reads all input, accumulates per-series range windows, and then emits a
/// single output batch.
struct RangeSelectStream {
    /// Output schema (after projection).
    schema: SchemaRef,
    /// Range aggregates, in output column order.
    range_exec: Vec<RangeFnExec>,
    /// Input stream.
    input: SendableRecordBatchStream,
    /// Index of the time index column in the input batches.
    time_index: usize,
    /// Step between aligned timestamps, in milliseconds.
    align: Millisecond,
    /// Offset (ms) of the aligned timestamps.
    align_to: i64,
    /// Physical grouping expressions.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current state of the read → produce → done state machine.
    exec_state: ExecutionState,
    /// Converts `by` column values to rows (for `SeriesState::row`) and back (for output).
    row_converter: RowConverter,
    /// Hasher state used to hash the `by` values of each row.
    random_state: RandomState,
    /// Series state keyed by the hash of the `by` values.
    /// NOTE(review): the key is only the 64-bit hash, so two distinct `by` tuples with
    /// colliding hashes would be merged into one series. Confirm this is acceptable.
    series_map: HashMap<u64, SeriesState>,
    /// Scratch map reused per batch and per range function:
    /// (series hash, aligned timestamp) -> indices of the batch rows falling into it.
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// Number of (series, aligned timestamp) slots created so far. Used as a capacity hint.
    num_not_null_rows: usize,
    /// Baseline metrics for this partition (elapsed compute time).
    metric: BaselineMetrics,
    /// Optional column indices applied to the output batch.
    schema_project: Option<Vec<usize>>,
    /// Unprojected output schema: range columns, time index, `by` columns.
    schema_before_project: SchemaRef,
}
911
/// Accumulated state of one series (one group of `by` values).
#[derive(Debug)]
struct SeriesState {
    /// The series' `by` values in row format, taken from its first seen row.
    row: OwnedRow,
    /// Aligned window start timestamp -> one accumulator per range function, in
    /// `range_exec` order. Ordered, so the first and last windows are cheap to find.
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
920
921fn produce_align_time(
927 align_to: i64,
928 range: Millisecond,
929 align: Millisecond,
930 ts_column: &TimestampMillisecondArray,
931 by_columns_hash: &[u64],
932 modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
933) {
934 modify_map.clear();
935 for (row, hash) in by_columns_hash.iter().enumerate() {
937 let ts = ts_column.value(row);
938 let ith_slot = (ts - align_to).div_floor(align);
939 let mut align_ts = ith_slot * align + align_to;
940 while align_ts <= ts && ts < align_ts + range {
941 modify_map
942 .entry((*hash, align_ts))
943 .or_default()
944 .push(row as u32);
945 align_ts -= align;
946 }
947 }
948}
949
950fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
951 let array = ScalarValue::iter_to_array(values.to_vec())?;
952 let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
953 for (i, value) in values.iter_mut().enumerate() {
954 *value = ScalarValue::try_from_array(&cast_array, i)?;
955 }
956 Ok(())
957}
958
impl RangeSelectStream {
    /// Evaluates each physical expression against `batch`. Scalar results are expanded to
    /// arrays of `batch.num_rows()` length.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the per-series, per-window accumulators.
    ///
    /// For each range function, every row is routed to each `(series, window)` that
    /// contains its timestamp (see `produce_align_time`). The rows of each window are then
    /// passed to that window's accumulator in one `update_batch` call.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        // A series is identified by the hash of its `by` values.
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        let mut ts_column = batch.column(self.time_index).clone();
        // Window arithmetic is done in milliseconds, so other time types are cast first.
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // Windows depend on this function's `range`, so they are recomputed per function.
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // Flatten the row lists of all windows into one index array so the arguments are
            // gathered with a single `take`. Window k then owns `offsets[k]..offsets[k + 1]`.
            let mut modify_rows = UInt32Builder::with_capacity(0);
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                // Keep one representative row to materialize a new series' `by` values.
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        Entry::Vacant(e) => {
                            // New (series, window) slot: create accumulators for every range
                            // function, but only the current one receives these rows.
                            self.num_not_null_rows += 1;
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Builds the single output batch from all accumulated state.
    ///
    /// Each series emits one row per window timestamp. Without any FILL option these are
    /// only the windows that received rows. With any FILL option they are every aligned
    /// timestamp from the series' first to its last window, stepping by `align`. Windows
    /// without rows get the value of a freshly created accumulator. Then, per range
    /// function, the series' values are cast (if needed) and filled. The column order is
    /// range results, time index, `by` columns, and `schema_project` is applied last.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        // all_scalar[i] collects range function i's output values across all series.
        // NOTE(review): `vec![v; n]` clones `v`, and a cloned empty Vec does not keep its
        // capacity, so only one column really gets the pre-allocation.
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        // Index of the current series' first row within each all_scalar column.
        let mut start_index = 0;
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // Value emitted for a window that received no rows: a fresh accumulator's result.
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            // A series without any window has nothing to emit.
            if align_ts_accumulator.is_empty() {
                continue;
            }
            // Non-empty BTreeMap, so both unwraps succeed. Keys are sorted.
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            let align_ts = if need_fill_output {
                // Every aligned timestamp between the first and last window, inclusive.
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    // Gap introduced for filling: no rows fell into this window.
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            // Post-process this series' slice of each column: cast first, then fill.
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            // Repeat the series' `by` values once per emitted row.
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // Cast the timestamps to the time index field's type; that field directly follows
        // the range columns.
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }
}
1153
/// State machine of `RangeSelectStream::poll_next`.
enum ExecutionState {
    /// Consuming input batches into the range accumulators.
    ReadingInput,
    /// Input exhausted. The next poll builds the output batch.
    ProducingOutput,
    /// Output already produced. The stream yields `None`.
    Done,
}
1159
1160impl RecordBatchStream for RangeSelectStream {
1161 fn schema(&self) -> SchemaRef {
1162 self.schema.clone()
1163 }
1164}
1165
1166impl Stream for RangeSelectStream {
1167 type Item = DataFusionResult<RecordBatch>;
1168
1169 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1170 loop {
1171 match self.exec_state {
1172 ExecutionState::ReadingInput => {
1173 match ready!(self.input.poll_next_unpin(cx)) {
1174 Some(Ok(batch)) => {
1176 if let Err(e) = self.update_range_context(batch) {
1177 common_telemetry::debug!(
1178 "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}", self.schema, e
1179 );
1180 return Poll::Ready(Some(Err(e)));
1181 }
1182 }
1183 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1185 None => {
1187 self.exec_state = ExecutionState::ProducingOutput;
1188 }
1189 }
1190 }
1191 ExecutionState::ProducingOutput => {
1192 let result = self.generate_output();
1193 return match result {
1194 Ok(batch) => {
1196 self.exec_state = ExecutionState::Done;
1197 Poll::Ready(Some(Ok(batch)))
1198 }
1199 Err(error) => Poll::Ready(Some(Err(error))),
1201 };
1202 }
1203 ExecutionState::Done => return Poll::Ready(None),
1204 }
1205 }
1206 }
1207}
1208
#[cfg(test)]
mod test {
    // Builds an Arrow primitive array from a list of literals and `null`s,
    // e.g. `nullable_array!(Int64; 0, null, 1)`.
    //
    // The `$array_type ;` arm creates a `<$array_type>Builder`, and the
    // remaining arms append one element each while recursing on the tail.
    // The first arm (`$builder,` with nothing after it) absorbs a trailing comma.
    macro_rules! nullable_array {
        ($builder:ident,) => {
        };
        ($array_type:ident ; $($tail:tt)*) => {
            paste::item! {
                {
                    let mut builder = arrow::array::[<$array_type Builder>]::new();
                    nullable_array!(builder, $($tail)*);
                    builder.finish()
                }
            }
        };
        ($builder:ident, null) => {
            $builder.append_null();
        };
        ($builder:ident, null, $($tail:tt)*) => {
            $builder.append_null();
            nullable_array!($builder, $($tail)*);
        };
        ($builder:ident, $value:literal) => {
            $builder.append_value($value);
        };
        ($builder:ident, $value:literal, $($tail:tt)*) => {
            $builder.append_value($value);
            nullable_array!($builder, $($tail)*);
        };
    }

    use std::sync::Arc;

    use arrow_schema::SortOptions;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::functions_aggregate::min_max;
    use datafusion::physical_plan::sorts::sort::SortExec;
    use datafusion::prelude::SessionContext;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory source with columns `(timestamp, value, host)`.
    ///
    /// The first half of the rows belongs to `host1` and the second half to
    /// `host2`. With `is_gap == false`, each host has samples at 0s..=20s every
    /// 5s, with every other value null. With `is_gap == true`, each host only
    /// has samples at 0s and 15s. `value` is `Int64`, or is cast to `Float64`
    /// when `is_float` is set.
    fn prepare_test_data(is_float: bool, is_gap: bool) -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new(
                "value",
                if is_float {
                    DataType::Float64
                } else {
                    DataType::Int64
                },
                true,
            ),
            Field::new("host", DataType::Utf8, true),
        ]));
        let timestamp_column: Arc<dyn Array> = if !is_gap {
            Arc::new(TimestampMillisecondArray::from(vec![
                0, 5_000, 10_000, 15_000, 20_000, // host1
                0, 5_000, 10_000, 15_000, 20_000, // host2
            ])) as _
        } else {
            Arc::new(TimestampMillisecondArray::from(vec![
                0, 15_000, // host1, nothing at 5s and 10s
                0, 15_000, // host2, nothing at 5s and 10s
            ])) as _
        };
        let mut host = vec!["host1"; timestamp_column.len() / 2];
        host.extend(vec!["host2"; timestamp_column.len() / 2]);
        let mut value_column: Arc<dyn Array> = if is_gap {
            Arc::new(nullable_array!(Int64;
                0, 6, // host1
                6, 12 // host2
            )) as _
        } else {
            Arc::new(nullable_array!(Int64;
                0, null, 1, null, 2, // host1
                3, null, 4, null, 5 // host2
            )) as _
        };
        if is_float {
            value_column =
                cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
                    .unwrap();
        }
        let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, value_column, host_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Runs `MIN(value)` over `range1` and `MAX(value)` over `range2`, grouped by
    /// `host` with the given `align` (aligned to 0) and `fill` applied to both.
    /// The result is sorted by `(host, timestamp)`, pretty-printed, and compared
    /// against `expected`.
    ///
    /// For FILL LINEAR on integer input, both aggregates are cast to `Float64`
    /// (`need_cast`), and the output schema uses `Float64`.
    async fn do_range_select_test(
        range1: Millisecond,
        range2: Millisecond,
        align: Millisecond,
        fill: Option<Fill>,
        is_float: bool,
        is_gap: bool,
        expected: String,
    ) {
        let data_type = if is_float {
            DataType::Float64
        } else {
            DataType::Int64
        };
        let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
            (Some(DataType::Float64), DataType::Float64)
        } else {
            (None, data_type.clone())
        };
        let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", schema_data_type.clone(), true),
            Field::new("MAX(value)", schema_data_type, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let cache = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        // `schema()` already returns an owned `SchemaRef`.
        let input_schema = memory_exec.schema();
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            range_exec: vec![
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range1,
                    fill: fill.clone(),
                    need_cast: need_cast.clone(),
                },
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema)
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range2,
                    fill,
                    need_cast,
                },
            ],
            align,
            align_to: 0,
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            schema: schema.clone(),
            schema_before_project: schema,
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        // Sort by host (output column 3), then by timestamp (output column 2),
        // so the printed table is deterministic.
        let sort_exec = SortExec::new(
            [
                PhysicalSortExpr {
                    expr: Arc::new(Column::new("host", 3)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
            ]
            .into(),
            range_select_exec,
        );
        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
                .await
                .unwrap();

        let result_literal = arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn range_10s_align_1000s() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            10_000,
            1_000_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_null() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        |            | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        |            | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        |            | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        |            | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_prev() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.0        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Prev),
            true,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_linear() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_integer_linear() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            false,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_const() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 6.6        | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 6.6        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 6.6        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 6.6        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 6.6        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 6.6        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
            true,
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn range_fill_gap() {
        // No fill: slots without samples are simply absent.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n|            |            | 1970-01-01T00:00:05 | host1 |\
            \n|            |            | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n|            |            | 1970-01-01T00:00:05 | host2 |\
            \n|            |            | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 8.0        | 8.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 10.0       | 10.0       | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            true,
            expected,
        )
        .await;
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
            true,
            true,
            expected,
        )
        .await;
    }

    #[test]
    fn fill_test() {
        assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
        assert_eq!(
            Fill::try_from_str("Linear", &DataType::UInt8).unwrap(),
            Some(Fill::Linear)
        );
        assert_eq!(
            Fill::try_from_str("Linear", &DataType::Boolean)
                .unwrap_err()
                .to_string(),
            "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
        );
        assert_eq!(
            Fill::try_from_str("WHAT", &DataType::UInt8)
                .unwrap_err()
                .to_string(),
            "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
        );
        assert_eq!(
            Fill::try_from_str("8.0", &DataType::UInt8)
                .unwrap_err()
                .to_string(),
            "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
        );
        assert_eq!(
            Fill::try_from_str("8", &DataType::UInt8).unwrap(),
            Some(Fill::Const(ScalarValue::UInt8(Some(8))))
        );
        let mut test1 = vec![
            ScalarValue::UInt8(Some(8)),
            ScalarValue::UInt8(None),
            ScalarValue::UInt8(Some(9)),
        ];
        Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(None));
        Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
        test1[1] = ScalarValue::UInt8(None);
        Fill::Const(ScalarValue::UInt8(Some(10)))
            .apply_fill_strategy(&[], &mut test1)
            .unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
    }

    #[test]
    fn test_fill_linear() {
        // Interpolating interleaved gaps of two complementary series over the
        // same timestamps must yield the same fully filled series.
        let ts = vec![1, 2, 3, 4, 5];
        let mut test = vec![
            ScalarValue::Float32(Some(1.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(3.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(5.0)),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
        let mut test1 = vec![
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(2.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(4.0)),
            ScalarValue::Float32(None),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
        assert_eq!(test, test1);
        // Unevenly spaced timestamps, with gaps at both ends and in the middle.
        let ts = vec![1, 3, 8, 30, 88, 108, 128];
        let mut test = vec![
            ScalarValue::Float64(None),
            ScalarValue::Float64(Some(1.0)),
            ScalarValue::Float64(Some(11.0)),
            ScalarValue::Float64(None),
            ScalarValue::Float64(Some(10.0)),
            ScalarValue::Float64(Some(5.0)),
            ScalarValue::Float64(None),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
        let data: Vec<_> = test
            .into_iter()
            .map(|x| {
                let ScalarValue::Float64(Some(f)) = x else {
                    unreachable!()
                };
                f
            })
            .collect();
        assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
        // A single null cannot be interpolated and must be left untouched.
        let ts = vec![1];
        let test = vec![ScalarValue::Float32(None)];
        let mut test1 = test.clone();
        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
        assert_eq!(test, test1);
    }
}