common_function/scalars/geo/
s2.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use common_query::error::InvalidFuncArgsSnafu;
18use datafusion_common::ScalarValue;
19use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder, UInt64Builder};
20use datafusion_common::arrow::datatypes::{DataType, Float64Type};
21use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
22use derive_more::Display;
23use s2::cellid::{CellID, MAX_LEVEL};
24use s2::latlng::LatLng;
25use snafu::ensure;
26
27use crate::function::{Function, extract_args};
28use crate::scalars::geo::helpers;
29use crate::scalars::geo::helpers::ensure_and_coerce;
30
31static CELL_TYPES: LazyLock<Vec<DataType>> =
32    LazyLock::new(|| vec![DataType::Int64, DataType::UInt64]);
33
34static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
35    LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
36
37static LEVEL_TYPES: &[DataType] = datafusion_expr::type_coercion::aggregates::INTEGERS;
38
39/// Function that returns [s2] encoding cellid for a given geospatial coordinate.
40///
41/// [s2]: http://s2geometry.io
42#[derive(Clone, Debug, Display)]
43#[display("{}", self.name())]
44pub(crate) struct S2LatLngToCell {
45    signature: Signature,
46}
47
48impl Default for S2LatLngToCell {
49    fn default() -> Self {
50        let mut signatures = Vec::with_capacity(COORDINATE_TYPES.len());
51        for coord_type in COORDINATE_TYPES.as_slice() {
52            signatures.push(TypeSignature::Exact(vec![
53                // latitude
54                coord_type.clone(),
55                // longitude
56                coord_type.clone(),
57            ]));
58        }
59        Self {
60            signature: Signature::one_of(signatures, Volatility::Stable),
61        }
62    }
63}
64
65impl Function for S2LatLngToCell {
66    fn name(&self) -> &str {
67        "s2_latlng_to_cell"
68    }
69
70    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
71        Ok(DataType::UInt64)
72    }
73
74    fn signature(&self) -> &Signature {
75        &self.signature
76    }
77
78    fn invoke_with_args(
79        &self,
80        args: ScalarFunctionArgs,
81    ) -> datafusion_common::Result<ColumnarValue> {
82        let [arg0, arg1] = extract_args(self.name(), &args)?;
83
84        let arg0 = helpers::cast::<Float64Type>(&arg0)?;
85        let lat_vec = arg0.as_primitive::<Float64Type>();
86        let arg1 = helpers::cast::<Float64Type>(&arg1)?;
87        let lon_vec = arg1.as_primitive::<Float64Type>();
88
89        let size = lat_vec.len();
90        let mut builder = UInt64Builder::with_capacity(size);
91
92        for i in 0..size {
93            let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i));
94            let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i));
95
96            let result = match (lat, lon) {
97                (Some(lat), Some(lon)) => {
98                    let coord = LatLng::from_degrees(lat, lon);
99                    ensure!(
100                        coord.is_valid(),
101                        InvalidFuncArgsSnafu {
102                            err_msg: "The input coordinates are invalid",
103                        }
104                    );
105                    let cellid = CellID::from(coord);
106                    let encoded: u64 = cellid.0;
107                    Some(encoded)
108                }
109                _ => None,
110            };
111
112            builder.append_option(result);
113        }
114
115        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
116    }
117}
118
119/// Return the level of current s2 cell
120#[derive(Clone, Debug, Display)]
121#[display("{}", self.name())]
122pub(crate) struct S2CellLevel {
123    signature: Signature,
124}
125
126impl Default for S2CellLevel {
127    fn default() -> Self {
128        Self {
129            signature: signature_of_cell(),
130        }
131    }
132}
133
134impl Function for S2CellLevel {
135    fn name(&self) -> &str {
136        "s2_cell_level"
137    }
138
139    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
140        Ok(DataType::UInt64)
141    }
142
143    fn signature(&self) -> &Signature {
144        &self.signature
145    }
146
147    fn invoke_with_args(
148        &self,
149        args: ScalarFunctionArgs,
150    ) -> datafusion_common::Result<ColumnarValue> {
151        let [cell_vec] = extract_args(self.name(), &args)?;
152
153        let size = cell_vec.len();
154        let mut builder = UInt64Builder::with_capacity(size);
155
156        for i in 0..size {
157            let v = ScalarValue::try_from_array(&cell_vec, i)?;
158            let v = cell_from_value(v).map(|x| x.level());
159
160            builder.append_option(v);
161        }
162
163        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
164    }
165}
166
167/// Return the string presentation of the cell
168#[derive(Clone, Debug, Display)]
169#[display("{}", self.name())]
170pub(crate) struct S2CellToToken {
171    signature: Signature,
172}
173
174impl Default for S2CellToToken {
175    fn default() -> Self {
176        Self {
177            signature: signature_of_cell(),
178        }
179    }
180}
181
182impl Function for S2CellToToken {
183    fn name(&self) -> &str {
184        "s2_cell_to_token"
185    }
186
187    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
188        Ok(DataType::Utf8View)
189    }
190
191    fn signature(&self) -> &Signature {
192        &self.signature
193    }
194
195    fn invoke_with_args(
196        &self,
197        args: ScalarFunctionArgs,
198    ) -> datafusion_common::Result<ColumnarValue> {
199        let [cell_vec] = extract_args(self.name(), &args)?;
200
201        let size = cell_vec.len();
202        let mut builder = StringViewBuilder::with_capacity(size);
203
204        for i in 0..size {
205            let v = ScalarValue::try_from_array(&cell_vec, i)?;
206            let v = cell_from_value(v).map(|x| x.to_token());
207
208            builder.append_option(v.as_deref());
209        }
210
211        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
212    }
213}
214
215/// Return parent at given level of current s2 cell
216#[derive(Clone, Debug, Display)]
217#[display("{}", self.name())]
218pub(crate) struct S2CellParent {
219    signature: Signature,
220}
221
222impl Default for S2CellParent {
223    fn default() -> Self {
224        Self {
225            signature: signature_of_cell_and_level(),
226        }
227    }
228}
229
230impl Function for S2CellParent {
231    fn name(&self) -> &str {
232        "s2_cell_parent"
233    }
234
235    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
236        Ok(DataType::UInt64)
237    }
238
239    fn signature(&self) -> &Signature {
240        &self.signature
241    }
242
243    fn invoke_with_args(
244        &self,
245        args: ScalarFunctionArgs,
246    ) -> datafusion_common::Result<ColumnarValue> {
247        let [cell_vec, levels] = extract_args(self.name(), &args)?;
248
249        let size = cell_vec.len();
250        let mut builder = UInt64Builder::with_capacity(size);
251
252        for i in 0..size {
253            let cell = ScalarValue::try_from_array(&cell_vec, i).map(cell_from_value)?;
254            let level = ScalarValue::try_from_array(&levels, i).and_then(value_to_level)?;
255            let result = if let (Some(cell), Some(level)) = (cell, level) {
256                Some(cell.parent(level).0)
257            } else {
258                None
259            };
260
261            builder.append_option(result);
262        }
263
264        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
265    }
266}
267
268fn signature_of_cell() -> Signature {
269    let mut signatures = Vec::with_capacity(CELL_TYPES.len());
270    for cell_type in CELL_TYPES.as_slice() {
271        signatures.push(TypeSignature::Exact(vec![cell_type.clone()]));
272    }
273
274    Signature::one_of(signatures, Volatility::Stable)
275}
276
277fn signature_of_cell_and_level() -> Signature {
278    let mut signatures = Vec::with_capacity(CELL_TYPES.len() * LEVEL_TYPES.len());
279    for cell_type in CELL_TYPES.as_slice() {
280        for level_type in LEVEL_TYPES {
281            signatures.push(TypeSignature::Exact(vec![
282                cell_type.clone(),
283                level_type.clone(),
284            ]));
285        }
286    }
287    Signature::one_of(signatures, Volatility::Stable)
288}
289
290fn cell_from_value(v: ScalarValue) -> Option<CellID> {
291    match v {
292        ScalarValue::Int64(v) => v.map(|x| CellID(x as u64)),
293        ScalarValue::UInt64(v) => v.map(CellID),
294        _ => None,
295    }
296}
297
298fn value_to_level(v: ScalarValue) -> datafusion_common::Result<Option<u64>> {
299    match v {
300        ScalarValue::Int8(Some(v)) => ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i8, v as u64),
301        ScalarValue::Int16(Some(v)) => {
302            ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i16, v as u64)
303        }
304        ScalarValue::Int32(Some(v)) => {
305            ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i32, v as u64)
306        }
307        ScalarValue::Int64(Some(v)) => {
308            ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i64, v as u64)
309        }
310        ScalarValue::UInt8(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u8, v as u64),
311        ScalarValue::UInt16(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u16, v as u64),
312        ScalarValue::UInt32(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u32, v as u64),
313        ScalarValue::UInt64(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL, v),
314        _ => Ok(None),
315    }
316}