common_function/scalars/
uddsketch_calc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of the scalar function `uddsketch_calc`.
16
17use std::fmt;
18use std::fmt::Display;
19use std::sync::Arc;
20
21use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
22use common_query::prelude::{Signature, Volatility};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Vector;
25use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
26use datatypes::vectors::{BinaryVector, Float64VectorBuilder, MutableVector, VectorRef};
27use snafu::OptionExt;
28use uddsketch::UDDSketch;
29
30use crate::function::{Function, FunctionContext};
31use crate::function_registry::FunctionRegistry;
32
/// Name of the scalar function; returned by `Function::name` and upper-cased by the
/// `Display` implementation of `UddSketchCalcFunction`.
const NAME: &str = "uddsketch_calc";
34
/// Scalar function `uddsketch_calc`: estimates a quantile from a serialized UDDSketch.
///
/// Arguments, in order:
/// 1. The percentile to estimate, as a float64 (e.g. `0.95` for p95).
/// 2. The binary-encoded UDDSketch state (bincode representation).
///
/// The function is evaluated row by row: each sketch is deserialized and the
/// estimated quantile for the matching percentile is returned as a float64.
#[derive(Default, Debug)]
pub struct UddSketchCalcFunction;
44
45impl UddSketchCalcFunction {
46    pub fn register(registry: &FunctionRegistry) {
47        registry.register(Arc::new(UddSketchCalcFunction));
48    }
49}
50
51impl Display for UddSketchCalcFunction {
52    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53        write!(f, "{}", NAME.to_ascii_uppercase())
54    }
55}
56
57impl Function for UddSketchCalcFunction {
58    fn name(&self) -> &str {
59        NAME
60    }
61
62    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
63        Ok(ConcreteDataType::float64_datatype())
64    }
65
66    fn signature(&self) -> Signature {
67        // First argument: percentile (float64)
68        // Second argument: UDDSketch state (binary)
69        Signature::exact(
70            vec![
71                ConcreteDataType::float64_datatype(),
72                ConcreteDataType::binary_datatype(),
73            ],
74            Volatility::Immutable,
75        )
76    }
77
78    fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
79        if columns.len() != 2 {
80            return InvalidFuncArgsSnafu {
81                err_msg: format!("uddsketch_calc expects 2 arguments, got {}", columns.len()),
82            }
83            .fail();
84        }
85
86        let perc_vec = &columns[0];
87        let sketch_vec = columns[1]
88            .as_any()
89            .downcast_ref::<BinaryVector>()
90            .with_context(|| DowncastVectorSnafu {
91                err_msg: format!("expect BinaryVector, got {}", columns[1].vector_type_name()),
92            })?;
93        let len = sketch_vec.len();
94        let mut builder = Float64VectorBuilder::with_capacity(len);
95
96        for i in 0..len {
97            let perc_opt = perc_vec.get(i).as_f64_lossy();
98            let sketch_opt = sketch_vec.get_data(i);
99
100            if sketch_opt.is_none() || perc_opt.is_none() {
101                builder.push_null();
102                continue;
103            }
104
105            let sketch_bytes = sketch_opt.unwrap();
106            let perc = perc_opt.unwrap();
107
108            // Deserialize the UDDSketch from its bincode representation
109            let sketch: UDDSketch = match bincode::deserialize(sketch_bytes) {
110                Ok(s) => s,
111                Err(e) => {
112                    common_telemetry::trace!("Failed to deserialize UDDSketch: {}", e);
113                    builder.push_null();
114                    continue;
115                }
116            };
117
118            // Check if the sketch is empty, if so, return null
119            // This is important to avoid panics when calling estimate_quantile on an empty sketch
120            // In practice, this will happen if input is all null
121            if sketch.bucket_iter().count() == 0 {
122                builder.push_null();
123                continue;
124            }
125            // Compute the estimated quantile from the sketch
126            let result = sketch.estimate_quantile(perc);
127            builder.push(Some(result));
128        }
129
130        Ok(builder.to_vector())
131    }
132}
133
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::value::Value;
    use datatypes::vectors::{BinaryVector, Float64Vector};

    use super::*;

    /// Results computed through the scalar function must match the values obtained by
    /// querying the sketch directly.
    #[test]
    fn test_uddsketch_calc_function() {
        let function = UddSketchCalcFunction;
        assert_eq!("uddsketch_calc", function.name());

        let return_type = function
            .return_type(&[ConcreteDataType::float64_datatype()])
            .unwrap();
        assert_eq!(ConcreteDataType::float64_datatype(), return_type);

        // Build a sketch over 10.0, 20.0, ..., 100.0.
        let mut sketch = UDDSketch::new(128, 0.01);
        for step in 1..=10u32 {
            sketch.add_value(f64::from(step) * 10.0);
        }

        // Reference values (p50, p90, p95) taken straight from the sketch.
        let percentiles = [0.5, 0.9, 0.95];
        let expected: Vec<f64> = percentiles
            .iter()
            .map(|&p| sketch.estimate_quantile(p))
            .collect();

        let serialized = bincode::serialize(&sketch).unwrap();

        let args: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(percentiles.to_vec())),
            Arc::new(BinaryVector::from(vec![Some(serialized); percentiles.len()])),
        ];

        let result = function.eval(&FunctionContext::default(), &args).unwrap();
        assert_eq!(result.len(), 3);

        for (row, want) in expected.into_iter().enumerate() {
            assert!(
                matches!(result.get(row), Value::Float64(v) if (v - want).abs() < 1e-10),
                "unexpected quantile in row {}",
                row
            );
        }
    }

    /// A wrong argument count is an error; undecodable sketch bytes yield NULL.
    #[test]
    fn test_uddsketch_calc_function_errors() {
        let function = UddSketchCalcFunction;
        let ctx = FunctionContext::default();

        // Only one argument instead of two.
        let too_few: Vec<VectorRef> = vec![Arc::new(Float64Vector::from_vec(vec![0.95]))];
        let outcome = function.eval(&ctx, &too_few);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("uddsketch_calc expects 2 arguments"));

        // Bytes that are not a valid serialized sketch.
        let garbage: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(vec![0.95])),
            Arc::new(BinaryVector::from(vec![Some(vec![1, 2, 3])])),
        ];
        let output = function.eval(&ctx, &garbage).unwrap();
        assert_eq!(output.len(), 1);
        assert!(matches!(output.get(0), Value::Null));
    }
}