common_function/scalars/geo/
encoding.rs1use std::sync::Arc;
16
17use common_error::ext::{BoxedError, PlainError};
18use common_error::status_code::StatusCode;
19use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
20use common_query::error::{self, InvalidInputStateSnafu, Result};
21use common_query::logical_plan::accumulator::AggrFuncTypeStore;
22use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
23use common_query::prelude::AccumulatorCreatorFunction;
24use common_time::Timestamp;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::value::{ListValue, Value};
27use datatypes::vectors::VectorRef;
28use snafu::{ensure, ResultExt};
29
30use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};
31
32#[derive(Debug)]
34pub struct JsonPathAccumulator {
35 timestamp_type: ConcreteDataType,
36 lat: Vec<Option<f64>>,
37 lng: Vec<Option<f64>>,
38 timestamp: Vec<Option<Timestamp>>,
39}
40
41impl JsonPathAccumulator {
42 fn new(timestamp_type: ConcreteDataType) -> Self {
43 Self {
44 lat: Vec::default(),
45 lng: Vec::default(),
46 timestamp: Vec::default(),
47 timestamp_type,
48 }
49 }
50}
51
52impl Accumulator for JsonPathAccumulator {
53 fn state(&self) -> Result<Vec<Value>> {
54 Ok(vec![
55 Value::List(ListValue::new(
56 self.lat.iter().map(|i| Value::from(*i)).collect(),
57 ConcreteDataType::float64_datatype(),
58 )),
59 Value::List(ListValue::new(
60 self.lng.iter().map(|i| Value::from(*i)).collect(),
61 ConcreteDataType::float64_datatype(),
62 )),
63 Value::List(ListValue::new(
64 self.timestamp.iter().map(|i| Value::from(*i)).collect(),
65 self.timestamp_type.clone(),
66 )),
67 ])
68 }
69
70 fn update_batch(&mut self, columns: &[VectorRef]) -> Result<()> {
71 ensure_columns_n!(columns, 3);
80
81 let lat = &columns[0];
82 let lng = &columns[1];
83 let ts = &columns[2];
84
85 let size = lat.len();
86
87 for idx in 0..size {
88 self.lat.push(lat.get(idx).as_f64_lossy());
89 self.lng.push(lng.get(idx).as_f64_lossy());
90 self.timestamp.push(ts.get(idx).as_timestamp());
91 }
92
93 Ok(())
94 }
95
96 fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
97 ensure_columns_n!(states, 3);
108
109 let lat_lists = &states[0];
110 let lng_lists = &states[1];
111 let ts_lists = &states[2];
112
113 let len = lat_lists.len();
114
115 for idx in 0..len {
116 if let Some(lat_list) = lat_lists
117 .get(idx)
118 .as_list()
119 .map_err(BoxedError::new)
120 .context(error::ExecuteSnafu)?
121 {
122 for v in lat_list.items() {
123 self.lat.push(v.as_f64_lossy());
124 }
125 }
126
127 if let Some(lng_list) = lng_lists
128 .get(idx)
129 .as_list()
130 .map_err(BoxedError::new)
131 .context(error::ExecuteSnafu)?
132 {
133 for v in lng_list.items() {
134 self.lng.push(v.as_f64_lossy());
135 }
136 }
137
138 if let Some(ts_list) = ts_lists
139 .get(idx)
140 .as_list()
141 .map_err(BoxedError::new)
142 .context(error::ExecuteSnafu)?
143 {
144 for v in ts_list.items() {
145 self.timestamp.push(v.as_timestamp());
146 }
147 }
148 }
149
150 Ok(())
151 }
152
153 fn evaluate(&self) -> Result<Value> {
154 let mut work_vec: Vec<(&Option<f64>, &Option<f64>, &Option<Timestamp>)> = self
155 .lat
156 .iter()
157 .zip(self.lng.iter())
158 .zip(self.timestamp.iter())
159 .map(|((a, b), c)| (a, b, c))
160 .collect();
161
162 work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or_else(|| Timestamp::new_second(0)));
164
165 let result = serde_json::to_string(
166 &work_vec
167 .into_iter()
168 .map(|(lat, lng, _)| vec![lng, lat])
170 .collect::<Vec<Vec<&Option<f64>>>>(),
171 )
172 .map_err(|e| {
173 BoxedError::new(PlainError::new(
174 format!("Serialization failure: {}", e),
175 StatusCode::EngineExecuteQuery,
176 ))
177 })
178 .context(error::ExecuteSnafu)?;
179
180 Ok(Value::String(result.into()))
181 }
182}
183
184#[as_aggr_func_creator]
194#[derive(Debug, Default, AggrFuncTypeStore)]
195pub struct JsonPathEncodeFunctionCreator {}
196
197impl AggregateFunctionCreator for JsonPathEncodeFunctionCreator {
198 fn creator(&self) -> AccumulatorCreatorFunction {
199 let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
200 let ts_type = types[2].clone();
201 Ok(Box::new(JsonPathAccumulator::new(ts_type)))
202 });
203
204 creator
205 }
206
207 fn output_type(&self) -> Result<ConcreteDataType> {
208 Ok(ConcreteDataType::string_datatype())
209 }
210
211 fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
212 let input_types = self.input_types()?;
213 ensure!(input_types.len() == 3, InvalidInputStateSnafu);
214
215 let timestamp_type = input_types[2].clone();
216
217 Ok(vec![
218 ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
219 ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
220 ConcreteDataType::list_datatype(timestamp_type),
221 ])
222 }
223}