common_function/scalars/math/
rate.rs1use std::fmt;
16
17use common_query::error::{self, Result};
18use common_query::prelude::{Signature, Volatility};
19use datafusion::arrow::compute::kernels::numeric;
20use datatypes::arrow::compute::kernels::cast;
21use datatypes::arrow::datatypes::DataType;
22use datatypes::prelude::*;
23use datatypes::vectors::{Helper, VectorRef};
24use snafu::ResultExt;
25
26use crate::function::{Function, FunctionContext};
27
28#[derive(Clone, Debug, Default)]
30pub struct RateFunction;
31
32impl fmt::Display for RateFunction {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "RATE")
35 }
36}
37
38impl Function for RateFunction {
39 fn name(&self) -> &str {
40 "prom_rate"
41 }
42
43 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
44 Ok(ConcreteDataType::float64_datatype())
45 }
46
47 fn signature(&self) -> Signature {
48 Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
49 }
50
51 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
52 let val = &columns[0].to_arrow_array();
53 let val_0 = val.slice(0, val.len() - 1);
54 let val_1 = val.slice(1, val.len() - 1);
55 let dv = numeric::sub(&val_1, &val_0).context(error::ArrowComputeSnafu)?;
56 let ts = &columns[1].to_arrow_array();
57 let ts_0 = ts.slice(0, ts.len() - 1);
58 let ts_1 = ts.slice(1, ts.len() - 1);
59 let dt = numeric::sub(&ts_1, &ts_0).context(error::ArrowComputeSnafu)?;
60
61 let dv = cast::cast(&dv, &DataType::Float64).context(error::TypeCastSnafu {
62 typ: DataType::Float64,
63 })?;
64 let dt = cast::cast(&dt, &DataType::Float64).context(error::TypeCastSnafu {
65 typ: DataType::Float64,
66 })?;
67 let rate = numeric::div(&dv, &dt).context(error::ArrowComputeSnafu)?;
68 let v = Helper::try_into_vector(&rate).context(error::FromArrowArraySnafu)?;
69
70 Ok(v)
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use std::sync::Arc;
77
78 use common_query::prelude::TypeSignature;
79 use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector};
80
81 use super::*;
82 #[test]
83 fn test_rate_function() {
84 let rate = RateFunction;
85 assert_eq!("prom_rate", rate.name());
86 assert_eq!(
87 ConcreteDataType::float64_datatype(),
88 rate.return_type(&[]).unwrap()
89 );
90 assert!(matches!(rate.signature(),
91 Signature {
92 type_signature: TypeSignature::Uniform(2, valid_types),
93 volatility: Volatility::Immutable
94 } if valid_types == ConcreteDataType::numerics()
95 ));
96 let values = vec![1.0, 3.0, 6.0];
97 let ts = vec![0, 1, 2];
98
99 let args: Vec<VectorRef> = vec![
100 Arc::new(Float32Vector::from_vec(values)),
101 Arc::new(Int64Vector::from_vec(ts)),
102 ];
103 let vector = rate.eval(&FunctionContext::default(), &args).unwrap();
104 let expect: VectorRef = Arc::new(Float64Vector::from_vec(vec![2.0, 3.0]));
105 assert_eq!(expect, vector);
106 }
107}