common_function/scalars/geo/
wkt.rs1use common_error::ext::{BoxedError, PlainError};
16use common_error::status_code::StatusCode;
17use common_query::error::{self, Result};
18use common_query::prelude::{Signature, TypeSignature};
19use datafusion::logical_expr::Volatility;
20use datatypes::prelude::ConcreteDataType;
21use datatypes::scalars::ScalarVectorBuilder;
22use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
23use derive_more::Display;
24use geo_types::{Geometry, Point};
25use once_cell::sync::Lazy;
26use snafu::ResultExt;
27use wkt::{ToWkt, TryFromWkt};
28
29use crate::function::{Function, FunctionContext};
30use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};
31
32static COORDINATE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
33 vec![
34 ConcreteDataType::float32_datatype(),
35 ConcreteDataType::float64_datatype(),
36 ]
37});
38
39#[derive(Clone, Debug, Default, Display)]
41#[display("{}", self.name())]
42pub struct LatLngToPointWkt;
43
44impl Function for LatLngToPointWkt {
45 fn name(&self) -> &str {
46 "wkt_point_from_latlng"
47 }
48
49 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
50 Ok(ConcreteDataType::string_datatype())
51 }
52
53 fn signature(&self) -> Signature {
54 let mut signatures = Vec::new();
55 for coord_type in COORDINATE_TYPES.as_slice() {
56 signatures.push(TypeSignature::Exact(vec![
57 coord_type.clone(),
59 coord_type.clone(),
61 ]));
62 }
63 Signature::one_of(signatures, Volatility::Stable)
64 }
65
66 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
67 ensure_columns_n!(columns, 2);
68
69 let lat_vec = &columns[0];
70 let lng_vec = &columns[1];
71
72 let size = lat_vec.len();
73 let mut results = StringVectorBuilder::with_capacity(size);
74
75 for i in 0..size {
76 let lat = lat_vec.get(i).as_f64_lossy();
77 let lng = lng_vec.get(i).as_f64_lossy();
78
79 let result = match (lat, lng) {
80 (Some(lat), Some(lng)) => Some(Point::new(lng, lat).wkt_string()),
81 _ => None,
82 };
83
84 results.push(result.as_deref());
85 }
86
87 Ok(results.to_vector())
88 }
89}
90
91pub(super) fn parse_wkt(s: &str) -> Result<Geometry> {
92 Geometry::try_from_wkt_str(s)
93 .map_err(|e| {
94 BoxedError::new(PlainError::new(
95 format!("Fail to parse WKT: {}", e),
96 StatusCode::EngineExecuteQuery,
97 ))
98 })
99 .context(error::ExecuteSnafu)
100}