1use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, CastOptions, cast_with_options, take_arrays};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::Result as DataFusionResult;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::TaskContext;
31use datafusion::execution::context::SessionState;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36 SendableRecordBatchStream,
37};
38use datafusion_common::hash_utils::create_hashes;
39use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
40use datafusion_expr::utils::{COUNT_STAR_EXPANSION, exprlist_to_fields};
41use datafusion_expr::{
42 Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore, lit,
43};
44use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
45use datafusion_physical_expr::{
46 Distribution, EquivalenceProperties, Partitioning, PhysicalExpr, PhysicalSortExpr,
47 create_physical_expr, create_physical_sort_expr,
48};
49use datatypes::arrow::array::{
50 Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
51};
52use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
53use datatypes::arrow::record_batch::RecordBatch;
54use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
55use futures::{Stream, ready};
56use futures_util::StreamExt;
57use snafu::ensure;
58
59use crate::error::{RangeQuerySnafu, Result};
60
/// Timestamp in milliseconds: the native integer type (`i64`) behind an Arrow
/// `TimestampMillisecondArray`.
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
62
/// Strategy used to fill the empty aligned slots of a range query result.
#[derive(PartialEq, Eq, Debug, Hash, Clone)]
pub enum Fill {
    /// Leave empty slots as NULL.
    Null,
    /// Repeat the previous non-null value (the first slot stays null).
    Prev,
    /// Linearly interpolate/extrapolate from neighboring non-null values.
    Linear,
    /// Fill with a user-supplied constant value.
    Const(ScalarValue),
}
70
71impl Display for Fill {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 Fill::Null => write!(f, "NULL"),
75 Fill::Prev => write!(f, "PREV"),
76 Fill::Linear => write!(f, "LINEAR"),
77 Fill::Const(x) => write!(f, "{}", x),
78 }
79 }
80}
81
impl Fill {
    /// Parses a FILL option keyword into a `Fill` strategy.
    ///
    /// Accepted (case-insensitive) values:
    /// - `""`     -> `None` (no fill requested)
    /// - `NULL`   -> `Fill::Null`
    /// - `PREV`   -> `Fill::Prev`
    /// - `LINEAR` -> `Fill::Linear` (only allowed for numeric `datatype`)
    /// - anything else is parsed as a constant of `datatype` -> `Fill::Const`
    ///
    /// Note: the input is upper-cased before constant parsing, so string
    /// constants are effectively case-folded here.
    pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
        let s = value.to_uppercase();
        match s.as_str() {
            "" => Ok(None),
            "NULL" => Ok(Some(Self::Null)),
            "PREV" => Ok(Some(Self::Prev)),
            "LINEAR" => {
                if datatype.is_numeric() {
                    Ok(Some(Self::Linear))
                } else {
                    Err(DataFusionError::Plan(format!(
                        "Use FILL LINEAR on Non-numeric DataType {}",
                        datatype
                    )))
                }
            }
            _ => ScalarValue::try_from_string(s.clone(), datatype)
                .map_err(|err| {
                    DataFusionError::Plan(format!(
                        "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
                        s, err
                    ))
                })
                .map(|x| Some(Fill::Const(x))),
        }
    }

    /// Fills the null entries of `data` in place according to the strategy.
    ///
    /// `ts` holds the aligned timestamp of each slot; it is only consulted by
    /// the LINEAR strategy (and is expected to parallel `data` slot-for-slot).
    pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        // Fill::Null is a no-op: empty slots are already null.
        if matches!(self, Fill::Null) {
            return Ok(());
        }
        let len = data.len();
        if *self == Fill::Linear {
            return Self::fill_linear(ts, data);
        }
        for i in 0..len {
            if data[i].is_null() {
                match self {
                    Fill::Prev => {
                        // The very first slot has no predecessor; it stays null.
                        if i != 0 {
                            data[i] = data[i - 1].clone()
                        }
                    }
                    // Both variants were dispatched by the early returns above.
                    Fill::Linear | Fill::Null => unreachable!(),
                    Fill::Const(v) => data[i] = v.clone(),
                }
            }
        }
        Ok(())
    }

    /// Linear interpolation/extrapolation over the null runs of `data`.
    ///
    /// Interior null runs are interpolated from the two values bracketing the
    /// run. A leading null run is extrapolated from the first two known
    /// values, and a trailing run from the last two. With fewer than two
    /// known values no line can be drawn and `data` is left untouched.
    fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        let not_null_num = data
            .iter()
            .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
        // A line needs at least two points.
        if not_null_num < 2 {
            return Ok(());
        }
        let mut index = 0;
        // `head`/`tail` remember a leading/trailing null run so it can be
        // extrapolated after all interior runs have been interpolated.
        let mut head: Option<usize> = None;
        let mut tail: Option<usize> = None;
        while index < data.len() {
            // `start`: first null slot at or after `index`.
            let start = data[index..]
                .iter()
                .position(ScalarValue::is_null)
                .unwrap_or(data.len() - index)
                + index;
            if start == data.len() {
                break;
            }
            // `end`: first non-null slot after the run (exclusive bound of the run).
            let end = data[start..]
                .iter()
                .position(|r| !r.is_null())
                .unwrap_or(data.len() - start)
                + start;
            index = end + 1;
            if start == 0 {
                // Leading run: extrapolate later from data[end], data[end + 1].
                head = Some(end);
            } else if end == data.len() {
                // Trailing run: extrapolate later from the last two known values.
                tail = Some(start);
            } else {
                // Interior run: interpolate between the bracketing known values.
                linear_interpolation(ts, data, start - 1, end, start, end)?;
            }
        }
        if let Some(end) = head {
            linear_interpolation(ts, data, end, end + 1, 0, end)?;
        }
        if let Some(start) = tail {
            linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
        }
        Ok(())
    }
}
188
/// Overwrites `data[start..end]` with points on the straight line through
/// `(ts[i1], data[i1])` and `(ts[i2], data[i2])`.
///
/// Both anchor values must be non-null `Float32` or `Float64` scalars
/// (presumably ensured by the `need_cast` machinery upstream — confirm);
/// anything else is an execution error, as are identical anchor timestamps.
fn linear_interpolation(
    ts: &[i64],
    data: &mut [ScalarValue],
    i1: usize,
    i2: usize,
    start: usize,
    end: usize,
) -> DfResult<()> {
    let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
    // Do the math in f64; remember whether to narrow the result back to f32.
    let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
        (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
        (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
            (*y0 as f64, *y1 as f64, true)
        }
        _ => {
            return Err(DataFusionError::Execution(
                "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
            ));
        }
    };
    // Coincident x coordinates would make the slope undefined.
    if x1 == x0 {
        return Err(DataFusionError::Execution(
            "RangePlan: Linear interpolation using the same coordinate points".to_string(),
        ));
    }
    for i in start..end {
        let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
        data[i] = if is_float32 {
            ScalarValue::Float32(Some(val as f32))
        } else {
            ScalarValue::Float64(Some(val))
        }
    }
    Ok(())
}
226
/// One range aggregation expression of a range query
/// (e.g. `max(a) RANGE '5m' FILL PREV`).
#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
    // Output column name; also the identity used by Eq/Ord/Hash below.
    pub name: String,
    // Output datatype of the expression (after the optional cast).
    pub data_type: DataType,
    // The underlying aggregate expression.
    pub expr: Expr,
    // Length of the aggregation window.
    pub range: Duration,
    // Strategy used to fill empty aligned slots, if any.
    pub fill: Option<Fill>,
    // Whether the aggregate output must be cast to `data_type` before the
    // fill strategy is applied (see `RangeFnExec::need_cast`).
    pub need_cast: bool,
}
241
242impl PartialEq for RangeFn {
243 fn eq(&self, other: &Self) -> bool {
244 self.name == other.name
245 }
246}
247
248impl PartialOrd for RangeFn {
249 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
250 Some(self.cmp(other))
251 }
252}
253
254impl Ord for RangeFn {
255 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
256 self.name.cmp(&other.name)
257 }
258}
259
260impl std::hash::Hash for RangeFn {
261 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
262 self.name.hash(state);
263 }
264}
265
266impl Display for RangeFn {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 write!(f, "{}", self.name)
269 }
270}
271
/// Logical plan node for a range select (range query).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeSelect {
    // Input plan the range query reads from.
    pub input: Arc<LogicalPlan>,
    // All range aggregation expressions.
    pub range_expr: Vec<RangeFn>,
    // Step between two aligned output timestamps.
    pub align: Duration,
    // Origin timestamp (ms) the aligned slots are anchored to.
    pub align_to: i64,
    // Name of the time index column.
    pub time_index: String,
    // Expression producing the time index column.
    pub time_expr: Expr,
    // Series-key (align-by) expressions.
    pub by: Vec<Expr>,
    // Output schema after the optional projection.
    pub schema: DFSchemaRef,
    // Schema containing only the `by` columns.
    pub by_schema: DFSchemaRef,
    // Column indexes of the projection into `schema_before_project`,
    // when every projected expression could be resolved (see `try_new`).
    pub schema_project: Option<Vec<usize>>,
    // Full output schema: range exprs, then time index, then by columns.
    pub schema_before_project: DFSchemaRef,
}
294
295impl PartialOrd for RangeSelect {
296 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
297 match self.input.partial_cmp(&other.input) {
299 Some(Ordering::Equal) => {}
300 ord => return ord,
301 }
302 match self.range_expr.partial_cmp(&other.range_expr) {
303 Some(Ordering::Equal) => {}
304 ord => return ord,
305 }
306 match self.align.partial_cmp(&other.align) {
307 Some(Ordering::Equal) => {}
308 ord => return ord,
309 }
310 match self.align_to.partial_cmp(&other.align_to) {
311 Some(Ordering::Equal) => {}
312 ord => return ord,
313 }
314 match self.time_index.partial_cmp(&other.time_index) {
315 Some(Ordering::Equal) => {}
316 ord => return ord,
317 }
318 match self.time_expr.partial_cmp(&other.time_expr) {
319 Some(Ordering::Equal) => {}
320 ord => return ord,
321 }
322 match self.by.partial_cmp(&other.by) {
323 Some(Ordering::Equal) => {}
324 ord => return ord,
325 }
326 self.schema_project.partial_cmp(&other.schema_project)
327 }
328}
329
impl RangeSelect {
    /// Builds a `RangeSelect` logical node.
    ///
    /// Validates that neither `align` nor any per-expression range is zero,
    /// then derives the node's schemas:
    /// - `schema_before_project`: range exprs + time index + by columns;
    /// - `by_schema`: only the by columns;
    /// - `schema_project`: indexes of `projection_expr` into
    ///   `schema_before_project`, when every projected expr resolves;
    /// - `schema`: the projected output schema (or the full one).
    pub fn try_new(
        input: Arc<LogicalPlan>,
        range_expr: Vec<RangeFn>,
        align: Duration,
        align_to: i64,
        time_index: Expr,
        by: Vec<Expr>,
        projection_expr: &[Expr],
    ) -> Result<Self> {
        ensure!(
            align.as_millis() != 0,
            RangeQuerySnafu {
                msg: "Can't use 0 as align in Range Query"
            }
        );
        for expr in &range_expr {
            ensure!(
                expr.range.as_millis() != 0,
                RangeQuerySnafu {
                    msg: format!(
                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
                        expr.name
                    )
                }
            );
        }
        let mut fields = range_expr
            .iter()
            .map(
                |RangeFn {
                     name,
                     data_type,
                     fill,
                     ..
                 }| {
                    // A constant-filled column can never be null; any other
                    // fill strategy (or none) may leave nulls behind.
                    let field = Field::new(
                        name,
                        data_type.clone(),
                        !matches!(fill, Some(Fill::Const(..))),
                    );
                    Ok((None, Arc::new(field)))
                },
            )
            .collect::<DfResult<Vec<_>>>()?;
        // The time index column follows the range expr columns.
        let ts_field = time_index.to_field(input.schema().as_ref())?;
        let time_index_name = ts_field.1.name().clone();
        fields.push(ts_field);
        // Finally the by (series key) columns.
        let by_fields = exprlist_to_fields(&by, &input)?;
        fields.extend(by_fields.clone());
        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
            fields,
            input.schema().metadata().clone(),
        )?);
        let by_schema = Arc::new(DFSchema::new_with_metadata(
            by_fields,
            input.schema().metadata().clone(),
        )?);
        // Resolve each projection expr to a column index in
        // `schema_before_project`; if any of them fails to resolve, fall
        // back to no projection at all (`.ok()` turns the Err into None).
        let schema_project = projection_expr
            .iter()
            .map(|project_expr| {
                if let Expr::Column(column) = project_expr {
                    schema_before_project
                        .index_of_column_by_name(column.relation.as_ref(), &column.name)
                        .ok_or(())
                } else {
                    let (qualifier, field) = project_expr
                        .to_field(input.schema().as_ref())
                        .map_err(|_| ())?;
                    schema_before_project
                        .index_of_column_by_name(qualifier.as_ref(), field.name())
                        .ok_or(())
                }
            })
            .collect::<std::result::Result<Vec<usize>, ()>>()
            .ok();
        let schema = if let Some(project) = &schema_project {
            let project_field = project
                .iter()
                .map(|i| {
                    let f = schema_before_project.qualified_field(*i);
                    (f.0.cloned(), f.1.clone())
                })
                .collect();
            Arc::new(DFSchema::new_with_metadata(
                project_field,
                input.schema().metadata().clone(),
            )?)
        } else {
            schema_before_project.clone()
        };
        Ok(Self {
            input,
            range_expr,
            align,
            align_to,
            time_index: time_index_name,
            time_expr: time_index,
            schema,
            by_schema,
            by,
            schema_project,
            schema_before_project,
        })
    }
}
442
impl UserDefinedLogicalNodeCore for RangeSelect {
    fn name(&self) -> &str {
        "RangeSelect"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    /// All expressions owned by this node, in the fixed order
    /// `range_expr..., time_expr, by...` — `with_exprs_and_inputs` relies on
    /// exactly this layout when rebuilding the node.
    fn expressions(&self) -> Vec<Expr> {
        self.range_expr
            .iter()
            .map(|expr| expr.expr.clone())
            .chain([self.time_expr.clone()])
            .chain(self.by.clone())
            .collect()
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
            self.range_expr
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", "),
            self.align.as_millis(),
            self.align_to,
            self.by
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", "),
            self.time_index
        )
    }

    /// Rebuilds the node from rewritten expressions/inputs produced by
    /// optimizer passes. `exprs` must follow the layout emitted by
    /// `expressions` above.
    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(DataFusionError::Plan(
                "RangeSelect: inputs is empty".to_string(),
            ));
        }
        // One expr per range fn, plus the time expr, plus one per by expr.
        if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
            return Err(DataFusionError::Plan(
                "RangeSelect: exprs length not match".to_string(),
            ));
        }

        // Pair each rewritten expr with the metadata of the original RangeFn.
        let range_expr = exprs
            .iter()
            .zip(self.range_expr.iter())
            .map(|(e, range)| RangeFn {
                name: range.name.clone(),
                data_type: range.data_type.clone(),
                expr: e.clone(),
                range: range.range,
                fill: range.fill.clone(),
                need_cast: range.need_cast,
            })
            .collect();
        let time_expr = exprs[self.range_expr.len()].clone();
        let by = exprs[self.range_expr.len() + 1..].to_vec();
        Ok(Self {
            align: self.align,
            align_to: self.align_to,
            range_expr,
            input: Arc::new(inputs[0].clone()),
            time_index: self.time_index.clone(),
            time_expr,
            schema: self.schema.clone(),
            by,
            by_schema: self.by_schema.clone(),
            schema_project: self.schema_project.clone(),
            schema_before_project: self.schema_before_project.clone(),
        })
    }
}
530
impl RangeSelect {
    /// Converts logical `exprs` to physical expressions against `df_schema`.
    ///
    /// When `is_count_aggr` is set, a bare wildcard (as in `count(*)`) is
    /// replaced by the `COUNT_STAR_EXPANSION` literal.
    fn create_physical_expr_list(
        &self,
        is_count_aggr: bool,
        exprs: &[Expr],
        df_schema: &Arc<DFSchema>,
        session_state: &SessionState,
    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
        exprs
            .iter()
            .map(|e| match e {
                #[expect(deprecated)]
                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
                    &lit(COUNT_STAR_EXPANSION),
                    df_schema.as_ref(),
                    session_state.execution_props(),
                ),
                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Lowers this logical node into a `RangeSelectExec` physical plan over
    /// the already-planned `exec_input`.
    pub fn to_execution_plan(
        &self,
        logical_input: &LogicalPlan,
        exec_input: Arc<dyn ExecutionPlan>,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Strip qualifiers: physical schemas use plain Arrow fields.
        let fields: Vec<_> = self
            .schema_before_project
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let by_fields: Vec<_> = self
            .by_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_dfschema = logical_input.schema();
        let input_schema = exec_input.schema();
        let range_exec: Vec<RangeFnExec> = self
            .range_expr
            .iter()
            .map(|range_fn| {
                let name = range_fn.expr.schema_name().to_string();
                // Look through an outer alias; the aggregate itself is lowered.
                let range_expr = match &range_fn.expr {
                    Expr::Alias(expr) => expr.expr.as_ref(),
                    others => others,
                };

                let expr = match &range_expr {
                    // first_value/last_value need an ordering; default to the
                    // time index ascending when the query supplies none.
                    Expr::AggregateFunction(aggr)
                        if (aggr.func.name() == "last_value"
                            || aggr.func.name() == "first_value") =>
                    {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            // No explicit ORDER BY: order by the time index,
                            // ascending, nulls last.
                            let time_index = create_physical_expr(
                                &self.time_expr,
                                input_dfschema.as_ref(),
                                session_state.execution_props(),
                            )?;
                            vec![PhysicalSortExpr {
                                expr: time_index,
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: false,
                                },
                            }]
                        };
                        let arg = self.create_physical_expr_list(
                            false,
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), arg)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .alias(name)
                            .build()
                    }
                    // Every other aggregate: keep its ordering/distinct flags.
                    Expr::AggregateFunction(aggr) => {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            vec![]
                        };
                        let distinct = aggr.params.distinct;
                        // `count` is the only aggregate whose wildcard arg is
                        // expanded to the COUNT_STAR literal.
                        let input_phy_exprs = self.create_physical_expr_list(
                            aggr.func.name() == "count",
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .with_distinct(distinct)
                            .alias(name)
                            .build()
                    }
                    _ => Err(DataFusionError::Plan(format!(
                        "Unexpected Expr: {} in RangeSelect",
                        range_fn.expr
                    ))),
                }?;
                Ok(RangeFnExec {
                    expr: Arc::new(expr),
                    range: range_fn.range.as_millis() as Millisecond,
                    fill: range_fn.fill.clone(),
                    need_cast: if range_fn.need_cast {
                        Some(range_fn.data_type.clone())
                    } else {
                        None
                    },
                })
            })
            .collect::<DfResult<Vec<_>>>()?;
        let schema_before_project = Arc::new(Schema::new(fields));
        let schema = if let Some(project) = &self.schema_project {
            Arc::new(schema_before_project.project(project)?)
        } else {
            schema_before_project.clone()
        };
        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Ok(Arc::new(RangeSelectExec {
            input: exec_input,
            range_exec,
            align: self.align.as_millis() as Millisecond,
            align_to: self.align_to,
            by,
            time_index: self.time_index.clone(),
            schema,
            by_schema: Arc::new(Schema::new(by_fields)),
            metric: ExecutionPlanMetricsSet::new(),
            schema_before_project,
            schema_project: self.schema_project.clone(),
            cache,
        }))
    }
}
713
/// Physical counterpart of `RangeFn`: an aggregate expression together with
/// its range window and fill strategy.
#[derive(Debug, Clone)]
struct RangeFnExec {
    expr: Arc<AggregateFunctionExpr>,
    // Range window length in milliseconds.
    range: Millisecond,
    fill: Option<Fill>,
    // Target type the aggregate output is cast to before filling, if any.
    need_cast: Option<DataType>,
}
722
723impl RangeFnExec {
724 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
728 let mut exprs = self.expr.expressions();
729 exprs.extend(self.expr.order_bys().iter().map(|sort| sort.expr.clone()));
730 exprs
731 }
732}
733
734impl Display for RangeFnExec {
735 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736 if let Some(fill) = &self.fill {
737 write!(
738 f,
739 "{} RANGE {}s FILL {}",
740 self.expr.name(),
741 self.range / 1000,
742 fill
743 )
744 } else {
745 write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746 }
747 }
748}
749
/// Physical plan node that executes a range select.
#[derive(Debug)]
pub struct RangeSelectExec {
    input: Arc<dyn ExecutionPlan>,
    range_exec: Vec<RangeFnExec>,
    // Step between aligned timestamps, in milliseconds.
    align: Millisecond,
    // Alignment origin timestamp in milliseconds.
    align_to: i64,
    // Name of the time index column in the input schema.
    time_index: String,
    // Physical series-key expressions.
    by: Vec<Arc<dyn PhysicalExpr>>,
    // Output schema (after the optional projection).
    schema: SchemaRef,
    // Schema containing only the by columns.
    by_schema: SchemaRef,
    metric: ExecutionPlanMetricsSet,
    // Projection indexes into `schema_before_project`, if any.
    schema_project: Option<Vec<usize>>,
    // Unprojected output schema: range exprs, time index, by columns.
    schema_before_project: SchemaRef,
    cache: Arc<PlanProperties>,
}
765
766impl DisplayAs for RangeSelectExec {
767 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768 match t {
769 DisplayFormatType::Default
770 | DisplayFormatType::Verbose
771 | DisplayFormatType::TreeRender => {
772 write!(f, "RangeSelectExec: ")?;
773 let range_expr_strs: Vec<String> =
774 self.range_exec.iter().map(RangeFnExec::to_string).collect();
775 let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
776 write!(
777 f,
778 "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
779 range_expr_strs.join(", "),
780 self.align,
781 self.align_to,
782 by.join(", "),
783 self.time_index,
784 )?;
785 }
786 }
787 Ok(())
788 }
789}
790
791impl ExecutionPlan for RangeSelectExec {
792 fn as_any(&self) -> &dyn std::any::Any {
793 self
794 }
795
796 fn schema(&self) -> SchemaRef {
797 self.schema.clone()
798 }
799
800 fn required_input_distribution(&self) -> Vec<Distribution> {
801 vec![Distribution::SinglePartition]
802 }
803
804 fn properties(&self) -> &Arc<PlanProperties> {
805 &self.cache
806 }
807
808 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
809 vec![&self.input]
810 }
811
812 fn with_new_children(
813 self: Arc<Self>,
814 children: Vec<Arc<dyn ExecutionPlan>>,
815 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
816 assert!(!children.is_empty());
817 Ok(Arc::new(Self {
818 input: children[0].clone(),
819 range_exec: self.range_exec.clone(),
820 time_index: self.time_index.clone(),
821 by: self.by.clone(),
822 align: self.align,
823 align_to: self.align_to,
824 schema: self.schema.clone(),
825 by_schema: self.by_schema.clone(),
826 metric: self.metric.clone(),
827 schema_before_project: self.schema_before_project.clone(),
828 schema_project: self.schema_project.clone(),
829 cache: self.cache.clone(),
830 }))
831 }
832
833 fn execute(
834 &self,
835 partition: usize,
836 context: Arc<TaskContext>,
837 ) -> DfResult<DfSendableRecordBatchStream> {
838 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
839 let batch_size = context.session_config().batch_size();
840 let input = self.input.execute(partition, context)?;
841 let schema = input.schema();
842 let time_index = schema
843 .column_with_name(&self.time_index)
844 .ok_or(DataFusionError::Execution(
845 "time index column not found".into(),
846 ))?
847 .0;
848 let row_converter = RowConverter::new(
849 self.by_schema
850 .fields()
851 .iter()
852 .map(|f| SortField::new(f.data_type().clone()))
853 .collect(),
854 )?;
855 Ok(Box::pin(RangeSelectStream {
856 batch_size,
857 schema: self.schema.clone(),
858 range_exec: self.range_exec.clone(),
859 input,
860 random_state: RandomState::new(),
861 time_index,
862 align: self.align,
863 align_to: self.align_to,
864 by: self.by.clone(),
865 series_map: HashMap::new(),
866 exec_state: ExecutionState::ReadingInput,
867 num_not_null_rows: 0,
868 row_converter,
869 modify_map: HashMap::new(),
870 metric: baseline_metric,
871 schema_project: self.schema_project.clone(),
872 schema_before_project: self.schema_before_project.clone(),
873 output_batch: None,
874 output_batch_offset: 0,
875 }))
876 }
877
878 fn metrics(&self) -> Option<MetricsSet> {
879 Some(self.metric.clone_inner())
880 }
881
882 fn name(&self) -> &str {
883 "RanegSelectExec"
884 }
885}
886
/// Streaming operator state for `RangeSelectExec`: drains the input,
/// accumulates per-series/per-slot aggregates, then emits the result in
/// `batch_size`-row chunks.
struct RangeSelectStream {
    // Maximum number of rows per emitted output batch.
    batch_size: usize,
    // Output schema (after the optional projection).
    schema: SchemaRef,
    range_exec: Vec<RangeFnExec>,
    input: SendableRecordBatchStream,
    // Index of the time index column in the input schema.
    time_index: usize,
    // Step between aligned timestamps, in milliseconds.
    align: Millisecond,
    // Alignment origin timestamp, in milliseconds.
    align_to: i64,
    by: Vec<Arc<dyn PhysicalExpr>>,
    exec_state: ExecutionState,
    // Encodes by-column values into owned rows used as series keys.
    row_converter: RowConverter,
    random_state: RandomState,
    // Accumulator state per series, keyed by the hash of its by values.
    series_map: HashMap<u64, SeriesState>,
    // Scratch map: (series hash, aligned ts) -> input row indexes.
    // Rebuilt for every batch and every range expression.
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    // Number of (series, aligned ts) slots created so far; used to
    // pre-size the output buffers.
    num_not_null_rows: usize,
    metric: BaselineMetrics,
    schema_project: Option<Vec<usize>>,
    schema_before_project: SchemaRef,
    // Fully materialized output, handed out in `batch_size` slices.
    output_batch: Option<RecordBatch>,
    // Next row offset into `output_batch`.
    output_batch_offset: usize,
}
918
/// Aggregation state of one time series (one distinct set of by values).
#[derive(Debug)]
struct SeriesState {
    // Owned row encoding of this series' by-column values.
    row: OwnedRow,
    // For each aligned timestamp, one accumulator per range expression.
    // BTreeMap keeps the slots ordered by timestamp for output generation.
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
927
/// For every input row, finds all aligned timestamps whose range window
/// `[align_ts, align_ts + range)` covers the row's timestamp, and records the
/// row index under `(series_hash, align_ts)` in `modify_map`.
///
/// `modify_map` is cleared first, so it only ever holds the mapping for the
/// current batch / range expression.
fn produce_align_time(
    align_to: i64,
    range: Millisecond,
    align: Millisecond,
    ts_column: &TimestampMillisecondArray,
    by_columns_hash: &[u64],
    modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
) {
    modify_map.clear();
    for (row, hash) in by_columns_hash.iter().enumerate() {
        let ts = ts_column.value(row);
        // Aligned slot at or immediately before `ts`; floor division keeps
        // this correct for timestamps earlier than `align_to`.
        let ith_slot = (ts - align_to).div_floor(align);
        let mut align_ts = ith_slot * align + align_to;
        // Walk backwards over every earlier slot whose window still covers `ts`.
        while align_ts <= ts && ts < align_ts + range {
            modify_map
                .entry((*hash, align_ts))
                .or_default()
                .push(row as u32);
            align_ts -= align;
        }
    }
}
956
957fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
958 let array = ScalarValue::iter_to_array(values.to_vec())?;
959 let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
960 for (i, value) in values.iter_mut().enumerate() {
961 *value = ScalarValue::try_from_array(&cast_array, i)?;
962 }
963 Ok(())
964}
965
impl RangeSelectStream {
    /// Evaluates each physical expression against `batch`, materializing the
    /// results as arrays of `batch.num_rows()` length.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the per-series, per-slot accumulators.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        // Hash the by columns to identify each row's series.
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        // Normalize the time index column to millisecond timestamps.
        let mut ts_column = batch.column(self.time_index).clone();
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // Map each row to every aligned slot whose window covers it.
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // Flatten the per-slot row lists so the argument arrays can be
            // gathered with a single `take`, then sliced per slot via offsets.
            let mut modify_rows = UInt32Builder::with_capacity(0);
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                // Keep one representative row index per slot so the series'
                // by-row can be captured when the series is first seen.
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        // First time this (series, slot) pair is seen: create
                        // one accumulator per range expr, update the i-th.
                        Entry::Vacant(e) => {
                            self.num_not_null_rows += 1;
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Materializes the whole aggregation result as a single record batch.
    ///
    /// Columns are ordered as in `schema_before_project` (range expr results,
    /// time index, by columns), then the optional projection is applied.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        // One scalar output buffer per range expression.
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        let mut start_index = 0;
        // Only when some fill strategy exists do we need to emit every
        // aligned slot between a series' first and last timestamps.
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // Value an accumulator yields with no input; pads slots that received
        // no rows, before the fill strategy is applied.
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            if align_ts_accumulator.is_empty() {
                continue;
            }
            // BTreeMap keys are sorted: first/last give the ts bounds.
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            let align_ts = if need_fill_output {
                // Dense slot sequence so the fill strategy can see the gaps.
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    // Gap slot: use the empty-accumulator padding value.
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            // Apply the cast and fill strategy per range expr over this
            // series' slice of the output buffers.
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            // Repeat this series' by-row once per emitted slot.
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // Cast the millisecond timestamps back to the declared time index
        // type (the field right after the range expr columns).
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }

    /// Returns the next `batch_size`-row chunk of the output, generating the
    /// full output lazily on first call. `Ok(None)` means exhausted.
    fn next_output_batch(&mut self) -> DfResult<Option<RecordBatch>> {
        if self.output_batch.is_none() {
            self.output_batch = Some(self.generate_output()?);
            self.output_batch_offset = 0;
        }

        let num_rows = self.output_batch.as_ref().unwrap().num_rows();
        if num_rows == 0 {
            self.output_batch = None;
            self.output_batch_offset = 0;
            return Ok(None);
        }

        // Fast path: the whole output fits into a single batch.
        if self.output_batch_offset == 0 && num_rows <= self.batch_size {
            return Ok(self.output_batch.take());
        }

        let offset = self.output_batch_offset;
        let len = (num_rows - offset).min(self.batch_size);
        let batch = self.output_batch.as_ref().unwrap().slice(offset, len);
        self.output_batch_offset += len;

        // Drop the cached output once it has been fully handed out.
        if self.output_batch_offset >= num_rows {
            self.output_batch = None;
            self.output_batch_offset = 0;
        }

        Ok(Some(batch))
    }
}
1190
/// State machine of `RangeSelectStream::poll_next`.
enum ExecutionState {
    /// Draining the input stream and updating accumulators.
    ReadingInput,
    /// Input exhausted; emitting output batches.
    ProducingOutput,
    /// All output emitted; the stream is finished.
    Done,
}
1196
1197impl RecordBatchStream for RangeSelectStream {
1198 fn schema(&self) -> SchemaRef {
1199 self.schema.clone()
1200 }
1201}
1202
impl Stream for RangeSelectStream {
    type Item = DataFusionResult<RecordBatch>;

    /// Drives the state machine: drain the input while folding batches into
    /// the range accumulators, then emit output batches until exhausted.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.exec_state {
                ExecutionState::ReadingInput => {
                    match ready!(self.input.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            if let Err(e) = self.update_range_context(batch) {
                                common_telemetry::debug!(
                                    "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}",
                                    self.schema,
                                    e
                                );
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                        // Propagate input errors unchanged.
                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                        // Input exhausted: switch to producing output.
                        None => {
                            self.exec_state = ExecutionState::ProducingOutput;
                        }
                    }
                }
                ExecutionState::ProducingOutput => {
                    let result = self.next_output_batch();
                    return match result {
                        Ok(Some(batch)) => {
                            // `next_output_batch` drops its cached batch once
                            // fully consumed; nothing will follow this batch.
                            if self.output_batch.is_none() {
                                self.exec_state = ExecutionState::Done;
                            }
                            Poll::Ready(Some(Ok(batch)))
                        }
                        Ok(None) => {
                            self.exec_state = ExecutionState::Done;
                            Poll::Ready(None)
                        }
                        Err(error) => Poll::Ready(Some(Err(error))),
                    };
                }
                ExecutionState::Done => return Poll::Ready(None),
            }
        }
    }
}
1253
1254#[cfg(test)]
1255mod test {
    /// Builds an Arrow array that may contain `null` entries, e.g.
    /// `nullable_array!(Int64; 1, null, 3)` yields an `Int64Array` of
    /// `[Some(1), None, Some(3)]`.
    macro_rules! nullable_array {
        // Terminal arm: builder with an empty tail — nothing left to append.
        ($builder:ident,) => {
        };
        // Entry arm: `Type; v1, v2, ...` — create the `<Type>Builder`,
        // recurse over the values, and finish the array.
        ($array_type:ident ; $($tail:tt)*) => {
            paste::item! {
                {
                    let mut builder = arrow::array::[<$array_type Builder>]::new();
                    nullable_array!(builder, $($tail)*);
                    builder.finish()
                }
            }
        };
        // Last element is `null`.
        ($builder:ident, null) => {
            $builder.append_null();
        };
        // A `null` followed by more elements.
        ($builder:ident, null, $($tail:tt)*) => {
            $builder.append_null();
            nullable_array!($builder, $($tail)*);
        };
        // Last element is a literal value.
        ($builder:ident, $value:literal) => {
            $builder.append_value($value);
        };
        // A literal value followed by more elements.
        ($builder:ident, $value:literal, $($tail:tt)*) => {
            $builder.append_value($value);
            nullable_array!($builder, $($tail)*);
        };
    }
1283
1284 use std::sync::Arc;
1285
1286 use arrow_schema::SortOptions;
1287 use datafusion::arrow::datatypes::{
1288 ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
1289 };
1290 use datafusion::datasource::memory::MemorySourceConfig;
1291 use datafusion::datasource::source::DataSourceExec;
1292 use datafusion::functions_aggregate::min_max;
1293 use datafusion::physical_plan::sorts::sort::SortExec;
1294 use datafusion::prelude::SessionContext;
1295 use datafusion_physical_expr::PhysicalSortExpr;
1296 use datafusion_physical_expr::expressions::Column;
1297 use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray};
1298 use datatypes::arrow_array::StringArray;
1299
1300 use super::*;
1301
    /// Name of the time-index column shared by every test in this module.
    const TIME_INDEX_COLUMN: &str = "timestamp";
1303
1304 fn prepare_test_data(is_float: bool, is_gap: bool) -> DataSourceExec {
1305 let schema = Arc::new(Schema::new(vec![
1306 Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1307 Field::new(
1308 "value",
1309 if is_float {
1310 DataType::Float64
1311 } else {
1312 DataType::Int64
1313 },
1314 true,
1315 ),
1316 Field::new("host", DataType::Utf8, true),
1317 ]));
1318 let timestamp_column: Arc<dyn Array> = if !is_gap {
1319 Arc::new(TimestampMillisecondArray::from(vec![
1320 0, 5_000, 10_000, 15_000, 20_000, 0, 5_000, 10_000, 15_000, 20_000, ])) as _
1323 } else {
1324 Arc::new(TimestampMillisecondArray::from(vec![
1325 0, 15_000, 0, 15_000, ])) as _
1328 };
1329 let mut host = vec!["host1"; timestamp_column.len() / 2];
1330 host.extend(vec!["host2"; timestamp_column.len() / 2]);
1331 let mut value_column: Arc<dyn Array> = if is_gap {
1332 Arc::new(nullable_array!(Int64;
1333 0, 6, 6, 12 )) as _
1336 } else {
1337 Arc::new(nullable_array!(Int64;
1338 0, null, 1, null, 2, 3, null, 4, null, 5 )) as _
1341 };
1342 if is_float {
1343 value_column =
1344 cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
1345 .unwrap();
1346 }
1347 let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
1348 let data = RecordBatch::try_new(
1349 schema.clone(),
1350 vec![timestamp_column, value_column, host_column],
1351 )
1352 .unwrap();
1353
1354 DataSourceExec::new(Arc::new(
1355 MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
1356 ))
1357 }
1358
1359 fn prepare_empty_test_data(is_float: bool) -> DataSourceExec {
1360 let schema = Arc::new(Schema::new(vec![
1361 Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1362 Field::new(
1363 "value",
1364 if is_float {
1365 DataType::Float64
1366 } else {
1367 DataType::Int64
1368 },
1369 true,
1370 ),
1371 Field::new("host", DataType::Utf8, true),
1372 ]));
1373 let timestamp_column: Arc<dyn Array> =
1374 Arc::new(TimestampMillisecondArray::from(Vec::<i64>::new())) as _;
1375 let value_column: Arc<dyn Array> = if is_float {
1376 Arc::new(Float64Array::from(Vec::<Option<f64>>::new())) as _
1377 } else {
1378 Arc::new(Int64Array::from(Vec::<Option<i64>>::new())) as _
1379 };
1380 let host_column: Arc<dyn Array> =
1381 Arc::new(StringArray::from(Vec::<Option<&str>>::new())) as _;
1382 let data = RecordBatch::try_new(
1383 schema.clone(),
1384 vec![timestamp_column, value_column, host_column],
1385 )
1386 .unwrap();
1387
1388 DataSourceExec::new(Arc::new(
1389 MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
1390 ))
1391 }
1392
    /// Builds a `RangeSelectExec` computing `MIN(value)` over `range1` and
    /// `MAX(value)` over `range2`, aligned to `align` and grouped by `host`,
    /// on top of [`prepare_test_data`]; sorts by (host, timestamp) and
    /// collects all result batches using the given session `batch_size`.
    async fn collect_range_select_test(
        range1: Millisecond,
        range2: Millisecond,
        align: Millisecond,
        fill: Option<Fill>,
        is_float: bool,
        is_gap: bool,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let data_type = if is_float {
            DataType::Float64
        } else {
            DataType::Int64
        };
        // FILL LINEAR over integer input requires casting values to Float64.
        let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
            (Some(DataType::Float64), DataType::Float64)
        } else {
            (None, data_type.clone())
        };
        let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", schema_data_type.clone(), true),
            Field::new("MAX(value)", schema_data_type, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        let input_schema = memory_exec.schema().clone();
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            range_exec: vec![
                // MIN(value) over `range1`.
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range1,
                    fill: fill.clone(),
                    need_cast: need_cast.clone(),
                },
                // MAX(value) over `range2`.
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range2,
                    fill,
                    need_cast,
                },
            ],
            align,
            align_to: 0,
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            schema: schema.clone(),
            schema_before_project: schema.clone(),
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        // Sort by (host, timestamp) so the pretty-printed output is stable.
        let sort_exec = SortExec::new(
            [
                PhysicalSortExpr {
                    expr: Arc::new(Column::new("host", 3)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
            ]
            .into(),
            range_select_exec,
        );
        let session_context = SessionContext::new_with_config(
            datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size),
        );
        datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
            .await
            .unwrap()
    }
1499
1500 async fn do_range_select_test(
1501 range1: Millisecond,
1502 range2: Millisecond,
1503 align: Millisecond,
1504 fill: Option<Fill>,
1505 is_float: bool,
1506 is_gap: bool,
1507 expected: String,
1508 ) {
1509 let result =
1510 collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await;
1511
1512 let result_literal = arrow::util::pretty::pretty_format_batches(&result)
1513 .unwrap()
1514 .to_string();
1515
1516 assert_eq!(result_literal, expected);
1517 }
1518
    /// A 1000s align far larger than the data span collapses each host to a
    /// single aligned row.
    #[tokio::test]
    async fn range_10s_align_1000s() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            10_000,
            1_000_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }
1540
    /// MIN over 10s / MAX over 5s with 5s align: FILL NULL leaves the MAX
    /// column empty on the slots its shorter range does not cover.
    #[tokio::test]
    async fn range_fill_null() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0 | | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0 | | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0 | | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0 | | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0 | | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }
1572
    /// Same layout as `range_fill_null`, but FILL PREV carries the previous
    /// MAX value forward into otherwise-empty slots.
    #[tokio::test]
    async fn range_fill_prev() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0 | 0.0 | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0 | 1.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0 | | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0 | 3.0 | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0 | 4.0 | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Prev),
            true,
            false,
            expected,
        )
        .await;
    }
1604
    /// FILL LINEAR interpolates (and extrapolates at the edges) the missing
    /// MAX values between neighboring points.
    #[tokio::test]
    async fn range_fill_linear() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | -0.5 | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0 | 2.5 | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            false,
            expected,
        )
        .await;
    }
1636
    /// FILL LINEAR on integer input: values are cast to Float64, so the
    /// output matches `range_fill_linear` exactly.
    #[tokio::test]
    async fn range_fill_integer_linear() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | -0.5 | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0 | 2.5 | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            false,
            false,
            expected,
        )
        .await;
    }
1668
    /// FILL with a constant: empty MAX slots receive the literal 6.6.
    #[tokio::test]
    async fn range_fill_const() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 6.6 | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0 | 6.6 | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0 | 6.6 | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0 | 6.6 | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0 | 6.6 | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0 | 6.6 | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
            true,
            false,
            expected,
        )
        .await;
    }
1700
    /// Gappy input (points only at 0s and 15s per host) exercised with every
    /// fill strategy: no fill skips the gap slots entirely, while NULL / PREV
    /// / LINEAR / CONST each materialize the 5s and 10s slots differently.
    #[tokio::test]
    async fn range_fill_gap() {
        // No fill: gap slots are simply absent from the output.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
        // FILL NULL: gap slots appear with empty values.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| | | 1970-01-01T00:00:05 | host1 |\
            \n| | | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
            \n| | | 1970-01-01T00:00:05 | host2 |\
            \n| | | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
        // FILL PREV: gap slots repeat the previous observed value.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:05 | host1 |\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
        // FILL LINEAR: gap slots are interpolated between the endpoints.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 2.0 | 2.0 | 1970-01-01T00:00:05 | host1 |\
            \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 8.0 | 8.0 | 1970-01-01T00:00:05 | host2 |\
            \n| 10.0 | 10.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            true,
            expected,
        )
        .await;
        // FILL with a constant: gap slots are filled with 6.0.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp | host |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0 | 6.0 | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0 | 12.0 | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
            true,
            true,
            expected,
        )
        .await;
    }
1793
    /// With a session batch size of 3, the 12 output rows must be split into
    /// four batches of exactly 3 rows each.
    #[tokio::test]
    async fn range_select_respects_session_batch_size() {
        let result =
            collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await;

        let row_counts = result
            .iter()
            .map(|batch| batch.num_rows())
            .collect::<Vec<_>>();
        assert_eq!(vec![3, 3, 3, 3], row_counts);
    }
1805
    /// Running the range-select plan over a zero-row input must yield no
    /// output batches at all (the empty generated batch is skipped, not
    /// emitted).
    #[tokio::test]
    async fn range_select_skips_empty_output_batch() {
        let memory_exec = Arc::new(prepare_empty_test_data(true));
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", DataType::Float64, true),
            Field::new("MAX(value)", DataType::Float64, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        let input_schema = memory_exec.schema().clone();
        // Same MIN/MAX plan shape as collect_range_select_test, built inline
        // over the empty source.
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            range_exec: vec![
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: 10_000,
                    fill: Some(Fill::Null),
                    need_cast: None,
                },
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema)
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: 5_000,
                    fill: Some(Fill::Null),
                    need_cast: None,
                },
            ],
            align: 5_000,
            align_to: 0,
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            schema: schema.clone(),
            schema_before_project: schema.clone(),
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        let session_context = SessionContext::new();
        let result =
            datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx())
                .await
                .unwrap();

        assert!(result.is_empty());
    }
1875
    /// Covers `Fill` parsing (including error messages) and the Null / Prev /
    /// Const fill strategies on scalar vectors.
    #[test]
    fn fill_test() {
        assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
        assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Some(Fill::Linear));
        // LINEAR is rejected on non-numeric types.
        assert_eq!(
            Fill::try_from_str("Linear", &DataType::Boolean)
                .unwrap_err()
                .to_string(),
            "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
        );
        // Unparseable constants surface the underlying cast error.
        assert_eq!(
            Fill::try_from_str("WHAT", &DataType::UInt8)
                .unwrap_err()
                .to_string(),
            "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
        );
        assert_eq!(
            Fill::try_from_str("8.0", &DataType::UInt8)
                .unwrap_err()
                .to_string(),
            "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
        );
        assert!(
            Fill::try_from_str("8", &DataType::UInt8).unwrap()
                == Some(Fill::Const(ScalarValue::UInt8(Some(8))))
        );
        let mut test1 = vec![
            ScalarValue::UInt8(Some(8)),
            ScalarValue::UInt8(None),
            ScalarValue::UInt8(Some(9)),
        ];
        // Fill::Null keeps existing NULLs as-is.
        Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(None));
        // Fill::Prev copies the preceding non-null value forward.
        Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
        test1[1] = ScalarValue::UInt8(None);
        // Fill::Const substitutes the configured constant.
        Fill::Const(ScalarValue::UInt8(Some(10)))
            .apply_fill_strategy(&[], &mut test1)
            .unwrap();
        assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
    }
1917
    /// Covers `Fill::Linear`: evenly and unevenly spaced timestamps,
    /// edge extrapolation, and the degenerate single-null case.
    #[test]
    fn test_fill_linear() {
        let ts = vec![1, 2, 3, 4, 5];
        let mut test = vec![
            ScalarValue::Float32(Some(1.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(3.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(5.0)),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
        let mut test1 = vec![
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(2.0)),
            ScalarValue::Float32(None),
            ScalarValue::Float32(Some(4.0)),
            ScalarValue::Float32(None),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
        // Both inputs lie on the line y = x, so they fill to the same series.
        assert_eq!(test, test1);
        // Unevenly spaced timestamps, with nulls at both edges.
        let ts = vec![
            1, 3, 8, 30, 88, 108, 128,
        ];
        let mut test = vec![
            ScalarValue::Float64(None),
            ScalarValue::Float64(Some(1.0)),
            ScalarValue::Float64(Some(11.0)),
            ScalarValue::Float64(None),
            ScalarValue::Float64(Some(10.0)),
            ScalarValue::Float64(Some(5.0)),
            ScalarValue::Float64(None),
        ];
        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
        let data: Vec<_> = test
            .into_iter()
            .map(|x| {
                let ScalarValue::Float64(Some(f)) = x else {
                    unreachable!()
                };
                f
            })
            .collect();
        assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
        // A single point with no value cannot be interpolated: stays null.
        let ts = vec![1];
        let test = vec![ScalarValue::Float32(None)];
        let mut test1 = test.clone();
        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
        assert_eq!(test, test1);
    }
1975}