common_function/scalars/geo/
geohash.rs1use std::fmt;
16
17use common_error::ext::{BoxedError, PlainError};
18use common_error::status_code::StatusCode;
19use common_query::error::{self, InvalidFuncArgsSnafu, Result};
20use common_query::prelude::{Signature, TypeSignature};
21use datafusion::logical_expr::Volatility;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::scalars::{Scalar, ScalarVectorBuilder};
24use datatypes::value::{ListValue, Value};
25use datatypes::vectors::{ListVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
26use geohash::Coord;
27use snafu::{ensure, ResultExt};
28
29use crate::function::{Function, FunctionContext};
30
/// Checks that the integer binding `$v` is a geohash resolution in `[1, 12]`.
///
/// Evaluates to `Ok($v as usize)` when in range. Otherwise it evaluates to an
/// `error::ExecuteSnafu` error wrapping a `PlainError` with
/// `StatusCode::EngineExecuteQuery`. The cast is lossless because the value has
/// already been bounded to `1..=12`.
macro_rules! ensure_resolution_usize {
    ($v: ident) => {
        if (1..=12).contains(&$v) {
            Ok($v as usize)
        } else {
            let reason = format!("Invalid geohash resolution {}, expect value: [1, 12]", $v);
            Err(BoxedError::new(PlainError::new(
                reason,
                StatusCode::EngineExecuteQuery,
            )))
            .context(error::ExecuteSnafu)
        }
    };
}
44
45fn try_into_resolution(v: Value) -> Result<usize> {
46 match v {
47 Value::Int8(v) => {
48 ensure_resolution_usize!(v)
49 }
50 Value::Int16(v) => {
51 ensure_resolution_usize!(v)
52 }
53 Value::Int32(v) => {
54 ensure_resolution_usize!(v)
55 }
56 Value::Int64(v) => {
57 ensure_resolution_usize!(v)
58 }
59 Value::UInt8(v) => {
60 ensure_resolution_usize!(v)
61 }
62 Value::UInt16(v) => {
63 ensure_resolution_usize!(v)
64 }
65 Value::UInt32(v) => {
66 ensure_resolution_usize!(v)
67 }
68 Value::UInt64(v) => {
69 ensure_resolution_usize!(v)
70 }
71 _ => unreachable!(),
72 }
73}
74
/// Scalar function `geohash(lat, lon, resolution)`.
///
/// Encodes a latitude/longitude pair as a geohash string at the requested
/// resolution, which must be an integer in `[1, 12]`.
#[derive(Clone, Debug, Default)]
pub struct GeohashFunction;

impl GeohashFunction {
    /// SQL-visible name of this function.
    const NAME: &'static str = "geohash";
}
82
83impl Function for GeohashFunction {
84 fn name(&self) -> &str {
85 Self::NAME
86 }
87
88 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
89 Ok(ConcreteDataType::string_datatype())
90 }
91
92 fn signature(&self) -> Signature {
93 let mut signatures = Vec::new();
94 for coord_type in &[
95 ConcreteDataType::float32_datatype(),
96 ConcreteDataType::float64_datatype(),
97 ] {
98 for resolution_type in &[
99 ConcreteDataType::int8_datatype(),
100 ConcreteDataType::int16_datatype(),
101 ConcreteDataType::int32_datatype(),
102 ConcreteDataType::int64_datatype(),
103 ConcreteDataType::uint8_datatype(),
104 ConcreteDataType::uint16_datatype(),
105 ConcreteDataType::uint32_datatype(),
106 ConcreteDataType::uint64_datatype(),
107 ] {
108 signatures.push(TypeSignature::Exact(vec![
109 coord_type.clone(),
111 coord_type.clone(),
113 resolution_type.clone(),
115 ]));
116 }
117 }
118 Signature::one_of(signatures, Volatility::Stable)
119 }
120
121 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
122 ensure!(
123 columns.len() == 3,
124 InvalidFuncArgsSnafu {
125 err_msg: format!(
126 "The length of the args is not correct, expect 3, provided : {}",
127 columns.len()
128 ),
129 }
130 );
131
132 let lat_vec = &columns[0];
133 let lon_vec = &columns[1];
134 let resolution_vec = &columns[2];
135
136 let size = lat_vec.len();
137 let mut results = StringVectorBuilder::with_capacity(size);
138
139 for i in 0..size {
140 let lat = lat_vec.get(i).as_f64_lossy();
141 let lon = lon_vec.get(i).as_f64_lossy();
142 let r = try_into_resolution(resolution_vec.get(i))?;
143
144 let result = match (lat, lon) {
145 (Some(lat), Some(lon)) => {
146 let coord = Coord { x: lon, y: lat };
147 let encoded = geohash::encode(coord, r)
148 .map_err(|e| {
149 BoxedError::new(PlainError::new(
150 format!("Geohash error: {}", e),
151 StatusCode::EngineExecuteQuery,
152 ))
153 })
154 .context(error::ExecuteSnafu)?;
155 Some(encoded)
156 }
157 _ => None,
158 };
159
160 results.push(result.as_deref());
161 }
162
163 Ok(results.to_vector())
164 }
165}
166
167impl fmt::Display for GeohashFunction {
168 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
169 write!(f, "{}", Self::NAME)
170 }
171}
172
/// Scalar function `geohash_neighbours(lat, lon, resolution)`.
///
/// Encodes a latitude/longitude pair as a geohash at the requested resolution,
/// an integer in `[1, 12]`. It returns the list of the eight neighbouring
/// geohash cells.
#[derive(Clone, Debug, Default)]
pub struct GeohashNeighboursFunction;

impl GeohashNeighboursFunction {
    /// SQL-visible name of this function.
    const NAME: &'static str = "geohash_neighbours";
}
180
181impl Function for GeohashNeighboursFunction {
182 fn name(&self) -> &str {
183 GeohashNeighboursFunction::NAME
184 }
185
186 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
187 Ok(ConcreteDataType::list_datatype(
188 ConcreteDataType::string_datatype(),
189 ))
190 }
191
192 fn signature(&self) -> Signature {
193 let mut signatures = Vec::new();
194 for coord_type in &[
195 ConcreteDataType::float32_datatype(),
196 ConcreteDataType::float64_datatype(),
197 ] {
198 for resolution_type in &[
199 ConcreteDataType::int8_datatype(),
200 ConcreteDataType::int16_datatype(),
201 ConcreteDataType::int32_datatype(),
202 ConcreteDataType::int64_datatype(),
203 ConcreteDataType::uint8_datatype(),
204 ConcreteDataType::uint16_datatype(),
205 ConcreteDataType::uint32_datatype(),
206 ConcreteDataType::uint64_datatype(),
207 ] {
208 signatures.push(TypeSignature::Exact(vec![
209 coord_type.clone(),
211 coord_type.clone(),
213 resolution_type.clone(),
215 ]));
216 }
217 }
218 Signature::one_of(signatures, Volatility::Stable)
219 }
220
221 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
222 ensure!(
223 columns.len() == 3,
224 InvalidFuncArgsSnafu {
225 err_msg: format!(
226 "The length of the args is not correct, expect 3, provided : {}",
227 columns.len()
228 ),
229 }
230 );
231
232 let lat_vec = &columns[0];
233 let lon_vec = &columns[1];
234 let resolution_vec = &columns[2];
235
236 let size = lat_vec.len();
237 let mut results =
238 ListVectorBuilder::with_type_capacity(ConcreteDataType::string_datatype(), size);
239
240 for i in 0..size {
241 let lat = lat_vec.get(i).as_f64_lossy();
242 let lon = lon_vec.get(i).as_f64_lossy();
243 let r = try_into_resolution(resolution_vec.get(i))?;
244
245 let result = match (lat, lon) {
246 (Some(lat), Some(lon)) => {
247 let coord = Coord { x: lon, y: lat };
248 let encoded = geohash::encode(coord, r)
249 .map_err(|e| {
250 BoxedError::new(PlainError::new(
251 format!("Geohash error: {}", e),
252 StatusCode::EngineExecuteQuery,
253 ))
254 })
255 .context(error::ExecuteSnafu)?;
256 let neighbours = geohash::neighbors(&encoded)
257 .map_err(|e| {
258 BoxedError::new(PlainError::new(
259 format!("Geohash error: {}", e),
260 StatusCode::EngineExecuteQuery,
261 ))
262 })
263 .context(error::ExecuteSnafu)?;
264 Some(ListValue::new(
265 vec![
266 neighbours.n,
267 neighbours.nw,
268 neighbours.w,
269 neighbours.sw,
270 neighbours.s,
271 neighbours.se,
272 neighbours.e,
273 neighbours.ne,
274 ]
275 .into_iter()
276 .map(Value::from)
277 .collect(),
278 ConcreteDataType::string_datatype(),
279 ))
280 }
281 _ => None,
282 };
283
284 if let Some(list_value) = result {
285 results.push(Some(list_value.as_scalar_ref()));
286 } else {
287 results.push(None);
288 }
289 }
290
291 Ok(results.to_vector())
292 }
293}
294
295impl fmt::Display for GeohashNeighboursFunction {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 write!(f, "{}", GeohashNeighboursFunction::NAME)
298 }
299}