common_function/scalars/
uddsketch_calc.rs1use std::fmt;
18use std::fmt::Display;
19use std::sync::Arc;
20
21use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
22use common_query::prelude::{Signature, Volatility};
23use datatypes::data_type::ConcreteDataType;
24use datatypes::prelude::Vector;
25use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
26use datatypes::vectors::{BinaryVector, Float64VectorBuilder, MutableVector, VectorRef};
27use snafu::OptionExt;
28use uddsketch::UDDSketch;
29
30use crate::function::{Function, FunctionContext};
31use crate::function_registry::FunctionRegistry;
32
/// Registered (lower-case) name of the `uddsketch_calc` scalar function.
const NAME: &str = "uddsketch_calc";
34
/// Stateless scalar function that estimates a percentile from a bincode-serialized
/// UDDSketch, one row at a time.
#[derive(Default, Debug)]
pub struct UddSketchCalcFunction;
44
45impl UddSketchCalcFunction {
46 pub fn register(registry: &FunctionRegistry) {
47 registry.register(Arc::new(UddSketchCalcFunction));
48 }
49}
50
51impl Display for UddSketchCalcFunction {
52 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53 write!(f, "{}", NAME.to_ascii_uppercase())
54 }
55}
56
57impl Function for UddSketchCalcFunction {
58 fn name(&self) -> &str {
59 NAME
60 }
61
62 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
63 Ok(ConcreteDataType::float64_datatype())
64 }
65
66 fn signature(&self) -> Signature {
67 Signature::exact(
70 vec![
71 ConcreteDataType::float64_datatype(),
72 ConcreteDataType::binary_datatype(),
73 ],
74 Volatility::Immutable,
75 )
76 }
77
78 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
79 if columns.len() != 2 {
80 return InvalidFuncArgsSnafu {
81 err_msg: format!("uddsketch_calc expects 2 arguments, got {}", columns.len()),
82 }
83 .fail();
84 }
85
86 let perc_vec = &columns[0];
87 let sketch_vec = columns[1]
88 .as_any()
89 .downcast_ref::<BinaryVector>()
90 .with_context(|| DowncastVectorSnafu {
91 err_msg: format!("expect BinaryVector, got {}", columns[1].vector_type_name()),
92 })?;
93 let len = sketch_vec.len();
94 let mut builder = Float64VectorBuilder::with_capacity(len);
95
96 for i in 0..len {
97 let perc_opt = perc_vec.get(i).as_f64_lossy();
98 let sketch_opt = sketch_vec.get_data(i);
99
100 if sketch_opt.is_none() || perc_opt.is_none() {
101 builder.push_null();
102 continue;
103 }
104
105 let sketch_bytes = sketch_opt.unwrap();
106 let perc = perc_opt.unwrap();
107
108 let sketch: UDDSketch = match bincode::deserialize(sketch_bytes) {
110 Ok(s) => s,
111 Err(e) => {
112 common_telemetry::trace!("Failed to deserialize UDDSketch: {}", e);
113 builder.push_null();
114 continue;
115 }
116 };
117
118 if sketch.bucket_iter().count() == 0 {
122 builder.push_null();
123 continue;
124 }
125 let result = sketch.estimate_quantile(perc);
127 builder.push(Some(result));
128 }
129
130 Ok(builder.to_vector())
131 }
132}
133
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::value::Value;
    use datatypes::vectors::{BinaryVector, Float64Vector};

    use super::*;

    /// Asserts that `actual` is a `Float64` value within 1e-10 of `expected`.
    fn assert_float_close(actual: Value, expected: f64) {
        assert!(matches!(actual, Value::Float64(v) if (v - expected).abs() < 1e-10));
    }

    #[test]
    fn test_uddsketch_calc_function() {
        let function = UddSketchCalcFunction;
        assert_eq!("uddsketch_calc", function.name());
        let return_type = function
            .return_type(&[ConcreteDataType::float64_datatype()])
            .unwrap();
        assert_eq!(ConcreteDataType::float64_datatype(), return_type);

        // Feed 10.0, 20.0, ..., 100.0 into the sketch, in ascending order.
        let mut sketch = UDDSketch::new(128, 0.01);
        for value in (1u32..=10).map(|step| f64::from(step) * 10.0) {
            sketch.add_value(value);
        }

        let percentiles = vec![0.5, 0.9, 0.95];
        let expected: Vec<f64> = percentiles
            .iter()
            .map(|&p| sketch.estimate_quantile(p))
            .collect();
        let serialized = bincode::serialize(&sketch).unwrap();

        let args: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(percentiles)),
            Arc::new(BinaryVector::from(vec![Some(serialized); 3])),
        ];

        let result = function.eval(&FunctionContext::default(), &args).unwrap();
        assert_eq!(result.len(), 3);
        for (row, want) in expected.into_iter().enumerate() {
            assert_float_close(result.get(row), want);
        }
    }

    #[test]
    fn test_uddsketch_calc_function_errors() {
        let function = UddSketchCalcFunction;

        // Only one argument: rejected up front.
        let too_few: Vec<VectorRef> = vec![Arc::new(Float64Vector::from_vec(vec![0.95]))];
        let err = function
            .eval(&FunctionContext::default(), &too_few)
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("uddsketch_calc expects 2 arguments"));

        // Bytes that are not a serialized sketch become a NULL row, not an error.
        let garbage: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(vec![0.95])),
            Arc::new(BinaryVector::from(vec![Some(vec![1, 2, 3])])),
        ];
        let result = function
            .eval(&FunctionContext::default(), &garbage)
            .unwrap();
        assert_eq!(result.len(), 1);
        assert!(matches!(result.get(0), Value::Null));
    }
}