common_function/scalars/geo/
wkt.rs1use std::sync::{Arc, LazyLock};
16
17use common_error::ext::{BoxedError, PlainError};
18use common_error::status_code::StatusCode;
19use common_query::error::{self, Result};
20use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
21use datafusion_common::arrow::datatypes::{DataType, Float64Type};
22use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
23use derive_more::Display;
24use geo_types::{Geometry, Point};
25use snafu::ResultExt;
26use wkt::{ToWkt, TryFromWkt};
27
28use crate::function::{Function, extract_args};
29use crate::scalars::geo::helpers;
30
31static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
32 LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
33
34#[derive(Clone, Debug, Display)]
36#[display("{}", self.name())]
37pub(crate) struct LatLngToPointWkt {
38 signature: Signature,
39}
40
41impl Default for LatLngToPointWkt {
42 fn default() -> Self {
43 let mut signatures = Vec::new();
44 for coord_type in COORDINATE_TYPES.as_slice() {
45 signatures.push(TypeSignature::Exact(vec![
46 coord_type.clone(),
48 coord_type.clone(),
50 ]));
51 }
52 Self {
53 signature: Signature::one_of(signatures, Volatility::Stable),
54 }
55 }
56}
57
58impl Function for LatLngToPointWkt {
59 fn name(&self) -> &str {
60 "wkt_point_from_latlng"
61 }
62
63 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
64 Ok(DataType::Utf8View)
65 }
66
67 fn signature(&self) -> &Signature {
68 &self.signature
69 }
70
71 fn invoke_with_args(
72 &self,
73 args: ScalarFunctionArgs,
74 ) -> datafusion_common::Result<ColumnarValue> {
75 let [arg0, arg1] = extract_args(self.name(), &args)?;
76
77 let arg0 = helpers::cast::<Float64Type>(&arg0)?;
78 let lat_vec = arg0.as_primitive::<Float64Type>();
79 let arg1 = helpers::cast::<Float64Type>(&arg1)?;
80 let lng_vec = arg1.as_primitive::<Float64Type>();
81
82 let size = lat_vec.len();
83 let mut builder = StringViewBuilder::with_capacity(size);
84
85 for i in 0..size {
86 let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i));
87 let lng = lng_vec.is_valid(i).then(|| lng_vec.value(i));
88
89 let result = match (lat, lng) {
90 (Some(lat), Some(lng)) => Some(Point::new(lng, lat).wkt_string()),
91 _ => None,
92 };
93
94 builder.append_option(result.as_deref());
95 }
96
97 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
98 }
99}
100
101pub(super) fn parse_wkt(s: &str) -> Result<Geometry> {
102 Geometry::try_from_wkt_str(s)
103 .map_err(|e| {
104 BoxedError::new(PlainError::new(
105 format!("Fail to parse WKT: {}", e),
106 StatusCode::EngineExecuteQuery,
107 ))
108 })
109 .context(error::ExecuteSnafu)
110}