common_function/scalars/geo/
measure.rs1use std::sync::Arc;
16
17use common_error::ext::{BoxedError, PlainError};
18use common_error::status_code::StatusCode;
19use common_query::error;
20use datafusion_common::arrow::array::{Array, AsArray, Float64Builder};
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
24use derive_more::Display;
25use geo::algorithm::line_measures::metric_spaces::Euclidean;
26use geo::{Area, Distance, Haversine};
27use geo_types::Geometry;
28use snafu::ResultExt;
29
30use crate::function::{Function, extract_args};
31use crate::scalars::geo::wkt::parse_wkt;
32
33#[derive(Clone, Debug, Display)]
35#[display("{}", self.name())]
36pub(crate) struct STDistance {
37 signature: Signature,
38}
39
40impl Default for STDistance {
41 fn default() -> Self {
42 Self {
43 signature: Signature::string(2, Volatility::Stable),
44 }
45 }
46}
47
48impl Function for STDistance {
49 fn name(&self) -> &str {
50 "st_distance"
51 }
52
53 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
54 Ok(DataType::Float64)
55 }
56
57 fn signature(&self) -> &Signature {
58 &self.signature
59 }
60
61 fn invoke_with_args(
62 &self,
63 args: ScalarFunctionArgs,
64 ) -> datafusion_common::Result<ColumnarValue> {
65 let [arg0, arg1] = extract_args(self.name(), &args)?;
66
67 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
68 let wkt_this_vec = arg0.as_string_view();
69 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
70 let wkt_that_vec = arg1.as_string_view();
71
72 let size = wkt_this_vec.len();
73 let mut builder = Float64Builder::with_capacity(size);
74
75 for i in 0..size {
76 let wkt_this = wkt_this_vec.is_valid(i).then(|| wkt_this_vec.value(i));
77 let wkt_that = wkt_that_vec.is_valid(i).then(|| wkt_that_vec.value(i));
78
79 let result = match (wkt_this, wkt_that) {
80 (Some(wkt_this), Some(wkt_that)) => {
81 let geom_this = parse_wkt(wkt_this)?;
82 let geom_that = parse_wkt(wkt_that)?;
83
84 Some(Euclidean::distance(&geom_this, &geom_that))
85 }
86 _ => None,
87 };
88
89 builder.append_option(result);
90 }
91
92 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
93 }
94}
95
96#[derive(Clone, Debug, Display)]
98#[display("{}", self.name())]
99pub(crate) struct STDistanceSphere {
100 signature: Signature,
101}
102
103impl Default for STDistanceSphere {
104 fn default() -> Self {
105 Self {
106 signature: Signature::string(2, Volatility::Stable),
107 }
108 }
109}
110
111impl Function for STDistanceSphere {
112 fn name(&self) -> &str {
113 "st_distance_sphere_m"
114 }
115
116 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
117 Ok(DataType::Float64)
118 }
119
120 fn signature(&self) -> &Signature {
121 &self.signature
122 }
123
124 fn invoke_with_args(
125 &self,
126 args: ScalarFunctionArgs,
127 ) -> datafusion_common::Result<ColumnarValue> {
128 let [arg0, arg1] = extract_args(self.name(), &args)?;
129
130 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
131 let wkt_this_vec = arg0.as_string_view();
132 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
133 let wkt_that_vec = arg1.as_string_view();
134
135 let size = wkt_this_vec.len();
136 let mut builder = Float64Builder::with_capacity(size);
137
138 for i in 0..size {
139 let wkt_this = wkt_this_vec.is_valid(i).then(|| wkt_this_vec.value(i));
140 let wkt_that = wkt_that_vec.is_valid(i).then(|| wkt_that_vec.value(i));
141
142 let result = match (wkt_this, wkt_that) {
143 (Some(wkt_this), Some(wkt_that)) => {
144 let geom_this = parse_wkt(wkt_this)?;
145 let geom_that = parse_wkt(wkt_that)?;
146
147 match (geom_this, geom_that) {
148 (Geometry::Point(this), Geometry::Point(that)) => {
149 Some(Haversine::distance(this, that))
150 }
151 _ => {
152 Err(BoxedError::new(PlainError::new(
153 "Great circle distance between non-point objects are not supported for now.".to_string(),
154 StatusCode::Unsupported,
155 ))).context(error::ExecuteSnafu)?
156 }
157 }
158 }
159 _ => None,
160 };
161
162 builder.append_option(result);
163 }
164
165 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
166 }
167}
168
169#[derive(Clone, Debug, Display)]
171#[display("{}", self.name())]
172pub(crate) struct STArea {
173 signature: Signature,
174}
175
176impl Default for STArea {
177 fn default() -> Self {
178 Self {
179 signature: Signature::string(1, Volatility::Stable),
180 }
181 }
182}
183
184impl Function for STArea {
185 fn name(&self) -> &str {
186 "st_area"
187 }
188
189 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
190 Ok(DataType::Float64)
191 }
192
193 fn signature(&self) -> &Signature {
194 &self.signature
195 }
196
197 fn invoke_with_args(
198 &self,
199 args: ScalarFunctionArgs,
200 ) -> datafusion_common::Result<ColumnarValue> {
201 let [arg0] = extract_args(self.name(), &args)?;
202
203 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
204 let wkt_vec = arg0.as_string_view();
205
206 let size = wkt_vec.len();
207 let mut builder = Float64Builder::with_capacity(size);
208
209 for i in 0..size {
210 let wkt = wkt_vec.is_valid(i).then(|| wkt_vec.value(i));
211
212 let result = if let Some(wkt) = wkt {
213 let geom = parse_wkt(wkt)?;
214 Some(geom.unsigned_area())
215 } else {
216 None
217 };
218
219 builder.append_option(result);
220 }
221
222 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
223 }
224}