common_function/scalars/geo/
wkt.rs1use std::sync::LazyLock;
16
17use common_error::ext::{BoxedError, PlainError};
18use common_error::status_code::StatusCode;
19use common_query::error::{self, Result};
20use datafusion_expr::{Signature, TypeSignature, Volatility};
21use datatypes::arrow::datatypes::DataType;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
24use derive_more::Display;
25use geo_types::{Geometry, Point};
26use snafu::ResultExt;
27use wkt::{ToWkt, TryFromWkt};
28
29use crate::function::{Function, FunctionContext};
30use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};
31
32static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
33 LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
34
35#[derive(Clone, Debug, Default, Display)]
37#[display("{}", self.name())]
38pub struct LatLngToPointWkt;
39
40impl Function for LatLngToPointWkt {
41 fn name(&self) -> &str {
42 "wkt_point_from_latlng"
43 }
44
45 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
46 Ok(DataType::Utf8)
47 }
48
49 fn signature(&self) -> Signature {
50 let mut signatures = Vec::new();
51 for coord_type in COORDINATE_TYPES.as_slice() {
52 signatures.push(TypeSignature::Exact(vec![
53 coord_type.clone(),
55 coord_type.clone(),
57 ]));
58 }
59 Signature::one_of(signatures, Volatility::Stable)
60 }
61
62 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
63 ensure_columns_n!(columns, 2);
64
65 let lat_vec = &columns[0];
66 let lng_vec = &columns[1];
67
68 let size = lat_vec.len();
69 let mut results = StringVectorBuilder::with_capacity(size);
70
71 for i in 0..size {
72 let lat = lat_vec.get(i).as_f64_lossy();
73 let lng = lng_vec.get(i).as_f64_lossy();
74
75 let result = match (lat, lng) {
76 (Some(lat), Some(lng)) => Some(Point::new(lng, lat).wkt_string()),
77 _ => None,
78 };
79
80 results.push(result.as_deref());
81 }
82
83 Ok(results.to_vector())
84 }
85}
86
87pub(super) fn parse_wkt(s: &str) -> Result<Geometry> {
88 Geometry::try_from_wkt_str(s)
89 .map_err(|e| {
90 BoxedError::new(PlainError::new(
91 format!("Fail to parse WKT: {}", e),
92 StatusCode::EngineExecuteQuery,
93 ))
94 })
95 .context(error::ExecuteSnafu)
96}