use std::any::Any;
use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use ahash::RandomState;
use arrow::compute::{self, cast_with_options, CastOptions, SortColumn};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::common::{Result as DataFusionResult, Statistics};
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::udaf::create_aggregate_expr as create_aggr_udf_expr;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion::physical_planner::create_physical_sort_expr;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::{get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION};
use datafusion_expr::{
lit, Accumulator, AggregateFunction, Expr, ExprSchemable, LogicalPlan,
UserDefinedLogicalNodeCore,
};
use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
use datafusion_physical_expr::expressions::create_aggregate_expr as create_aggr_expr;
use datafusion_physical_expr::{
create_physical_expr, AggregateExpr, Distribution, EquivalenceProperties, Partitioning,
PhysicalExpr, PhysicalSortExpr,
};
use datatypes::arrow::array::{
Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
};
use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
use futures::{ready, Stream};
use futures_util::StreamExt;
use snafu::{ensure, ResultExt};
use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result};
/// Native integer type of a millisecond timestamp; used for timestamps, ranges and alignment
/// steps throughout this file.
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
/// Aggregate expression used by the range query planner for `first_value` / `last_value`.
///
/// `expr` yields the value to pick, while `order_bys` decides which row of a window ranks first.
#[derive(Debug)]
struct RangeFirstListValue {
    /// Expression producing the candidate value.
    expr: Arc<dyn PhysicalExpr>,
    /// Sort keys (expression + options) used to rank the rows of a window.
    order_bys: Vec<PhysicalSortExpr>,
}

impl RangeFirstListValue {
    /// Creates the expression and hands it back type-erased as an [`AggregateExpr`].
    pub fn new_aggregate_expr(
        expr: Arc<dyn PhysicalExpr>,
        order_bys: Vec<PhysicalSortExpr>,
    ) -> Arc<dyn AggregateExpr> {
        let aggregate = Self { expr, order_bys };
        Arc::new(aggregate)
    }
}
impl PartialEq<dyn Any> for RangeFirstListValue {
    /// Two expressions are equal when `other` is also a `RangeFirstListValue` with the same
    /// value expression and the same sequence of sort expressions.
    fn eq(&self, other: &dyn Any) -> bool {
        match down_cast_any_ref(other).downcast_ref::<Self>() {
            Some(that) => {
                self.expr.eq(&that.expr) && self.order_bys.iter().eq(that.order_bys.iter())
            }
            None => false,
        }
    }
}
impl AggregateExpr for RangeFirstListValue {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Creates an accumulator that only needs the sort options of the order-by keys.
    fn create_accumulator(&self) -> DataFusionResult<Box<dyn Accumulator>> {
        let sort_options = self.order_bys.iter().map(|order| order.options).collect();
        Ok(Box::new(RangeFirstListValueAcc::new(sort_options)))
    }

    /// Input expressions of the accumulator: every order-by expression first, then the value
    /// expression as the last element.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.order_bys
            .iter()
            .map(|order| order.expr.clone())
            .chain(std::iter::once(self.expr.clone()))
            .collect()
    }

    fn field(&self) -> DataFusionResult<Field> {
        unreachable!("AggregateExpr::field will not be used in range query")
    }

    fn state_fields(&self) -> DataFusionResult<Vec<Field>> {
        unreachable!("AggregateExpr::state_fields will not be used in range query")
    }
}
/// Accumulator that remembers the best-ranked row seen so far.
#[derive(Debug)]
pub struct RangeFirstListValueAcc {
    /// Sort options, one per order-by key.
    pub sort_options: Vec<SortOptions>,
    /// Row of the current winner (sort keys followed by the value); empty until the first update.
    pub sort_columns: Vec<ScalarValue>,
    /// Value of the current winner, `None` until the first update.
    pub data: Option<ScalarValue>,
}

impl RangeFirstListValueAcc {
    /// Creates an empty accumulator for the given sort options.
    pub fn new(sort_options: Vec<SortOptions>) -> Self {
        Self {
            data: None,
            sort_columns: Vec::new(),
            sort_options,
        }
    }
}
impl Accumulator for RangeFirstListValueAcc {
    /// Feeds one batch into the accumulator.
    ///
    /// `values` holds the order-by columns (one per entry of `sort_options`) followed by the
    /// value column. The best-ranked row of the batch is located with a lexicographic sort and
    /// then compared, key by key, against the row currently stored.
    fn update_batch(&mut self, values: &[ArrayRef]) -> DataFusionResult<()> {
        // An empty batch has no candidate row; `value(0)` below would panic on it.
        if matches!(values.first(), Some(column) if column.is_empty()) {
            return Ok(());
        }
        let columns: Vec<_> = values
            .iter()
            .zip(self.sort_options.iter())
            .map(|(v, s)| SortColumn {
                values: v.clone(),
                options: Some(*s),
            })
            .collect();
        // Only the top row is needed, hence the limit of 1.
        let idx = compute::lexsort_to_indices(&columns, Some(1))?.value(0) as usize;
        let vs = get_row_at_idx(values, idx)?;
        let need_update = self.data.is_none()
            || vs
                .iter()
                .zip(self.sort_columns.iter())
                .zip(self.sort_options.iter())
                // The first key that differs decides; `None` moves on to the next key.
                .find_map(|((new_value, old_value), sort_option)| {
                    match (new_value.is_null(), old_value.is_null()) {
                        (true, true) => None,
                        // A NULL key ranks first only when nulls are sorted first.
                        (true, false) => Some(sort_option.nulls_first),
                        (false, true) => Some(!sort_option.nulls_first),
                        (false, false) => match new_value.partial_cmp(old_value)? {
                            // A tie on this key must fall through to the next key.
                            Ordering::Equal => None,
                            Ordering::Less => Some(!sort_option.descending),
                            Ordering::Greater => Some(sort_option.descending),
                        },
                    }
                })
                .unwrap_or(false);
        if need_update {
            self.sort_columns = vs;
            // The value column sits right after the order-by columns.
            self.data = Some(ScalarValue::try_from_array(
                &values[self.sort_options.len()],
                idx,
            )?);
        }
        Ok(())
    }

    /// Returns the stored value, or `ScalarValue::Null` when nothing has been accumulated.
    fn evaluate(&mut self) -> DataFusionResult<ScalarValue> {
        Ok(self.data.clone().unwrap_or(ScalarValue::Null))
    }

    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }

    fn state(&mut self) -> DataFusionResult<Vec<ScalarValue>> {
        unreachable!("Accumulator::state will not be used in range query")
    }

    fn merge_batch(&mut self, _states: &[ArrayRef]) -> DataFusionResult<()> {
        unreachable!("Accumulator::merge_batch will not be used in range query")
    }
}
/// Strategy used to fill empty slots of a range query result.
#[derive(PartialEq, Eq, Debug, Hash, Clone)]
pub enum Fill {
    /// Leave empty slots as they are.
    Null,
    /// Copy the value of the previous slot.
    Prev,
    /// Interpolate linearly between neighbouring points.
    Linear,
    /// Use a constant value.
    Const(ScalarValue),
}

impl Display for Fill {
    /// Prints the keyword of the strategy, or the constant itself for [`Fill::Const`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Fill::Const(value) => write!(f, "{}", value),
            Fill::Null => f.write_str("NULL"),
            Fill::Prev => f.write_str("PREV"),
            Fill::Linear => f.write_str("LINEAR"),
        }
    }
}
impl Fill {
    /// Parses a fill option.
    ///
    /// The keywords `NULL`, `PREV` and `LINEAR` are matched case-insensitively; an empty string
    /// means "no fill" (`Ok(None)`). Anything else is parsed as a constant of `datatype`.
    ///
    /// # Errors
    ///
    /// Returns a plan error when `LINEAR` is requested for a non-numeric type or when the
    /// constant cannot be converted to `datatype`.
    pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
        let s = value.to_uppercase();
        match s.as_str() {
            "" => Ok(None),
            "NULL" => Ok(Some(Self::Null)),
            "PREV" => Ok(Some(Self::Prev)),
            "LINEAR" => {
                if datatype.is_numeric() {
                    Ok(Some(Self::Linear))
                } else {
                    Err(DataFusionError::Plan(format!(
                        "Use FILL LINEAR on Non-numeric DataType {}",
                        datatype
                    )))
                }
            }
            // Parse the constant with its original casing so case-sensitive constants (e.g.
            // strings) are not silently upper-cased; fall back to the upper-cased text to keep
            // accepting every input that used to parse.
            _ => ScalarValue::try_from_string(value.to_string(), datatype)
                .or_else(|_| ScalarValue::try_from_string(s.clone(), datatype))
                .map_err(|err| {
                    DataFusionError::Plan(format!(
                        "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
                        s, err
                    ))
                })
                .map(|x| Some(Fill::Const(x))),
        }
    }

    /// Fills the null entries of `data` in place according to the strategy.
    ///
    /// `ts` carries the timestamp of each slot and is only consulted by [`Fill::Linear`].
    pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        match self {
            Fill::Null => Ok(()),
            Fill::Linear => Self::fill_linear(ts, data),
            Fill::Prev => {
                // The first slot has no predecessor and is left untouched.
                for i in 1..data.len() {
                    if data[i].is_null() {
                        data[i] = data[i - 1].clone();
                    }
                }
                Ok(())
            }
            Fill::Const(v) => {
                for slot in data.iter_mut().filter(|slot| slot.is_null()) {
                    *slot = v.clone();
                }
                Ok(())
            }
        }
    }

    /// Linear fill: interior gaps are interpolated between their two neighbours; leading and
    /// trailing gaps are extrapolated from the two nearest points. Nothing is done when fewer
    /// than two non-null points exist.
    fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        let not_null_num = data.iter().filter(|x| !x.is_null()).count();
        if not_null_num < 2 {
            return Ok(());
        }
        let mut index = 0;
        // Exclusive end of a leading null run, if any.
        let mut head: Option<usize> = None;
        // Start of a trailing null run, if any.
        let mut tail: Option<usize> = None;
        while index < data.len() {
            // `start..end` is the next run of nulls at or after `index`.
            let start = data[index..]
                .iter()
                .position(ScalarValue::is_null)
                .unwrap_or(data.len() - index)
                + index;
            if start == data.len() {
                break;
            }
            let end = data[start..]
                .iter()
                .position(|r| !r.is_null())
                .unwrap_or(data.len() - start)
                + start;
            index = end + 1;
            if start == 0 {
                head = Some(end);
            } else if end == data.len() {
                tail = Some(start);
            } else {
                linear_interpolation(ts, data, start - 1, end, start, end)?;
            }
        }
        // Head and tail are handled last so that the points they extrapolate from have already
        // been filled by the interior pass.
        if let Some(end) = head {
            linear_interpolation(ts, data, end, end + 1, 0, end)?;
        }
        if let Some(start) = tail {
            linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
        }
        Ok(())
    }
}
/// Overwrites `data[start..end]` with points on the straight line through
/// `(ts[i1], data[i1])` and `(ts[i2], data[i2])`.
///
/// # Errors
///
/// Fails when the two anchor values are not both `Float64` or both `Float32`, or when the two
/// anchors share the same timestamp.
fn linear_interpolation(
    ts: &[i64],
    data: &mut [ScalarValue],
    i1: usize,
    i2: usize,
    start: usize,
    end: usize,
) -> DfResult<()> {
    let x0 = ts[i1] as f64;
    let x1 = ts[i2] as f64;
    let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
        (ScalarValue::Float32(Some(a)), ScalarValue::Float32(Some(b))) => {
            (*a as f64, *b as f64, true)
        }
        (ScalarValue::Float64(Some(a)), ScalarValue::Float64(Some(b))) => (*a, *b, false),
        _ => {
            return Err(DataFusionError::Execution(
                "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
            ));
        }
    };
    if x1 == x0 {
        return Err(DataFusionError::Execution(
            "RangePlan: Linear interpolation using the same coordinate points".to_string(),
        ));
    }
    let slope = (y1 - y0) / (x1 - x0);
    for i in start..end {
        let val = y0 + slope * (ts[i] as f64 - x0);
        // Keep the width of the anchor values.
        data[i] = match is_float32 {
            true => ScalarValue::Float32(Some(val as f32)),
            false => ScalarValue::Float64(Some(val)),
        };
    }
    Ok(())
}
/// One range expression of a range query.
///
/// Equality, ordering, hashing and display all look at `name` only.
#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
    /// Name of the expression; also used as the output column name.
    pub name: String,
    /// Output data type of the expression.
    pub data_type: DataType,
    /// The aggregate expression itself.
    pub expr: Expr,
    /// Length of the window the aggregate is computed over.
    pub range: Duration,
    /// Optional strategy for filling empty slots.
    pub fill: Option<Fill>,
    /// Whether the aggregate result has to be cast to `data_type`.
    pub need_cast: bool,
}

impl PartialEq for RangeFn {
    fn eq(&self, other: &Self) -> bool {
        self.name.eq(&other.name)
    }
}

impl PartialOrd for RangeFn {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for RangeFn {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.name, &other.name)
    }
}

impl std::hash::Hash for RangeFn {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::hash::Hash::hash(&self.name, state);
    }
}

impl Display for RangeFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.name)
    }
}
/// Logical plan node of a range query.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeSelect {
    /// Plan producing the rows to aggregate.
    pub input: Arc<LogicalPlan>,
    /// Range expressions evaluated by this node.
    pub range_expr: Vec<RangeFn>,
    /// Distance between two consecutive aligned timestamps.
    pub align: Duration,
    /// Origin the aligned timestamps are anchored to (printed as milliseconds in EXPLAIN).
    pub align_to: i64,
    /// Name of the time index column.
    pub time_index: String,
    /// Expression yielding the time index.
    pub time_expr: Expr,
    /// Expressions rows are grouped by.
    pub by: Vec<Expr>,
    /// Output schema, i.e. `schema_before_project` with `schema_project` applied.
    pub schema: DFSchemaRef,
    /// Schema of the `by` expressions.
    pub by_schema: DFSchemaRef,
    /// Column indices into `schema_before_project` forming the output, if a projection applies.
    pub schema_project: Option<Vec<usize>>,
    /// Range columns, then the time index, then the `by` columns.
    pub schema_before_project: DFSchemaRef,
}
impl RangeSelect {
    /// Builds a `RangeSelect` node and derives its schemas.
    ///
    /// The un-projected schema lists the range expressions first, then the time index, then the
    /// `by` columns. If every expression of `projection_expr` can be located in that schema the
    /// output schema is the corresponding projection; otherwise no projection is recorded and
    /// the un-projected schema is the output schema.
    ///
    /// # Errors
    ///
    /// Fails when `align` or any range is zero milliseconds, or when a schema cannot be built.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        range_expr: Vec<RangeFn>,
        align: Duration,
        align_to: i64,
        time_index: Expr,
        by: Vec<Expr>,
        projection_expr: &[Expr],
    ) -> Result<Self> {
        ensure!(
            align.as_millis() != 0,
            RangeQuerySnafu {
                msg: "Can't use 0 as align in Range Query"
            }
        );
        for expr in &range_expr {
            ensure!(
                expr.range.as_millis() != 0,
                RangeQuerySnafu {
                    msg: format!(
                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
                        expr.name
                    )
                }
            );
        }

        // A range column is nullable unless a constant fill guarantees a value in every slot.
        let mut fields: Vec<_> = range_expr
            .iter()
            .map(|range_fn| {
                let nullable = !matches!(range_fn.fill, Some(Fill::Const(..)));
                let field = Field::new(&range_fn.name, range_fn.data_type.clone(), nullable);
                (None, Arc::new(field))
            })
            .collect();

        let ts_field = time_index
            .to_field(input.schema().as_ref())
            .context(DataFusionSnafu)?;
        let time_index_name = ts_field.1.name().clone();
        fields.push(ts_field);

        let by_fields = exprlist_to_fields(&by, &input).context(DataFusionSnafu)?;
        fields.extend(by_fields.clone());

        let schema_before_project = Arc::new(
            DFSchema::new_with_metadata(fields, input.schema().metadata().clone())
                .context(DataFusionSnafu)?,
        );
        let by_schema = Arc::new(
            DFSchema::new_with_metadata(by_fields, input.schema().metadata().clone())
                .context(DataFusionSnafu)?,
        );

        // Position of a projected expression inside the un-projected schema, if it has one.
        let locate = |project_expr: &Expr| -> Option<usize> {
            match project_expr {
                Expr::Column(column) => schema_before_project
                    .index_of_column_by_name(column.relation.as_ref(), &column.name),
                other => {
                    let (qualifier, field) = other.to_field(input.schema().as_ref()).ok()?;
                    schema_before_project
                        .index_of_column_by_name(qualifier.as_ref(), field.name())
                }
            }
        };
        // A single expression that cannot be located disables the projection altogether.
        let schema_project = projection_expr
            .iter()
            .map(locate)
            .collect::<Option<Vec<usize>>>();

        let schema = match &schema_project {
            Some(project) => {
                let project_field = project
                    .iter()
                    .map(|i| {
                        let f = schema_before_project.qualified_field(*i);
                        (f.0.cloned(), Arc::new(f.1.clone()))
                    })
                    .collect();
                Arc::new(
                    DFSchema::new_with_metadata(project_field, input.schema().metadata().clone())
                        .context(DataFusionSnafu)?,
                )
            }
            None => schema_before_project.clone(),
        };

        Ok(Self {
            input,
            range_expr,
            align,
            align_to,
            time_index: time_index_name,
            time_expr: time_index,
            schema,
            by_schema,
            by,
            schema_project,
            schema_before_project,
        })
    }
}
impl UserDefinedLogicalNodeCore for RangeSelect {
    fn name(&self) -> &str {
        "RangeSelect"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    /// Expressions of the node: the range expressions, then the time expression, then the `by`
    /// expressions. `with_exprs_and_inputs` expects the same layout.
    fn expressions(&self) -> Vec<Expr> {
        let mut exprs = Vec::with_capacity(self.range_expr.len() + 1 + self.by.len());
        exprs.extend(self.range_expr.iter().map(|range_fn| range_fn.expr.clone()));
        exprs.push(self.time_expr.clone());
        exprs.extend(self.by.iter().cloned());
        exprs
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let range_exprs: Vec<String> = self.range_expr.iter().map(ToString::to_string).collect();
        let align_by: Vec<String> = self.by.iter().map(ToString::to_string).collect();
        write!(
            f,
            "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
            range_exprs.join(", "),
            self.align.as_millis(),
            self.align_to,
            align_by.join(", "),
            self.time_index
        )
    }

    /// Rebuilds the node with new expressions and a new input, keeping every schema as is.
    ///
    /// # Errors
    ///
    /// Fails when `inputs` is empty or when `exprs` does not have the layout produced by
    /// `expressions`.
    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        let Some(input) = inputs.into_iter().next() else {
            return Err(DataFusionError::Plan(
                "RangeSelect: inputs is empty".to_string(),
            ));
        };
        let range_count = self.range_expr.len();
        if exprs.len() != range_count + self.by.len() + 1 {
            return Err(DataFusionError::Plan(
                "RangeSelect: exprs length not match".to_string(),
            ));
        }
        // Only the expression is replaced; the remaining attributes are carried over.
        let range_expr = self
            .range_expr
            .iter()
            .zip(exprs.iter())
            .map(|(range, e)| RangeFn {
                name: range.name.clone(),
                data_type: range.data_type.clone(),
                expr: e.clone(),
                range: range.range,
                fill: range.fill.clone(),
                need_cast: range.need_cast,
            })
            .collect();
        let time_expr = exprs[range_count].clone();
        let by = exprs[range_count + 1..].to_vec();
        Ok(Self {
            input: Arc::new(input),
            range_expr,
            align: self.align,
            align_to: self.align_to,
            time_index: self.time_index.clone(),
            time_expr,
            by,
            schema: self.schema.clone(),
            by_schema: self.by_schema.clone(),
            schema_project: self.schema_project.clone(),
            schema_before_project: self.schema_before_project.clone(),
        })
    }
}
impl RangeSelect {
fn create_physical_expr_list(
&self,
is_count_aggr: bool,
exprs: &[Expr],
df_schema: &Arc<DFSchema>,
session_state: &SessionState,
) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
exprs
.iter()
.map(|e| match e {
Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
&lit(COUNT_STAR_EXPANSION),
df_schema.as_ref(),
session_state.execution_props(),
),
_ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
})
.collect::<DfResult<Vec<_>>>()
}
pub fn to_execution_plan(
&self,
logical_input: &LogicalPlan,
exec_input: Arc<dyn ExecutionPlan>,
session_state: &SessionState,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let fields: Vec<_> = self
.schema_before_project
.fields()
.iter()
.map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
.collect();
let by_fields: Vec<_> = self
.by_schema
.fields()
.iter()
.map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
.collect();
let input_dfschema = logical_input.schema();
let input_schema = exec_input.schema();
let range_exec: Vec<RangeFnExec> = self
.range_expr
.iter()
.map(|range_fn| {
let name = range_fn.expr.display_name()?;
let range_expr = match &range_fn.expr {
Expr::Alias(expr) => expr.expr.as_ref(),
others => others,
};
let expr = match &range_expr {
Expr::AggregateFunction(aggr)
if (aggr.func_def.name() == "last_value"
|| aggr.func_def.name() == "first_value") =>
{
let is_last_value_func = aggr.func_def.name() == "last_value";
let order_by = if let Some(exprs) = &aggr.order_by {
exprs
.iter()
.map(|x| {
create_physical_sort_expr(
x,
input_dfschema.as_ref(),
session_state.execution_props(),
)
.map(|expr| {
if is_last_value_func {
PhysicalSortExpr {
expr: expr.expr,
options: SortOptions {
descending: !expr.options.descending,
nulls_first: !expr.options.nulls_first,
},
}
} else {
expr
}
})
})
.collect::<DfResult<Vec<_>>>()?
} else {
let time_index = create_physical_expr(
&self.time_expr,
input_dfschema.as_ref(),
session_state.execution_props(),
)?;
vec![PhysicalSortExpr {
expr: time_index,
options: SortOptions {
descending: is_last_value_func,
nulls_first: false,
},
}]
};
let arg = self.create_physical_expr_list(
false,
&aggr.args,
input_dfschema,
session_state,
)?;
Ok(RangeFirstListValue::new_aggregate_expr(
arg[0].clone(),
order_by,
))
}
Expr::AggregateFunction(aggr) => {
let order_by = if let Some(exprs) = &aggr.order_by {
exprs
.iter()
.map(|x| {
create_physical_sort_expr(
x,
input_dfschema.as_ref(),
session_state.execution_props(),
)
})
.collect::<DfResult<Vec<_>>>()?
} else {
vec![]
};
let distinct = aggr.distinct;
let input_phy_exprs = self.create_physical_expr_list(
matches!(
aggr.func_def,
AggregateFunctionDefinition::BuiltIn(AggregateFunction::Count,)
),
&aggr.args,
input_dfschema,
session_state,
)?;
match &aggr.func_def {
AggregateFunctionDefinition::BuiltIn(fun) => create_aggr_expr(
fun,
distinct,
&input_phy_exprs,
&order_by,
&input_schema,
name,
false,
),
AggregateFunctionDefinition::UDF(fun) => create_aggr_udf_expr(
fun,
&input_phy_exprs,
&[],
&[],
&input_schema,
name,
false,
distinct,
),
}
}
_ => Err(DataFusionError::Plan(format!(
"Unexpected Expr: {} in RangeSelect",
range_fn.expr.canonical_name()
))),
}?;
let args = expr.expressions();
Ok(RangeFnExec {
expr,
args,
range: range_fn.range.as_millis() as Millisecond,
fill: range_fn.fill.clone(),
need_cast: if range_fn.need_cast {
Some(range_fn.data_type.clone())
} else {
None
},
})
})
.collect::<DfResult<Vec<_>>>()?;
let schema_before_project = Arc::new(Schema::new(fields));
let schema = if let Some(project) = &self.schema_project {
Arc::new(schema_before_project.project(project)?)
} else {
schema_before_project.clone()
};
let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
let cache = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
Ok(Arc::new(RangeSelectExec {
input: exec_input,
range_exec,
align: self.align.as_millis() as Millisecond,
align_to: self.align_to,
by,
time_index: self.time_index.clone(),
schema,
by_schema: Arc::new(Schema::new(by_fields)),
metric: ExecutionPlanMetricsSet::new(),
schema_before_project,
schema_project: self.schema_project.clone(),
cache,
}))
}
}
/// Physical counterpart of a range expression.
#[derive(Debug, Clone)]
struct RangeFnExec {
    /// Aggregate evaluated over every window.
    pub expr: Arc<dyn AggregateExpr>,
    /// Input expressions of the aggregate.
    pub args: Vec<Arc<dyn PhysicalExpr>>,
    /// Window length in milliseconds.
    pub range: Millisecond,
    /// Optional strategy for filling empty slots.
    pub fill: Option<Fill>,
    /// Target type the aggregate result is cast to, if a cast is needed.
    pub need_cast: Option<DataType>,
}

impl Display for RangeFnExec {
    /// Formats as `<name> RANGE <n>s`, followed by ` FILL <fill>` when a fill is configured.
    /// The range is printed in whole seconds.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let seconds = self.range / 1000;
        match &self.fill {
            Some(fill) => write!(
                f,
                "{} RANGE {}s FILL {}",
                self.expr.name(),
                seconds,
                fill
            ),
            None => write!(f, "{} RANGE {}s", self.expr.name(), seconds),
        }
    }
}
/// Physical plan node executing a range query.
#[derive(Debug)]
pub struct RangeSelectExec {
    /// Child plan delivering the rows to aggregate.
    input: Arc<dyn ExecutionPlan>,
    /// Range expressions to evaluate.
    range_exec: Vec<RangeFnExec>,
    /// Alignment step in milliseconds.
    align: Millisecond,
    /// Origin the aligned timestamps are anchored to.
    align_to: i64,
    /// Name of the time index column in the input.
    time_index: String,
    /// Expressions rows are grouped by.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Output schema.
    schema: SchemaRef,
    /// Schema of the `by` columns.
    by_schema: SchemaRef,
    /// Metrics of this node.
    metric: ExecutionPlanMetricsSet,
    /// Column indices into `schema_before_project` forming the output, if any.
    schema_project: Option<Vec<usize>>,
    /// Schema of the result before the projection is applied.
    schema_before_project: SchemaRef,
    /// Cached plan properties.
    cache: PlanProperties,
}
impl DisplayAs for RangeSelectExec {
    /// Renders the node identically for the default and the verbose format.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let range_exprs = self
                    .range_exec
                    .iter()
                    .map(|range| range.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                let align_by = self
                    .by
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(f, "RangeSelectExec: ")?;
                write!(
                    f,
                    "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
                    range_exprs, self.align, self.align_to, align_by, self.time_index,
                )
            }
        }
    }
}
impl ExecutionPlan for RangeSelectExec {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// The whole input has to arrive in a single partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Returns a copy of the node reading from the first of `children`.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `children` is empty.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // Report the broken precondition as an error rather than aborting the process.
        let Some(input) = children.into_iter().next() else {
            return Err(DataFusionError::Internal(
                "RangeSelectExec: with_new_children requires at least one child".to_string(),
            ));
        };
        Ok(Arc::new(Self {
            input,
            range_exec: self.range_exec.clone(),
            time_index: self.time_index.clone(),
            by: self.by.clone(),
            align: self.align,
            align_to: self.align_to,
            schema: self.schema.clone(),
            by_schema: self.by_schema.clone(),
            metric: self.metric.clone(),
            schema_before_project: self.schema_before_project.clone(),
            schema_project: self.schema_project.clone(),
            cache: self.cache.clone(),
        }))
    }

    /// Starts the input for `partition` and wraps it into a `RangeSelectStream`.
    ///
    /// # Errors
    ///
    /// Fails when the input cannot be executed, when the time index column is missing from the
    /// input schema, or when the row converter for the `by` columns cannot be created.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        // Build the error lazily: it is only needed when the lookup fails.
        let time_index = schema
            .column_with_name(&self.time_index)
            .ok_or_else(|| DataFusionError::Execution("time index column not found".into()))?
            .0;
        let row_converter = RowConverter::new(
            self.by_schema
                .fields()
                .iter()
                .map(|f| SortField::new(f.data_type().clone()))
                .collect(),
        )?;
        Ok(Box::pin(RangeSelectStream {
            schema: self.schema.clone(),
            range_exec: self.range_exec.clone(),
            input,
            random_state: RandomState::new(),
            time_index,
            align: self.align,
            align_to: self.align_to,
            by: self.by.clone(),
            series_map: HashMap::new(),
            exec_state: ExecutionState::ReadingInput,
            num_not_null_rows: 0,
            row_converter,
            modify_map: HashMap::new(),
            metric: baseline_metric,
            schema_project: self.schema_project.clone(),
            schema_before_project: self.schema_before_project.clone(),
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn statistics(&self) -> DataFusionResult<Statistics> {
        Ok(Statistics::new_unknown(self.schema.as_ref()))
    }
}
/// Stream that consumes the whole input, aggregates it per series and aligned timestamp, and
/// finally emits a single record batch.
struct RangeSelectStream {
    /// Output schema.
    schema: SchemaRef,
    /// Range expressions to evaluate.
    range_exec: Vec<RangeFnExec>,
    /// Input stream.
    input: SendableRecordBatchStream,
    /// Index of the time index column in the input batches.
    time_index: usize,
    /// Alignment step in milliseconds.
    align: Millisecond,
    /// Origin the aligned timestamps are anchored to.
    align_to: i64,
    /// Expressions rows are grouped by.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current phase of the stream.
    exec_state: ExecutionState,
    /// Converts the `by` columns to rows and back.
    row_converter: RowConverter,
    /// Hasher state used to hash the `by` columns.
    random_state: RandomState,
    /// Aggregation state per series, keyed by the hash of its `by` values.
    series_map: HashMap<u64, SeriesState>,
    /// Scratch map: input rows belonging to each (series hash, aligned timestamp) pair.
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// Number of (series, aligned timestamp) slots that received data.
    num_not_null_rows: usize,
    /// Baseline metrics of the stream.
    metric: BaselineMetrics,
    /// Column indices into `schema_before_project` forming the output, if any.
    schema_project: Option<Vec<usize>>,
    /// Schema of the result before the projection is applied.
    schema_before_project: SchemaRef,
}
/// Aggregation state of one series.
#[derive(Debug)]
struct SeriesState {
    /// The `by` values of the series in row format.
    row: OwnedRow,
    /// Accumulators (one per range expression) for every aligned timestamp, in timestamp order.
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
/// Rebuilds `modify_map` for one batch: for every input row, records the row index under each
/// `(series hash, aligned timestamp)` whose window `[aligned, aligned + range)` contains the
/// row's timestamp.
///
/// Aligned timestamps have the form `align_to + k * align`. `by_columns_hash[row]` is the hash
/// of the series the row belongs to.
fn produce_align_time(
    align_to: i64,
    range: Millisecond,
    align: Millisecond,
    ts_column: &TimestampMillisecondArray,
    by_columns_hash: &[u64],
    modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
) {
    modify_map.clear();
    for (row, &hash) in by_columns_hash.iter().enumerate() {
        let ts = ts_column.value(row);
        // Start from the slot the timestamp falls into, then walk to earlier slots for as long
        // as their window still covers the timestamp.
        let ith_slot = (ts - align_to).div_floor(align);
        let mut window_start = ith_slot * align + align_to;
        while window_start <= ts && ts < window_start + range {
            let rows = modify_map.entry((hash, window_start)).or_default();
            rows.push(row as u32);
            window_start -= align;
        }
    }
}
/// Casts every scalar of `values` to `data_type` in place by round-tripping through an array.
fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
    let source = ScalarValue::iter_to_array(values.iter().cloned())?;
    let casted = cast_with_options(&source, data_type, &CastOptions::default())?;
    values
        .iter_mut()
        .enumerate()
        .try_for_each(|(index, slot)| -> DfResult<()> {
            *slot = ScalarValue::try_from_array(&casted, index)?;
            Ok(())
        })
}
impl RangeSelectStream {
    /// Evaluates every expression against `batch` and materializes each result as an array.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        let row_count = batch.num_rows();
        exprs
            .iter()
            .map(|expr| expr.evaluate(batch)?.into_array(row_count))
            .collect()
    }

    /// Folds one input batch into the per-series accumulators.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let row_count = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; row_count];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;

        // Everything downstream works on millisecond timestamps.
        let raw_ts = batch.column(self.time_index);
        let ts_column = if matches!(
            raw_ts.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            raw_ts.clone()
        } else {
            compute::cast(
                raw_ts.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?
        };
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;

        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );

            // Flatten the row lists of all slots into one index array; `offsets` remembers
            // where the rows of each slot begin and end.
            let mut modify_rows = UInt32Builder::with_capacity(0);
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            let args = get_arrayref_at_indices(&args, &modify_rows)?;

            for ((hash, ts, row), window) in modify_index.iter().zip(offsets.windows(2)) {
                let (offset, length) = (window[0], window[1] - window[0]);
                let sliced_arrays: Vec<ArrayRef> = args
                    .iter()
                    .map(|array| array.slice(offset, length))
                    .collect();
                let series = self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                    row: by_rows.row(*row as usize).owned(),
                    align_ts_accumulator: BTreeMap::new(),
                });
                match series.align_ts_accumulator.entry(*ts) {
                    Entry::Occupied(mut slot) => {
                        slot.get_mut()[i].update_batch(&sliced_arrays)?;
                    }
                    Entry::Vacant(slot) => {
                        // A new slot gets a fresh accumulator for every range expression.
                        self.num_not_null_rows += 1;
                        let mut accumulators = self
                            .range_exec
                            .iter()
                            .map(|range| range.expr.create_accumulator())
                            .collect::<DfResult<Vec<_>>>()?;
                        let result = accumulators[i].update_batch(&sliced_arrays);
                        slot.insert(accumulators);
                        result?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Evaluates all accumulators and assembles the output batch.
    ///
    /// When at least one range expression has a fill strategy, every series is expanded to all
    /// aligned timestamps between its first and last one; slots without data receive the value
    /// of a fresh accumulator before casting and filling are applied.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        let mut start_index = 0;
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // Result of an accumulator that never saw any input; used for padded slots.
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;

        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            let (begin_ts, end_ts) = match (
                align_ts_accumulator.first_key_value(),
                align_ts_accumulator.last_key_value(),
            ) {
                (Some((first, _)), Some((last, _))) => (*first, *last),
                _ => continue,
            };
            let align_ts = if need_fill_output {
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                match align_ts_accumulator.get_mut(ts) {
                    Some(slot) => {
                        for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                            column.push(acc.evaluate()?);
                        }
                    }
                    None => {
                        for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter())
                        {
                            column.push(padding.clone());
                        }
                    }
                }
            }
            ts_builder.append_slice(&align_ts);

            // Cast and fill operate on the part of each column that belongs to this series.
            let series_end = start_index + align_ts.len();
            for (i, range) in self.range_exec.iter().enumerate() {
                let time_series_data = &mut all_scalar[i][start_index..series_end];
                if let Some(data_type) = &range.need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = &range.fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index = series_end;
        }

        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        // The time index follows the range columns and takes the type declared by the schema.
        let ts_type = self.schema_before_project.field(columns.len()).data_type();
        let ts_column = compute::cast(&ts_builder.finish(), ts_type)?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);

        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        match &self.schema_project {
            Some(project) => Ok(output.project(project)?),
            None => Ok(output),
        }
    }
}
/// Phase of a `RangeSelectStream`.
enum ExecutionState {
    /// Input batches are still being consumed.
    ReadingInput,
    /// The input is exhausted and the output batch is due.
    ProducingOutput,
    /// Nothing is left to emit.
    Done,
}
impl RecordBatchStream for RangeSelectStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for RangeSelectStream {
    type Item = DataFusionResult<RecordBatch>;

    /// Drives the stream: consumes the whole input, then emits one batch, then ends.
    ///
    /// Any error (from the input, from aggregation or from output generation) is yielded once
    /// and terminates the stream, so polling again returns `None` instead of re-reading the
    /// input or re-running the output generation.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.exec_state {
                ExecutionState::ReadingInput => {
                    match ready!(self.input.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            if let Err(e) = self.update_range_context(batch) {
                                self.exec_state = ExecutionState::Done;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                        Some(Err(e)) => {
                            self.exec_state = ExecutionState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        None => {
                            self.exec_state = ExecutionState::ProducingOutput;
                        }
                    }
                }
                ExecutionState::ProducingOutput => {
                    let result = self.generate_output();
                    // Success or failure, there is nothing more to produce afterwards.
                    self.exec_state = ExecutionState::Done;
                    return Poll::Ready(Some(result));
                }
                ExecutionState::Done => return Poll::Ready(None),
            }
        }
    }
}
#[cfg(test)]
mod test {
// Builds an arrow array from a comma separated list of literals and `null`s, e.g.
// `nullable_array!(Int64; 0, null, 1)`. The entry arm creates `arrow::array::<Type>Builder`;
// the remaining arms append one element each and recurse on the rest of the list.
macro_rules! nullable_array {
// Nothing left to append.
($builder:ident,) => {
};
// Entry point: `<ArrayType>; <values...>`.
($array_type:ident ; $($tail:tt)*) => {
paste::item! {
{
let mut builder = arrow::array::[<$array_type Builder>]::new();
nullable_array!(builder, $($tail)*);
builder.finish()
}
}
};
// Last element is `null`.
($builder:ident, null) => {
$builder.append_null();
};
// `null` followed by more elements.
($builder:ident, null, $($tail:tt)*) => {
$builder.append_null();
nullable_array!($builder, $($tail)*);
};
// Last element is a literal.
($builder:ident, $value:literal) => {
$builder.append_value($value);
};
// Literal followed by more elements.
($builder:ident, $value:literal, $($tail:tt)*) => {
$builder.append_value($value);
nullable_array!($builder, $($tail)*);
};
}
use arrow_schema::SortOptions;
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::SessionContext;
use datafusion_physical_expr::expressions::{self, Column};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
use super::*;
const TIME_INDEX_COLUMN: &str = "timestamp";
/// Builds an in-memory input with the columns `timestamp`, `value` and `host`.
///
/// The rows are split evenly between `host1` and `host2`. With `is_gap` each host only has
/// samples at 0s and 15s; otherwise it has samples every 5s from 0s to 20s, every second one
/// being null. `is_float` selects a `Float64` instead of an `Int64` value column.
fn prepare_test_data(is_float: bool, is_gap: bool) -> MemoryExec {
    let value_type = if is_float {
        DataType::Float64
    } else {
        DataType::Int64
    };
    let schema = Arc::new(Schema::new(vec![
        Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
        Field::new("value", value_type, true),
        Field::new("host", DataType::Utf8, true),
    ]));

    let timestamps: Vec<i64> = if is_gap {
        vec![0, 15_000, 0, 15_000]
    } else {
        vec![
            0, 5_000, 10_000, 15_000, 20_000, 0, 5_000, 10_000, 15_000, 20_000,
        ]
    };
    let rows_per_host = timestamps.len() / 2;
    let timestamp_column: Arc<dyn Array> = Arc::new(TimestampMillisecondArray::from(timestamps));

    let hosts: Vec<&str> = std::iter::repeat("host1")
        .take(rows_per_host)
        .chain(std::iter::repeat("host2").take(rows_per_host))
        .collect();
    let host_column: Arc<dyn Array> = Arc::new(StringArray::from(hosts));

    let int_values: Arc<dyn Array> = if is_gap {
        Arc::new(nullable_array!(Int64; 0, 6, 6, 12))
    } else {
        Arc::new(nullable_array!(Int64; 0, null, 1, null, 2, 3, null, 4, null, 5))
    };
    let value_column = if is_float {
        cast_with_options(&int_values, &DataType::Float64, &CastOptions::default()).unwrap()
    } else {
        int_values
    };

    let data = RecordBatch::try_new(
        schema.clone(),
        vec![timestamp_column, value_column, host_column],
    )
    .unwrap();
    MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
/// Executes a `RangeSelectExec` computing `MIN(value)` over `range1` and `MAX(value)` over
/// `range2` on the data from `prepare_test_data`, grouped by `host`, then sorts the output
/// by `host` and time index (ascending, nulls first) and asserts that the pretty-printed
/// result equals `expected`.
///
/// * `range1` / `range2` - range passed to the MIN / MAX range function respectively.
/// * `align` - alignment passed to the plan (`align_to` is fixed at 0).
/// * `fill` - fill option applied to both range functions.
/// * `is_float` / `is_gap` - forwarded to `prepare_test_data`.
/// * `expected` - the exact pretty-printed table the sorted result must match.
///
/// Panics if plan execution fails or if the output differs from `expected`.
async fn do_range_select_test(
    range1: Millisecond,
    range2: Millisecond,
    align: Millisecond,
    fill: Option<Fill>,
    is_float: bool,
    is_gap: bool,
    expected: String,
) {
    let input_type = if is_float {
        DataType::Float64
    } else {
        DataType::Int64
    };
    // With integer input and FILL LINEAR the range functions are flagged for a Float64 cast
    // and the output columns are declared as Float64.
    let need_cast =
        (!is_float && matches!(fill, Some(Fill::Linear))).then_some(DataType::Float64);
    let output_type = need_cast.clone().unwrap_or_else(|| input_type.clone());

    let output_schema = Arc::new(Schema::new(vec![
        Field::new("MIN(value)", output_type.clone(), true),
        Field::new("MAX(value)", output_type, true),
        Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
        Field::new("host", DataType::Utf8, true),
    ]));
    let plan_properties = PlanProperties::new(
        EquivalenceProperties::new(output_schema.clone()),
        Partitioning::UnknownPartitioning(1),
        ExecutionMode::Bounded,
    );

    // `value` is column 1 of the input batch.
    let value_column = || Arc::new(Column::new("value", 1));
    let min_over_range1 = RangeFnExec {
        expr: Arc::new(expressions::Min::new(
            value_column(),
            "MIN(value)",
            input_type.clone(),
        )),
        args: vec![value_column()],
        range: range1,
        fill: fill.clone(),
        need_cast: need_cast.clone(),
    };
    let max_over_range2 = RangeFnExec {
        expr: Arc::new(expressions::Max::new(
            value_column(),
            "MAX(value)",
            input_type,
        )),
        args: vec![value_column()],
        range: range2,
        fill,
        need_cast,
    };

    let range_select_exec = Arc::new(RangeSelectExec {
        input: Arc::new(prepare_test_data(is_float, is_gap)),
        range_exec: vec![min_over_range1, max_over_range2],
        align,
        align_to: 0,
        by: vec![Arc::new(Column::new("host", 2))],
        time_index: TIME_INDEX_COLUMN.to_string(),
        schema: output_schema.clone(),
        schema_before_project: output_schema.clone(),
        schema_project: None,
        by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
        metric: ExecutionPlanMetricsSet::new(),
        cache: plan_properties,
    });

    // Sort by (host, time index) so the printed output has a fixed row order.
    // Indices refer to the output schema: host = 3, time index = 2.
    let asc_nulls_first = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let sort_keys = vec![
        PhysicalSortExpr {
            expr: Arc::new(Column::new("host", 3)),
            options: asc_nulls_first,
        },
        PhysicalSortExpr {
            expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
            options: asc_nulls_first,
        },
    ];
    let sorted_plan = Arc::new(SortExec::new(sort_keys, range_select_exec));

    let session_context = SessionContext::default();
    let batches = datafusion::physical_plan::collect(sorted_plan, session_context.task_ctx())
        .await
        .unwrap();
    let result_literal = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();
    assert_eq!(result_literal, expected);
}
/// Range of 10s for both aggregates with an alignment of 1000s on the dense float fixture:
/// the expected output holds a single row per host, at timestamp 0.
#[tokio::test]
async fn range_10s_align_1000s() {
    // Every cell is padded to its column width so the rows line up with the border lines,
    // which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        10_000,
        1_000_000,
        Some(Fill::Null),
        true,
        false,
        expected,
    )
    .await;
}
/// `FILL NULL` on the dense float fixture with MIN over 10s, MAX over 5s and a 5s alignment:
/// slots without a value for MAX are expected to stay empty (null).
#[tokio::test]
async fn range_fill_null() {
    // Every cell (including empty ones) is padded to its column width so the rows line up
    // with the border lines, which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 1.0        |            | 1970-01-01T00:00:05 | host1 |\
        \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 2.0        |            | 1970-01-01T00:00:15 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
        \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 4.0        |            | 1970-01-01T00:00:05 | host2 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 5.0        |            | 1970-01-01T00:00:15 | host2 |\
        \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        5_000,
        5_000,
        Some(Fill::Null),
        true,
        false,
        expected,
    )
    .await;
}
/// `FILL PREV` on the dense float fixture with MIN over 10s, MAX over 5s and a 5s alignment:
/// an empty MAX slot is expected to repeat the previous value of the same host, and the very
/// first slot of each host stays empty because nothing precedes it.
#[tokio::test]
async fn range_fill_prev() {
    // Every cell (including empty ones) is padded to its column width so the rows line up
    // with the border lines, which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 1.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
        \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 2.0        | 1.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
        \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 4.0        | 3.0        | 1970-01-01T00:00:05 | host2 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 5.0        | 4.0        | 1970-01-01T00:00:15 | host2 |\
        \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        5_000,
        5_000,
        Some(Fill::Prev),
        true,
        false,
        expected,
    )
    .await;
}
/// `FILL LINEAR` on the dense float fixture with MIN over 10s, MAX over 5s and a 5s
/// alignment: empty MAX slots are expected to hold values interpolated (or extrapolated, for
/// the leading slot) from the neighbouring points of the same host.
#[tokio::test]
async fn range_fill_linear() {
    // Every cell is padded to its column width so the rows line up with the border lines,
    // which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
        \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
        \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
        \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        5_000,
        5_000,
        Some(Fill::Linear),
        true,
        false,
        expected,
    )
    .await;
}
/// Same scenario as the float `FILL LINEAR` case but with integer input (`is_float = false`):
/// the expected output is the same Float64 table, since the helper declares the output
/// columns as Float64 when linear fill is applied to integers.
#[tokio::test]
async fn range_fill_integer_linear() {
    // Every cell is padded to its column width so the rows line up with the border lines,
    // which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
        \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
        \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
        \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        5_000,
        5_000,
        Some(Fill::Linear),
        false,
        false,
        expected,
    )
    .await;
}
/// Constant fill (`6.6`) on the dense float fixture with MIN over 10s, MAX over 5s and a 5s
/// alignment: every empty MAX slot is expected to hold the constant.
#[tokio::test]
async fn range_fill_const() {
    // Every cell is padded to its column width so the rows line up with the border lines,
    // which is the layout the pretty printer emits.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 6.6        | 1969-12-31T23:59:55 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 1.0        | 6.6        | 1970-01-01T00:00:05 | host1 |\
        \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 2.0        | 6.6        | 1970-01-01T00:00:15 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
        \n| 3.0        | 6.6        | 1969-12-31T23:59:55 | host2 |\
        \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 4.0        | 6.6        | 1970-01-01T00:00:05 | host2 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 5.0        | 6.6        | 1970-01-01T00:00:15 | host2 |\
        \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        10_000,
        5_000,
        5_000,
        Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
        true,
        false,
        expected,
    )
    .await;
}
/// Exercises every fill option on the sparse ("gap") float fixture, where each host only has
/// samples at 0s and 15s. Range and alignment are 5s throughout, so the 5s and 10s slots
/// have no data of their own.
///
/// Expected behaviour per option:
/// * no fill - the empty slots are not emitted at all;
/// * `Fill::Null` - empty slots are emitted with null values;
/// * `Fill::Prev` - empty slots repeat the previous value of the host;
/// * `Fill::Linear` - empty slots are interpolated between the surrounding values;
/// * `Fill::Const(6.0)` - empty slots hold the constant.
#[tokio::test]
async fn range_fill_gap() {
    // In all tables below every cell (including empty ones) is padded to its column width
    // so the rows line up with the border lines, which is the layout the pretty printer
    // emits.

    // No fill option: only slots that contain data appear.
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;

    // FILL NULL
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n|            |            | 1970-01-01T00:00:05 | host1 |\
        \n|            |            | 1970-01-01T00:00:10 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
        \n|            |            | 1970-01-01T00:00:05 | host2 |\
        \n|            |            | 1970-01-01T00:00:10 | host2 |\
        \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;

    // FILL PREV
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;

    // FILL LINEAR
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 2.0        | 2.0        | 1970-01-01T00:00:05 | host1 |\
        \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 8.0        | 8.0        | 1970-01-01T00:00:05 | host2 |\
        \n| 10.0       | 10.0       | 1970-01-01T00:00:10 | host2 |\
        \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        5_000,
        5_000,
        5_000,
        Some(Fill::Linear),
        true,
        true,
        expected,
    )
    .await;

    // FILL 6.0 (constant)
    let expected = String::from(
        "+------------+------------+---------------------+-------+\
        \n| MIN(value) | MAX(value) | timestamp           | host  |\
        \n+------------+------------+---------------------+-------+\
        \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
        \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
        \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
        \n+------------+------------+---------------------+-------+",
    );
    do_range_select_test(
        5_000,
        5_000,
        5_000,
        Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
        true,
        true,
        expected,
    )
    .await;
}
/// Covers `Fill::try_from_str` parsing (empty string, `Linear`, invalid constants, a valid
/// constant) and the `Null`, `Prev` and `Const` fill strategies on a small `UInt8` series
/// with one missing value in the middle.
#[test]
fn fill_test() {
    let uint8 = DataType::UInt8;
    // Renders the error produced for an input that must be rejected.
    let rejection = |text: &str, data_type: &DataType| {
        Fill::try_from_str(text, data_type)
            .unwrap_err()
            .to_string()
    };

    // Parsing.
    assert!(Fill::try_from_str("", &uint8).unwrap().is_none());
    assert_eq!(
        Fill::try_from_str("Linear", &uint8).unwrap(),
        Some(Fill::Linear)
    );
    assert_eq!(
        rejection("Linear", &DataType::Boolean),
        "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
    );
    assert_eq!(
        rejection("WHAT", &uint8),
        "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
    );
    assert_eq!(
        rejection("8.0", &uint8),
        "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
    );
    assert_eq!(
        Fill::try_from_str("8", &uint8).unwrap(),
        Some(Fill::Const(ScalarValue::UInt8(Some(8))))
    );

    // Applying strategies; index 1 is the missing slot.
    let mut series = vec![
        ScalarValue::UInt8(Some(8)),
        ScalarValue::UInt8(None),
        ScalarValue::UInt8(Some(9)),
    ];
    Fill::Null.apply_fill_strategy(&[], &mut series).unwrap();
    assert_eq!(series[1], ScalarValue::UInt8(None));

    Fill::Prev.apply_fill_strategy(&[], &mut series).unwrap();
    assert_eq!(series[1], ScalarValue::UInt8(Some(8)));

    // Reset the slot before trying the constant fill.
    series[1] = ScalarValue::UInt8(None);
    Fill::Const(ScalarValue::UInt8(Some(10)))
        .apply_fill_strategy(&[], &mut series)
        .unwrap();
    assert_eq!(series[1], ScalarValue::UInt8(Some(10)));
}
/// Checks `Fill::Linear` on three inputs:
/// 1. two complementary `Float32` series over evenly spaced timestamps must fill to the same
///    result;
/// 2. a `Float64` series over unevenly spaced timestamps with missing values at both ends
///    and in the middle must produce the exact expected numbers;
/// 3. a single missing value with nothing to interpolate from must be left unchanged.
#[test]
fn test_fill_linear() {
    // 1. Evenly spaced timestamps, complementary gaps.
    let ts = vec![1, 2, 3, 4, 5];
    let mut odd_known = [Some(1.0), None, Some(3.0), None, Some(5.0)]
        .map(ScalarValue::Float32)
        .to_vec();
    Fill::Linear
        .apply_fill_strategy(&ts, &mut odd_known)
        .unwrap();
    let mut even_known = [None, Some(2.0), None, Some(4.0), None]
        .map(ScalarValue::Float32)
        .to_vec();
    Fill::Linear
        .apply_fill_strategy(&ts, &mut even_known)
        .unwrap();
    assert_eq!(odd_known, even_known);

    // 2. Unevenly spaced timestamps.
    let ts = vec![1, 3, 8, 30, 88, 108, 128];
    let mut uneven = [
        None,
        Some(1.0),
        Some(11.0),
        None,
        Some(10.0),
        Some(5.0),
        None,
    ]
    .map(ScalarValue::Float64)
    .to_vec();
    Fill::Linear.apply_fill_strategy(&ts, &mut uneven).unwrap();
    // After filling, every slot must hold a non-null Float64.
    let filled: Vec<f64> = uneven
        .iter()
        .map(|value| match value {
            ScalarValue::Float64(Some(f)) => *f,
            _ => unreachable!(),
        })
        .collect();
    assert_eq!(filled, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);

    // 3. A lone null stays null.
    let ts = vec![1];
    let untouched = vec![ScalarValue::Float32(None)];
    let mut lone = untouched.clone();
    Fill::Linear.apply_fill_strategy(&ts, &mut lone).unwrap();
    assert_eq!(untouched, lone);
}
/// Feeds two batches into a `RangeFirstListValueAcc` built with two sort options
/// (descending / nulls last, then ascending / nulls first) and checks the evaluated value
/// after each update: `Int64(5)` after the first batch, `Int64(6)` after the second.
///
/// NOTE(review): each batch carries two `Float64` arrays followed by one `Int64` array; the
/// expected results are consistent with the `Float64` arrays acting as sort keys and the
/// `Int64` array as the candidate values - confirm against the accumulator implementation.
#[test]
fn test_fist_last_accumulator() {
    let desc_nulls_last = SortOptions {
        descending: true,
        nulls_first: false,
    };
    let asc_nulls_first = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let mut acc = RangeFirstListValueAcc::new(vec![desc_nulls_last, asc_nulls_first]);

    let first_batch: Vec<Arc<dyn Array>> = vec![
        Arc::new(nullable_array!(Float64; 0.0, null, 0.0, null, 1.0)),
        Arc::new(nullable_array!(Float64; 5.0, null, 4.0, null, 3.0)),
        Arc::new(nullable_array!(Int64; 1, 2, 3, 4, 5)),
    ];
    acc.update_batch(&first_batch).unwrap();
    assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(5)));

    let second_batch: Vec<Arc<dyn Array>> = vec![
        Arc::new(nullable_array!(Float64; 3.0, 3.0, 3.0, 3.0, 3.0)),
        Arc::new(nullable_array!(Float64; null, 3.0, 3.0, 3.0, 3.0)),
        Arc::new(nullable_array!(Int64; 6, 7, 8, 9, 10)),
    ];
    acc.update_batch(&second_batch).unwrap();
    assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(6)));
}
}