common_function/scalars/
uddsketch_calc.rs1use std::fmt;
18use std::fmt::Display;
19
20use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
21use common_query::prelude::{Signature, Volatility};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::prelude::Vector;
24use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
25use datatypes::vectors::{BinaryVector, Float64VectorBuilder, MutableVector, VectorRef};
26use snafu::OptionExt;
27use uddsketch::UDDSketch;
28
29use crate::function::{Function, FunctionContext};
30use crate::function_registry::FunctionRegistry;
31
32const NAME: &str = "uddsketch_calc";
33
34#[derive(Debug, Default)]
42pub struct UddSketchCalcFunction;
43
44impl UddSketchCalcFunction {
45 pub fn register(registry: &FunctionRegistry) {
46 registry.register_scalar(UddSketchCalcFunction);
47 }
48}
49
50impl Display for UddSketchCalcFunction {
51 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52 write!(f, "{}", NAME.to_ascii_uppercase())
53 }
54}
55
56impl Function for UddSketchCalcFunction {
57 fn name(&self) -> &str {
58 NAME
59 }
60
61 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
62 Ok(ConcreteDataType::float64_datatype())
63 }
64
65 fn signature(&self) -> Signature {
66 Signature::exact(
69 vec![
70 ConcreteDataType::float64_datatype(),
71 ConcreteDataType::binary_datatype(),
72 ],
73 Volatility::Immutable,
74 )
75 }
76
77 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
78 if columns.len() != 2 {
79 return InvalidFuncArgsSnafu {
80 err_msg: format!("uddsketch_calc expects 2 arguments, got {}", columns.len()),
81 }
82 .fail();
83 }
84
85 let perc_vec = &columns[0];
86 let sketch_vec = columns[1]
87 .as_any()
88 .downcast_ref::<BinaryVector>()
89 .with_context(|| DowncastVectorSnafu {
90 err_msg: format!("expect BinaryVector, got {}", columns[1].vector_type_name()),
91 })?;
92 let len = sketch_vec.len();
93 let mut builder = Float64VectorBuilder::with_capacity(len);
94
95 for i in 0..len {
96 let perc_opt = perc_vec.get(i).as_f64_lossy();
97 let sketch_opt = sketch_vec.get_data(i);
98
99 if sketch_opt.is_none() || perc_opt.is_none() {
100 builder.push_null();
101 continue;
102 }
103
104 let sketch_bytes = sketch_opt.unwrap();
105 let perc = perc_opt.unwrap();
106
107 let sketch: UDDSketch = match bincode::deserialize(sketch_bytes) {
109 Ok(s) => s,
110 Err(e) => {
111 common_telemetry::trace!("Failed to deserialize UDDSketch: {}", e);
112 builder.push_null();
113 continue;
114 }
115 };
116
117 if sketch.bucket_iter().count() == 0 {
121 builder.push_null();
122 continue;
123 }
124 let result = sketch.estimate_quantile(perc);
126 builder.push(Some(result));
127 }
128
129 Ok(builder.to_vector())
130 }
131}
132
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::value::Value;
    use datatypes::vectors::{BinaryVector, Float64Vector};

    use super::*;

    /// Asserts that row `idx` of `vector` is a float64 within `1e-10` of `expected`.
    fn assert_f64_at(vector: &VectorRef, idx: usize, expected: f64) {
        assert!(
            matches!(vector.get(idx), Value::Float64(v) if (v - expected).abs() < 1e-10)
        );
    }

    /// Checks the function metadata and that evaluation reproduces the quantiles
    /// computed directly on the sketch.
    #[test]
    fn test_uddsketch_calc_function() {
        let function = UddSketchCalcFunction;
        assert_eq!("uddsketch_calc", function.name());
        assert_eq!(
            ConcreteDataType::float64_datatype(),
            function
                .return_type(&[ConcreteDataType::float64_datatype()])
                .unwrap()
        );

        // Feed the values 10.0, 20.0, ..., 100.0 into the sketch.
        let mut sketch = UDDSketch::new(128, 0.01);
        for step in 1..=10u32 {
            sketch.add_value(f64::from(step) * 10.0);
        }

        // Reference results taken straight from the in-memory sketch.
        let percentiles = [0.5, 0.9, 0.95];
        let expected: Vec<f64> = percentiles
            .iter()
            .map(|&p| sketch.estimate_quantile(p))
            .collect();

        let serialized = bincode::serialize(&sketch).unwrap();

        // One serialized copy of the sketch per requested percentile.
        let args: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(percentiles.to_vec())),
            Arc::new(BinaryVector::from(vec![Some(serialized); 3])),
        ];

        let result = function.eval(&FunctionContext::default(), &args).unwrap();
        assert_eq!(result.len(), 3);

        for (idx, want) in expected.into_iter().enumerate() {
            assert_f64_at(&result, idx, want);
        }
    }

    /// Checks the wrong-arity error and that undecodable sketch bytes yield NULL.
    #[test]
    fn test_uddsketch_calc_function_errors() {
        let function = UddSketchCalcFunction;

        // A single column is rejected with an arity error.
        let args: Vec<VectorRef> = vec![Arc::new(Float64Vector::from_vec(vec![0.95]))];
        let err = function
            .eval(&FunctionContext::default(), &args)
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("uddsketch_calc expects 2 arguments"));

        // Bytes that are not a serialized sketch produce a NULL row, not an error.
        let args: Vec<VectorRef> = vec![
            Arc::new(Float64Vector::from_vec(vec![0.95])),
            Arc::new(BinaryVector::from(vec![Some(vec![1, 2, 3])])),
        ];
        let result = function.eval(&FunctionContext::default(), &args).unwrap();
        assert_eq!(result.len(), 1);
        assert!(matches!(result.get(0), Value::Null));
    }
}