common_function/scalars/geo/
geohash.rs1use std::fmt;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, PlainError};
19use common_error::status_code::StatusCode;
20use common_query::error::{self, InvalidFuncArgsSnafu, Result};
21use datafusion::arrow::datatypes::Field;
22use datafusion_expr::type_coercion::aggregates::INTEGERS;
23use datafusion_expr::{Signature, TypeSignature, Volatility};
24use datatypes::arrow::datatypes::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::scalars::{Scalar, ScalarVectorBuilder};
27use datatypes::value::{ListValue, Value};
28use datatypes::vectors::{ListVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
29use geohash::Coord;
30use snafu::{ResultExt, ensure};
31
32use crate::function::{Function, FunctionContext};
33
/// Validates a geohash resolution held in an integer binding.
///
/// Expands to an expression of type `Result<usize>`: `Ok` carrying the value
/// cast to `usize` when it lies in `[1, 12]`, otherwise an error built from a
/// `PlainError` (status `EngineExecuteQuery`) and wrapped through
/// `error::ExecuteSnafu`.
macro_rules! ensure_resolution_usize {
    ($v: ident) => {
        if $v > 0 && $v <= 12 {
            // The range check above guarantees the cast cannot truncate.
            Ok($v as usize)
        } else {
            let reason = format!("Invalid geohash resolution {}, expect value: [1, 12]", $v);
            Err(BoxedError::new(PlainError::new(
                reason,
                StatusCode::EngineExecuteQuery,
            )))
            .context(error::ExecuteSnafu)
        }
    };
}
47
48fn try_into_resolution(v: Value) -> Result<usize> {
49 match v {
50 Value::Int8(v) => {
51 ensure_resolution_usize!(v)
52 }
53 Value::Int16(v) => {
54 ensure_resolution_usize!(v)
55 }
56 Value::Int32(v) => {
57 ensure_resolution_usize!(v)
58 }
59 Value::Int64(v) => {
60 ensure_resolution_usize!(v)
61 }
62 Value::UInt8(v) => {
63 ensure_resolution_usize!(v)
64 }
65 Value::UInt16(v) => {
66 ensure_resolution_usize!(v)
67 }
68 Value::UInt32(v) => {
69 ensure_resolution_usize!(v)
70 }
71 Value::UInt64(v) => {
72 ensure_resolution_usize!(v)
73 }
74 _ => unreachable!(),
75 }
76}
77
/// Scalar function that encodes a latitude/longitude pair as a geohash string.
#[derive(Debug, Default, Clone)]
pub struct GeohashFunction;

impl GeohashFunction {
    /// Name reported by `Function::name` and by the `Display` impl.
    const NAME: &'static str = "geohash";
}
85
86impl Function for GeohashFunction {
87 fn name(&self) -> &str {
88 Self::NAME
89 }
90
91 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
92 Ok(DataType::Utf8)
93 }
94
95 fn signature(&self) -> Signature {
96 let mut signatures = Vec::new();
97 for coord_type in &[DataType::Float32, DataType::Float64] {
98 for resolution_type in INTEGERS {
99 signatures.push(TypeSignature::Exact(vec![
100 coord_type.clone(),
102 coord_type.clone(),
104 resolution_type.clone(),
106 ]));
107 }
108 }
109 Signature::one_of(signatures, Volatility::Stable)
110 }
111
112 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
113 ensure!(
114 columns.len() == 3,
115 InvalidFuncArgsSnafu {
116 err_msg: format!(
117 "The length of the args is not correct, expect 3, provided : {}",
118 columns.len()
119 ),
120 }
121 );
122
123 let lat_vec = &columns[0];
124 let lon_vec = &columns[1];
125 let resolution_vec = &columns[2];
126
127 let size = lat_vec.len();
128 let mut results = StringVectorBuilder::with_capacity(size);
129
130 for i in 0..size {
131 let lat = lat_vec.get(i).as_f64_lossy();
132 let lon = lon_vec.get(i).as_f64_lossy();
133 let r = try_into_resolution(resolution_vec.get(i))?;
134
135 let result = match (lat, lon) {
136 (Some(lat), Some(lon)) => {
137 let coord = Coord { x: lon, y: lat };
138 let encoded = geohash::encode(coord, r)
139 .map_err(|e| {
140 BoxedError::new(PlainError::new(
141 format!("Geohash error: {}", e),
142 StatusCode::EngineExecuteQuery,
143 ))
144 })
145 .context(error::ExecuteSnafu)?;
146 Some(encoded)
147 }
148 _ => None,
149 };
150
151 results.push(result.as_deref());
152 }
153
154 Ok(results.to_vector())
155 }
156}
157
158impl fmt::Display for GeohashFunction {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 write!(f, "{}", Self::NAME)
161 }
162}
163
/// Scalar function that returns the geohashes of the eight cells surrounding
/// the cell containing a latitude/longitude pair.
#[derive(Debug, Default, Clone)]
pub struct GeohashNeighboursFunction;

impl GeohashNeighboursFunction {
    /// Name reported by `Function::name` and by the `Display` impl.
    const NAME: &'static str = "geohash_neighbours";
}
171
172impl Function for GeohashNeighboursFunction {
173 fn name(&self) -> &str {
174 GeohashNeighboursFunction::NAME
175 }
176
177 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
178 Ok(DataType::List(Arc::new(Field::new(
179 "x",
180 DataType::Utf8,
181 false,
182 ))))
183 }
184
185 fn signature(&self) -> Signature {
186 let mut signatures = Vec::new();
187 for coord_type in &[DataType::Float32, DataType::Float64] {
188 for resolution_type in INTEGERS {
189 signatures.push(TypeSignature::Exact(vec![
190 coord_type.clone(),
192 coord_type.clone(),
194 resolution_type.clone(),
196 ]));
197 }
198 }
199 Signature::one_of(signatures, Volatility::Stable)
200 }
201
202 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
203 ensure!(
204 columns.len() == 3,
205 InvalidFuncArgsSnafu {
206 err_msg: format!(
207 "The length of the args is not correct, expect 3, provided : {}",
208 columns.len()
209 ),
210 }
211 );
212
213 let lat_vec = &columns[0];
214 let lon_vec = &columns[1];
215 let resolution_vec = &columns[2];
216
217 let size = lat_vec.len();
218 let mut results =
219 ListVectorBuilder::with_type_capacity(ConcreteDataType::string_datatype(), size);
220
221 for i in 0..size {
222 let lat = lat_vec.get(i).as_f64_lossy();
223 let lon = lon_vec.get(i).as_f64_lossy();
224 let r = try_into_resolution(resolution_vec.get(i))?;
225
226 let result = match (lat, lon) {
227 (Some(lat), Some(lon)) => {
228 let coord = Coord { x: lon, y: lat };
229 let encoded = geohash::encode(coord, r)
230 .map_err(|e| {
231 BoxedError::new(PlainError::new(
232 format!("Geohash error: {}", e),
233 StatusCode::EngineExecuteQuery,
234 ))
235 })
236 .context(error::ExecuteSnafu)?;
237 let neighbours = geohash::neighbors(&encoded)
238 .map_err(|e| {
239 BoxedError::new(PlainError::new(
240 format!("Geohash error: {}", e),
241 StatusCode::EngineExecuteQuery,
242 ))
243 })
244 .context(error::ExecuteSnafu)?;
245 Some(ListValue::new(
246 vec![
247 neighbours.n,
248 neighbours.nw,
249 neighbours.w,
250 neighbours.sw,
251 neighbours.s,
252 neighbours.se,
253 neighbours.e,
254 neighbours.ne,
255 ]
256 .into_iter()
257 .map(Value::from)
258 .collect(),
259 ConcreteDataType::string_datatype(),
260 ))
261 }
262 _ => None,
263 };
264
265 if let Some(list_value) = result {
266 results.push(Some(list_value.as_scalar_ref()));
267 } else {
268 results.push(None);
269 }
270 }
271
272 Ok(results.to_vector())
273 }
274}
275
276impl fmt::Display for GeohashNeighboursFunction {
277 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
278 write!(f, "{}", GeohashNeighboursFunction::NAME)
279 }
280}