common_function/scalars/geo/
s2.rs1use std::sync::{Arc, LazyLock};
16
17use common_query::error::InvalidFuncArgsSnafu;
18use datafusion_common::ScalarValue;
19use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder, UInt64Builder};
20use datafusion_common::arrow::datatypes::{DataType, Float64Type};
21use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
22use derive_more::Display;
23use s2::cellid::{CellID, MAX_LEVEL};
24use s2::latlng::LatLng;
25use snafu::ensure;
26
27use crate::function::{Function, extract_args};
28use crate::scalars::geo::helpers;
29use crate::scalars::geo::helpers::ensure_and_coerce;
30
31static CELL_TYPES: LazyLock<Vec<DataType>> =
32 LazyLock::new(|| vec![DataType::Int64, DataType::UInt64]);
33
34static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
35 LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
36
37static LEVEL_TYPES: &[DataType] = datafusion_expr::type_coercion::aggregates::INTEGERS;
38
39#[derive(Clone, Debug, Display)]
43#[display("{}", self.name())]
44pub(crate) struct S2LatLngToCell {
45 signature: Signature,
46}
47
48impl Default for S2LatLngToCell {
49 fn default() -> Self {
50 let mut signatures = Vec::with_capacity(COORDINATE_TYPES.len());
51 for coord_type in COORDINATE_TYPES.as_slice() {
52 signatures.push(TypeSignature::Exact(vec![
53 coord_type.clone(),
55 coord_type.clone(),
57 ]));
58 }
59 Self {
60 signature: Signature::one_of(signatures, Volatility::Stable),
61 }
62 }
63}
64
65impl Function for S2LatLngToCell {
66 fn name(&self) -> &str {
67 "s2_latlng_to_cell"
68 }
69
70 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
71 Ok(DataType::UInt64)
72 }
73
74 fn signature(&self) -> &Signature {
75 &self.signature
76 }
77
78 fn invoke_with_args(
79 &self,
80 args: ScalarFunctionArgs,
81 ) -> datafusion_common::Result<ColumnarValue> {
82 let [arg0, arg1] = extract_args(self.name(), &args)?;
83
84 let arg0 = helpers::cast::<Float64Type>(&arg0)?;
85 let lat_vec = arg0.as_primitive::<Float64Type>();
86 let arg1 = helpers::cast::<Float64Type>(&arg1)?;
87 let lon_vec = arg1.as_primitive::<Float64Type>();
88
89 let size = lat_vec.len();
90 let mut builder = UInt64Builder::with_capacity(size);
91
92 for i in 0..size {
93 let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i));
94 let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i));
95
96 let result = match (lat, lon) {
97 (Some(lat), Some(lon)) => {
98 let coord = LatLng::from_degrees(lat, lon);
99 ensure!(
100 coord.is_valid(),
101 InvalidFuncArgsSnafu {
102 err_msg: "The input coordinates are invalid",
103 }
104 );
105 let cellid = CellID::from(coord);
106 let encoded: u64 = cellid.0;
107 Some(encoded)
108 }
109 _ => None,
110 };
111
112 builder.append_option(result);
113 }
114
115 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
116 }
117}
118
119#[derive(Clone, Debug, Display)]
121#[display("{}", self.name())]
122pub(crate) struct S2CellLevel {
123 signature: Signature,
124}
125
126impl Default for S2CellLevel {
127 fn default() -> Self {
128 Self {
129 signature: signature_of_cell(),
130 }
131 }
132}
133
134impl Function for S2CellLevel {
135 fn name(&self) -> &str {
136 "s2_cell_level"
137 }
138
139 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
140 Ok(DataType::UInt64)
141 }
142
143 fn signature(&self) -> &Signature {
144 &self.signature
145 }
146
147 fn invoke_with_args(
148 &self,
149 args: ScalarFunctionArgs,
150 ) -> datafusion_common::Result<ColumnarValue> {
151 let [cell_vec] = extract_args(self.name(), &args)?;
152
153 let size = cell_vec.len();
154 let mut builder = UInt64Builder::with_capacity(size);
155
156 for i in 0..size {
157 let v = ScalarValue::try_from_array(&cell_vec, i)?;
158 let v = cell_from_value(v).map(|x| x.level());
159
160 builder.append_option(v);
161 }
162
163 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
164 }
165}
166
167#[derive(Clone, Debug, Display)]
169#[display("{}", self.name())]
170pub(crate) struct S2CellToToken {
171 signature: Signature,
172}
173
174impl Default for S2CellToToken {
175 fn default() -> Self {
176 Self {
177 signature: signature_of_cell(),
178 }
179 }
180}
181
182impl Function for S2CellToToken {
183 fn name(&self) -> &str {
184 "s2_cell_to_token"
185 }
186
187 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
188 Ok(DataType::Utf8View)
189 }
190
191 fn signature(&self) -> &Signature {
192 &self.signature
193 }
194
195 fn invoke_with_args(
196 &self,
197 args: ScalarFunctionArgs,
198 ) -> datafusion_common::Result<ColumnarValue> {
199 let [cell_vec] = extract_args(self.name(), &args)?;
200
201 let size = cell_vec.len();
202 let mut builder = StringViewBuilder::with_capacity(size);
203
204 for i in 0..size {
205 let v = ScalarValue::try_from_array(&cell_vec, i)?;
206 let v = cell_from_value(v).map(|x| x.to_token());
207
208 builder.append_option(v.as_deref());
209 }
210
211 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
212 }
213}
214
215#[derive(Clone, Debug, Display)]
217#[display("{}", self.name())]
218pub(crate) struct S2CellParent {
219 signature: Signature,
220}
221
222impl Default for S2CellParent {
223 fn default() -> Self {
224 Self {
225 signature: signature_of_cell_and_level(),
226 }
227 }
228}
229
230impl Function for S2CellParent {
231 fn name(&self) -> &str {
232 "s2_cell_parent"
233 }
234
235 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
236 Ok(DataType::UInt64)
237 }
238
239 fn signature(&self) -> &Signature {
240 &self.signature
241 }
242
243 fn invoke_with_args(
244 &self,
245 args: ScalarFunctionArgs,
246 ) -> datafusion_common::Result<ColumnarValue> {
247 let [cell_vec, levels] = extract_args(self.name(), &args)?;
248
249 let size = cell_vec.len();
250 let mut builder = UInt64Builder::with_capacity(size);
251
252 for i in 0..size {
253 let cell = ScalarValue::try_from_array(&cell_vec, i).map(cell_from_value)?;
254 let level = ScalarValue::try_from_array(&levels, i).and_then(value_to_level)?;
255 let result = if let (Some(cell), Some(level)) = (cell, level) {
256 Some(cell.parent(level).0)
257 } else {
258 None
259 };
260
261 builder.append_option(result);
262 }
263
264 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
265 }
266}
267
268fn signature_of_cell() -> Signature {
269 let mut signatures = Vec::with_capacity(CELL_TYPES.len());
270 for cell_type in CELL_TYPES.as_slice() {
271 signatures.push(TypeSignature::Exact(vec![cell_type.clone()]));
272 }
273
274 Signature::one_of(signatures, Volatility::Stable)
275}
276
277fn signature_of_cell_and_level() -> Signature {
278 let mut signatures = Vec::with_capacity(CELL_TYPES.len() * LEVEL_TYPES.len());
279 for cell_type in CELL_TYPES.as_slice() {
280 for level_type in LEVEL_TYPES {
281 signatures.push(TypeSignature::Exact(vec![
282 cell_type.clone(),
283 level_type.clone(),
284 ]));
285 }
286 }
287 Signature::one_of(signatures, Volatility::Stable)
288}
289
290fn cell_from_value(v: ScalarValue) -> Option<CellID> {
291 match v {
292 ScalarValue::Int64(v) => v.map(|x| CellID(x as u64)),
293 ScalarValue::UInt64(v) => v.map(CellID),
294 _ => None,
295 }
296}
297
298fn value_to_level(v: ScalarValue) -> datafusion_common::Result<Option<u64>> {
299 match v {
300 ScalarValue::Int8(Some(v)) => ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i8, v as u64),
301 ScalarValue::Int16(Some(v)) => {
302 ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i16, v as u64)
303 }
304 ScalarValue::Int32(Some(v)) => {
305 ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i32, v as u64)
306 }
307 ScalarValue::Int64(Some(v)) => {
308 ensure_and_coerce!(v >= 0 && v <= MAX_LEVEL as i64, v as u64)
309 }
310 ScalarValue::UInt8(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u8, v as u64),
311 ScalarValue::UInt16(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u16, v as u64),
312 ScalarValue::UInt32(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL as u32, v as u64),
313 ScalarValue::UInt64(Some(v)) => ensure_and_coerce!(v <= MAX_LEVEL, v),
314 _ => Ok(None),
315 }
316}