common_function/scalars/geo/encoding.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef;
use snafu::{ensure, ResultExt};

use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};

/// Accumulator of (lat, lng, timestamp) tuples
#[derive(Debug)]
pub struct JsonPathAccumulator {
    timestamp_type: ConcreteDataType,
    lat: Vec<Option<f64>>,
    lng: Vec<Option<f64>>,
    timestamp: Vec<Option<Timestamp>>,
}

impl JsonPathAccumulator {
    fn new(timestamp_type: ConcreteDataType) -> Self {
        Self {
            lat: Vec::default(),
            lng: Vec::default(),
            timestamp: Vec::default(),
            timestamp_type,
        }
    }
}

impl Accumulator for JsonPathAccumulator {
    fn state(&self) -> Result<Vec<Value>> {
        Ok(vec![
            Value::List(ListValue::new(
                self.lat.iter().map(|i| Value::from(*i)).collect(),
                ConcreteDataType::float64_datatype(),
            )),
            Value::List(ListValue::new(
                self.lng.iter().map(|i| Value::from(*i)).collect(),
                ConcreteDataType::float64_datatype(),
            )),
            Value::List(ListValue::new(
                self.timestamp.iter().map(|i| Value::from(*i)).collect(),
                self.timestamp_type.clone(),
            )),
        ])
    }

    fn update_batch(&mut self, columns: &[VectorRef]) -> Result<()> {
        // `update_batch`, as defined by DataFusion, receives the accumulator's
        // original input.
        //
        // `columns` is a vec of [`lat`, `lng`, `timestamp`]
        // where
        // - `lat` is a vector of `Value::Float64` or a similar type. Each item
        //   in the vector is a row in the given dataset.
        // - the same holds for `lng` and `timestamp`.
        ensure_columns_n!(columns, 3);

        let lat = &columns[0];
        let lng = &columns[1];
        let ts = &columns[2];

        let size = lat.len();

        for idx in 0..size {
            self.lat.push(lat.get(idx).as_f64_lossy());
            self.lng.push(lng.get(idx).as_f64_lossy());
            self.timestamp.push(ts.get(idx).as_timestamp());
        }

        Ok(())
    }

    fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
        // `merge_batch`, as defined by DataFusion, receives states accumulated
        // from the data returned by child accumulators' `state()` calls.
        // In this particular implementation, the data is structured as follows:
        //
        // `states` is a vec of [`lat`, `lng`, `timestamp`]
        // where
        // - `lat` is a vector of `Value::List`. Each list holds all the
        //   coordinates collected by one child accumulator.
        // - the same holds for `lng` and `timestamp`.

        ensure_columns_n!(states, 3);

        let lat_lists = &states[0];
        let lng_lists = &states[1];
        let ts_lists = &states[2];

        let len = lat_lists.len();

        for idx in 0..len {
            if let Some(lat_list) = lat_lists
                .get(idx)
                .as_list()
                .map_err(BoxedError::new)
                .context(error::ExecuteSnafu)?
            {
                for v in lat_list.items() {
                    self.lat.push(v.as_f64_lossy());
                }
            }

            if let Some(lng_list) = lng_lists
                .get(idx)
                .as_list()
                .map_err(BoxedError::new)
                .context(error::ExecuteSnafu)?
            {
                for v in lng_list.items() {
                    self.lng.push(v.as_f64_lossy());
                }
            }

            if let Some(ts_list) = ts_lists
                .get(idx)
                .as_list()
                .map_err(BoxedError::new)
                .context(error::ExecuteSnafu)?
            {
                for v in ts_list.items() {
                    self.timestamp.push(v.as_timestamp());
                }
            }
        }

        Ok(())
    }

    fn evaluate(&self) -> Result<Value> {
        let mut work_vec: Vec<(&Option<f64>, &Option<f64>, &Option<Timestamp>)> = self
            .lat
            .iter()
            .zip(self.lng.iter())
            .zip(self.timestamp.iter())
            .map(|((a, b), c)| (a, b, c))
            .collect();

        // sort by timestamp; null timestamps are treated as timestamp 0
        work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or_else(|| Timestamp::new_second(0)));

        let result = serde_json::to_string(
            &work_vec
                .into_iter()
                // note that we emit lng,lat pairs for GeoJSON compatibility
                .map(|(lat, lng, _)| vec![lng, lat])
                .collect::<Vec<Vec<&Option<f64>>>>(),
        )
        .map_err(|e| {
            BoxedError::new(PlainError::new(
                format!("Serialization failure: {}", e),
                StatusCode::EngineExecuteQuery,
            ))
        })
        .context(error::ExecuteSnafu)?;

        Ok(Value::String(result.into()))
    }
}

/// This function accepts rows of lat, lng and timestamp, sorts them by
/// timestamp and encodes them into a GeoJSON-like path.
///
/// Example:
///
/// ```sql
/// SELECT json_encode_path(lat, lon, timestamp) FROM table [group by ...];
/// ```
///
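/// The aggregated value is a JSON array of `[longitude, latitude]` pairs,
/// ordered by timestamp, for example (illustrative values):
///
/// ```json
/// [[-122.4194,37.7749],[-122.4184,37.7759]]
/// ```
///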
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct JsonPathEncodeFunctionCreator {}

impl AggregateFunctionCreator for JsonPathEncodeFunctionCreator {
    fn creator(&self) -> AccumulatorCreatorFunction {
        let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
            let ts_type = types[2].clone();
            Ok(Box::new(JsonPathAccumulator::new(ts_type)))
        });

        creator
    }

    fn output_type(&self) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::string_datatype())
    }

    fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
        let input_types = self.input_types()?;
        ensure!(input_types.len() == 3, InvalidInputStateSnafu);

        let timestamp_type = input_types[2].clone();

        Ok(vec![
            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
            ConcreteDataType::list_datatype(timestamp_type),
        ])
    }
}
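
// A minimal, illustrative test sketch of the update_batch -> state -> evaluate
// flow described in the comments above. The vector constructors used here
// (`Float64Vector::from_vec`, `TimestampMillisecondVector::from_vec`) are
// assumed from the `datatypes` crate; adjust them if the builder APIs differ.
#[cfg(test)]
mod tests {
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;

    #[test]
    fn encode_path_sorts_by_timestamp() {
        let mut acc =
            JsonPathAccumulator::new(ConcreteDataType::timestamp_millisecond_datatype());

        // Two rows of (lat, lng, timestamp); the second row carries the earlier
        // timestamp, so it should come first in the encoded path.
        let lat: VectorRef = Arc::new(Float64Vector::from_vec(vec![37.0, 38.0]));
        let lng: VectorRef = Arc::new(Float64Vector::from_vec(vec![-122.0, -121.0]));
        let ts: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![2000, 1000]));

        acc.update_batch(&[lat, lng, ts]).unwrap();

        // state() exposes the three intermediate lists handed to merge_batch().
        assert_eq!(acc.state().unwrap().len(), 3);

        // evaluate() emits [lng, lat] pairs ordered by timestamp.
        let expected = "[[-121.0,38.0],[-122.0,37.0]]".to_string();
        assert_eq!(acc.evaluate().unwrap(), Value::String(expected.into()));
    }
}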