common_function/scalars/math/
rate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use common_query::error::{self, Result};
18use common_query::prelude::{Signature, Volatility};
19use datafusion::arrow::compute::kernels::numeric;
20use datatypes::arrow::compute::kernels::cast;
21use datatypes::arrow::datatypes::DataType;
22use datatypes::prelude::*;
23use datatypes::vectors::{Helper, VectorRef};
24use snafu::ResultExt;
25
26use crate::function::{Function, FunctionContext};
27
/// Scalar function that derives rates from a sequence of adjacent data points.
#[derive(Clone, Debug, Default)]
pub struct RateFunction;

impl fmt::Display for RateFunction {
    /// Writes the fixed display label `RATE` to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("RATE")
    }
}
37
38impl Function for RateFunction {
39    fn name(&self) -> &str {
40        "prom_rate"
41    }
42
43    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
44        Ok(ConcreteDataType::float64_datatype())
45    }
46
47    fn signature(&self) -> Signature {
48        Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
49    }
50
51    fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
52        let val = &columns[0].to_arrow_array();
53        let val_0 = val.slice(0, val.len() - 1);
54        let val_1 = val.slice(1, val.len() - 1);
55        let dv = numeric::sub(&val_1, &val_0).context(error::ArrowComputeSnafu)?;
56        let ts = &columns[1].to_arrow_array();
57        let ts_0 = ts.slice(0, ts.len() - 1);
58        let ts_1 = ts.slice(1, ts.len() - 1);
59        let dt = numeric::sub(&ts_1, &ts_0).context(error::ArrowComputeSnafu)?;
60
61        let dv = cast::cast(&dv, &DataType::Float64).context(error::TypeCastSnafu {
62            typ: DataType::Float64,
63        })?;
64        let dt = cast::cast(&dt, &DataType::Float64).context(error::TypeCastSnafu {
65            typ: DataType::Float64,
66        })?;
67        let rate = numeric::div(&dv, &dt).context(error::ArrowComputeSnafu)?;
68        let v = Helper::try_into_vector(&rate).context(error::FromArrowArraySnafu)?;
69
70        Ok(v)
71    }
72}
73
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::prelude::TypeSignature;
    use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector};

    use super::*;

    /// Checks the function's metadata (name, return type, signature) and then
    /// evaluates it on a three-point series with unit-spaced timestamps.
    #[test]
    fn test_rate_function() {
        let func = RateFunction;

        // Metadata.
        assert_eq!("prom_rate", func.name());
        assert_eq!(
            ConcreteDataType::float64_datatype(),
            func.return_type(&[]).unwrap()
        );

        let Signature {
            type_signature,
            volatility,
        } = func.signature();
        assert!(matches!(volatility, Volatility::Immutable));
        assert!(matches!(
            type_signature,
            TypeSignature::Uniform(2, valid_types) if valid_types == ConcreteDataType::numerics()
        ));

        // Evaluation: deltas 2.0 and 3.0 over a timestamp step of 1.
        let args: Vec<VectorRef> = vec![
            Arc::new(Float32Vector::from_vec(vec![1.0, 3.0, 6.0])),
            Arc::new(Int64Vector::from_vec(vec![0, 1, 2])),
        ];
        let actual = func.eval(&FunctionContext::default(), &args).unwrap();
        let expected: VectorRef = Arc::new(Float64Vector::from_vec(vec![2.0, 3.0]));
        assert_eq!(expected, actual);
    }
}