1use std::str::FromStr;
16use std::sync::{Arc, LazyLock};
17
18use common_error::ext::{BoxedError, PlainError};
19use common_error::status_code::StatusCode;
20use common_query::error::{self, Result};
21use datafusion::arrow::array::{
22 Array, ArrayRef, AsArray, BooleanBuilder, Float64Builder, Int32Builder, ListBuilder,
23 StringViewArray, StringViewBuilder, UInt8Builder, UInt64Builder,
24};
25use datafusion::arrow::compute;
26use datafusion::arrow::datatypes::{
27 ArrowPrimitiveType, Float64Type, Int64Type, UInt8Type, UInt64Type,
28};
29use datafusion::logical_expr::ColumnarValue;
30use datafusion_common::{DataFusionError, ScalarValue, utils};
31use datafusion_expr::type_coercion::aggregates::INTEGERS;
32use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
33use datatypes::arrow::datatypes::{DataType, Field};
34use derive_more::Display;
35use h3o::{CellIndex, LatLng, Resolution};
36use snafu::prelude::*;
37
38use crate::function::Function;
39
40static CELL_TYPES: LazyLock<Vec<DataType>> =
41 LazyLock::new(|| vec![DataType::Int64, DataType::UInt64, DataType::Utf8]);
42
43static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
44 LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
45
46static RESOLUTION_TYPES: &[DataType] = INTEGERS;
47
48static DISTANCE_TYPES: &[DataType] = INTEGERS;
49
50static POSITION_TYPES: &[DataType] = INTEGERS;
51
52#[derive(Clone, Debug, Default, Display)]
56#[display("{}", self.name())]
57pub struct H3LatLngToCell;
58
59impl Function for H3LatLngToCell {
60 fn name(&self) -> &str {
61 "h3_latlng_to_cell"
62 }
63
64 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
65 Ok(DataType::UInt64)
66 }
67
68 fn signature(&self) -> Signature {
69 let mut signatures = Vec::new();
70 for coord_type in COORDINATE_TYPES.as_slice() {
71 for resolution_type in RESOLUTION_TYPES {
72 signatures.push(TypeSignature::Exact(vec![
73 coord_type.clone(),
75 coord_type.clone(),
77 resolution_type.clone(),
79 ]));
80 }
81 }
82 Signature::one_of(signatures, Volatility::Stable)
83 }
84
85 fn invoke_with_args(
86 &self,
87 args: ScalarFunctionArgs,
88 ) -> datafusion_common::Result<ColumnarValue> {
89 let args = ColumnarValue::values_to_arrays(&args.args)?;
90 let [lat_vec, lon_vec, resolution_vec] = utils::take_function_args(self.name(), args)?;
91
92 let lat_vec = cast::<Float64Type>(&lat_vec)?;
93 let lat_vec = lat_vec.as_primitive::<Float64Type>();
94 let lon_vec = cast::<Float64Type>(&lon_vec)?;
95 let lon_vec = lon_vec.as_primitive::<Float64Type>();
96 let resolutions = cast::<UInt8Type>(&resolution_vec)?;
97 let resolution_vec = resolutions.as_primitive::<UInt8Type>();
98
99 let size = lat_vec.len();
100 let mut builder = UInt64Builder::with_capacity(size);
101
102 for i in 0..size {
103 let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i));
104 let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i));
105 let r = resolution_vec
106 .is_valid(i)
107 .then(|| value_to_resolution(resolution_vec.value(i)))
108 .transpose()?;
109
110 let result = match (lat, lon, r) {
111 (Some(lat), Some(lon), Some(r)) => {
112 let coord = LatLng::new(lat, lon)
113 .map_err(|e| {
114 BoxedError::new(PlainError::new(
115 format!("H3 error: {}", e),
116 StatusCode::EngineExecuteQuery,
117 ))
118 })
119 .context(error::ExecuteSnafu)?;
120 let encoded: u64 = coord.to_cell(r).into();
121 Some(encoded)
122 }
123 _ => None,
124 };
125
126 builder.append_option(result);
127 }
128
129 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
130 }
131}
132
133#[derive(Clone, Debug, Default, Display)]
138#[display("{}", self.name())]
139pub struct H3LatLngToCellString;
140
141impl Function for H3LatLngToCellString {
142 fn name(&self) -> &str {
143 "h3_latlng_to_cell_string"
144 }
145
146 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
147 Ok(DataType::Utf8View)
148 }
149
150 fn signature(&self) -> Signature {
151 let mut signatures = Vec::new();
152 for coord_type in COORDINATE_TYPES.as_slice() {
153 for resolution_type in RESOLUTION_TYPES {
154 signatures.push(TypeSignature::Exact(vec![
155 coord_type.clone(),
157 coord_type.clone(),
159 resolution_type.clone(),
161 ]));
162 }
163 }
164 Signature::one_of(signatures, Volatility::Stable)
165 }
166
167 fn invoke_with_args(
168 &self,
169 args: ScalarFunctionArgs,
170 ) -> datafusion_common::Result<ColumnarValue> {
171 let args = ColumnarValue::values_to_arrays(&args.args)?;
172 let [lat_vec, lon_vec, resolution_vec] = utils::take_function_args(self.name(), args)?;
173
174 let lat_vec = cast::<Float64Type>(&lat_vec)?;
175 let lat_vec = lat_vec.as_primitive::<Float64Type>();
176 let lon_vec = cast::<Float64Type>(&lon_vec)?;
177 let lon_vec = lon_vec.as_primitive::<Float64Type>();
178 let resolutions = cast::<UInt8Type>(&resolution_vec)?;
179 let resolution_vec = resolutions.as_primitive::<UInt8Type>();
180
181 let size = lat_vec.len();
182 let mut builder = StringViewBuilder::with_capacity(size);
183
184 for i in 0..size {
185 let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i));
186 let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i));
187 let r = resolution_vec
188 .is_valid(i)
189 .then(|| value_to_resolution(resolution_vec.value(i)))
190 .transpose()?;
191
192 let result = match (lat, lon, r) {
193 (Some(lat), Some(lon), Some(r)) => {
194 let coord = LatLng::new(lat, lon)
195 .map_err(|e| {
196 BoxedError::new(PlainError::new(
197 format!("H3 error: {}", e),
198 StatusCode::EngineExecuteQuery,
199 ))
200 })
201 .context(error::ExecuteSnafu)?;
202 let encoded = coord.to_cell(r).to_string();
203 Some(encoded)
204 }
205 _ => None,
206 };
207
208 builder.append_option(result);
209 }
210
211 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
212 }
213}
214
215#[derive(Clone, Debug, Default, Display)]
217#[display("{}", self.name())]
218pub struct H3CellToString;
219
220impl Function for H3CellToString {
221 fn name(&self) -> &str {
222 "h3_cell_to_string"
223 }
224
225 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
226 Ok(DataType::Utf8View)
227 }
228
229 fn signature(&self) -> Signature {
230 signature_of_cell()
231 }
232
233 fn invoke_with_args(
234 &self,
235 args: ScalarFunctionArgs,
236 ) -> datafusion_common::Result<ColumnarValue> {
237 let args = ColumnarValue::values_to_arrays(&args.args)?;
238 let [cell_vec] = utils::take_function_args(self.name(), args)?;
239
240 let size = cell_vec.len();
241 let mut builder = StringViewBuilder::with_capacity(size);
242
243 for i in 0..size {
244 let v = ScalarValue::try_from_array(&cell_vec, i)
245 .and_then(cell_from_value)?
246 .map(|x| x.to_string());
247 builder.append_option(v);
248 }
249
250 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
251 }
252}
253
254#[derive(Clone, Debug, Default, Display)]
256#[display("{}", self.name())]
257pub struct H3StringToCell;
258
259impl Function for H3StringToCell {
260 fn name(&self) -> &str {
261 "h3_string_to_cell"
262 }
263
264 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
265 Ok(DataType::UInt64)
266 }
267
268 fn signature(&self) -> Signature {
269 Signature::string(1, Volatility::Stable)
270 }
271
272 fn invoke_with_args(
273 &self,
274 args: ScalarFunctionArgs,
275 ) -> datafusion_common::Result<ColumnarValue> {
276 let args = ColumnarValue::values_to_arrays(&args.args)?;
277 let [string_vec] = utils::take_function_args(self.name(), args)?;
278 let string_vec = compute::cast(string_vec.as_ref(), &DataType::Utf8View)?;
279 let string_vec = datafusion_common::downcast_value!(string_vec, StringViewArray);
280
281 let size = string_vec.len();
282 let mut builder = UInt64Builder::with_capacity(size);
283
284 for i in 0..size {
285 let cell_id = string_vec
286 .is_valid(i)
287 .then(|| {
288 CellIndex::from_str(string_vec.value(i))
289 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
290 .map(Into::into)
291 })
292 .transpose()?;
293
294 builder.append_option(cell_id);
295 }
296
297 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
298 }
299}
300
301#[derive(Clone, Debug, Default, Display)]
303#[display("{}", self.name())]
304pub struct H3CellCenterLatLng;
305
306impl Function for H3CellCenterLatLng {
307 fn name(&self) -> &str {
308 "h3_cell_center_latlng"
309 }
310
311 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
312 Ok(DataType::List(Arc::new(Field::new(
313 "x",
314 DataType::Float64,
315 false,
316 ))))
317 }
318
319 fn signature(&self) -> Signature {
320 signature_of_cell()
321 }
322
323 fn invoke_with_args(
324 &self,
325 args: ScalarFunctionArgs,
326 ) -> datafusion_common::Result<ColumnarValue> {
327 let args = ColumnarValue::values_to_arrays(&args.args)?;
328 let [cell_vec] = utils::take_function_args(self.name(), args)?;
329
330 let size = cell_vec.len();
331 let mut builder = ListBuilder::new(Float64Builder::new());
332
333 for i in 0..size {
334 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
335 let latlng = cell.map(LatLng::from);
336
337 if let Some(latlng) = latlng {
338 builder.values().append_value(latlng.lat());
339 builder.values().append_value(latlng.lng());
340 builder.append(true);
341 } else {
342 builder.append_null();
343 }
344 }
345
346 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
347 }
348}
349
350#[derive(Clone, Debug, Default, Display)]
352#[display("{}", self.name())]
353pub struct H3CellResolution;
354
355impl Function for H3CellResolution {
356 fn name(&self) -> &str {
357 "h3_cell_resolution"
358 }
359
360 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
361 Ok(DataType::UInt8)
362 }
363
364 fn signature(&self) -> Signature {
365 signature_of_cell()
366 }
367
368 fn invoke_with_args(
369 &self,
370 args: ScalarFunctionArgs,
371 ) -> datafusion_common::Result<ColumnarValue> {
372 let args = ColumnarValue::values_to_arrays(&args.args)?;
373 let [cell_vec] = utils::take_function_args(self.name(), args)?;
374
375 let size = cell_vec.len();
376 let mut builder = UInt8Builder::with_capacity(cell_vec.len());
377
378 for i in 0..size {
379 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
380 let res = cell.map(|cell| cell.resolution().into());
381 builder.append_option(res);
382 }
383
384 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
385 }
386}
387
388#[derive(Clone, Debug, Default, Display)]
390#[display("{}", self.name())]
391pub struct H3CellBase;
392
393impl Function for H3CellBase {
394 fn name(&self) -> &str {
395 "h3_cell_base"
396 }
397
398 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
399 Ok(DataType::UInt8)
400 }
401
402 fn signature(&self) -> Signature {
403 signature_of_cell()
404 }
405
406 fn invoke_with_args(
407 &self,
408 args: ScalarFunctionArgs,
409 ) -> datafusion_common::Result<ColumnarValue> {
410 let args = ColumnarValue::values_to_arrays(&args.args)?;
411 let [cell_vec] = utils::take_function_args(self.name(), args)?;
412
413 let size = cell_vec.len();
414 let mut builder = UInt8Builder::with_capacity(size);
415
416 for i in 0..size {
417 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
418 let res = cell.map(|cell| cell.base_cell().into());
419
420 builder.append_option(res);
421 }
422
423 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
424 }
425}
426
427#[derive(Clone, Debug, Default, Display)]
429#[display("{}", self.name())]
430pub struct H3CellIsPentagon;
431
432impl Function for H3CellIsPentagon {
433 fn name(&self) -> &str {
434 "h3_cell_is_pentagon"
435 }
436
437 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
438 Ok(DataType::Boolean)
439 }
440
441 fn signature(&self) -> Signature {
442 signature_of_cell()
443 }
444
445 fn invoke_with_args(
446 &self,
447 args: ScalarFunctionArgs,
448 ) -> datafusion_common::Result<ColumnarValue> {
449 let args = ColumnarValue::values_to_arrays(&args.args)?;
450 let [cell_vec] = utils::take_function_args(self.name(), args)?;
451
452 let size = cell_vec.len();
453 let mut builder = BooleanBuilder::with_capacity(size);
454
455 for i in 0..size {
456 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
457 let res = cell.map(|cell| cell.is_pentagon());
458
459 builder.append_option(res);
460 }
461
462 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
463 }
464}
465
466#[derive(Clone, Debug, Default, Display)]
468#[display("{}", self.name())]
469pub struct H3CellCenterChild;
470
471impl Function for H3CellCenterChild {
472 fn name(&self) -> &str {
473 "h3_cell_center_child"
474 }
475
476 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
477 Ok(DataType::UInt64)
478 }
479
480 fn signature(&self) -> Signature {
481 signature_of_cell_and_resolution()
482 }
483
484 fn invoke_with_args(
485 &self,
486 args: ScalarFunctionArgs,
487 ) -> datafusion_common::Result<ColumnarValue> {
488 calculate_cell_child_property(self.name(), args, |cell, resolution| {
489 cell.center_child(resolution).map(Into::into)
490 })
491 }
492}
493
494#[derive(Clone, Debug, Default, Display)]
496#[display("{}", self.name())]
497pub struct H3CellParent;
498
499impl Function for H3CellParent {
500 fn name(&self) -> &str {
501 "h3_cell_parent"
502 }
503
504 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
505 Ok(DataType::UInt64)
506 }
507
508 fn signature(&self) -> Signature {
509 signature_of_cell_and_resolution()
510 }
511
512 fn invoke_with_args(
513 &self,
514 args: ScalarFunctionArgs,
515 ) -> datafusion_common::Result<ColumnarValue> {
516 calculate_cell_child_property(self.name(), args, |cell, resolution| {
517 cell.parent(resolution).map(Into::into)
518 })
519 }
520}
521
522#[derive(Clone, Debug, Default, Display)]
524#[display("{}", self.name())]
525pub struct H3CellToChildren;
526
527impl Function for H3CellToChildren {
528 fn name(&self) -> &str {
529 "h3_cell_to_children"
530 }
531
532 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
533 Ok(DataType::List(Arc::new(Field::new(
534 "item",
535 DataType::UInt64,
536 true,
537 ))))
538 }
539
540 fn signature(&self) -> Signature {
541 signature_of_cell_and_resolution()
542 }
543
544 fn invoke_with_args(
545 &self,
546 args: ScalarFunctionArgs,
547 ) -> datafusion_common::Result<ColumnarValue> {
548 let args = ColumnarValue::values_to_arrays(&args.args)?;
549 let [cell_vec, res_vec] = utils::take_function_args(self.name(), args)?;
550 let resolutions = cast::<UInt8Type>(&res_vec)?;
551 let resolutions = resolutions.as_primitive::<UInt8Type>();
552
553 let size = cell_vec.len();
554 let mut builder = ListBuilder::new(UInt64Builder::new());
555
556 for i in 0..size {
557 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
558 let resolution = resolutions
559 .is_valid(i)
560 .then(|| value_to_resolution(resolutions.value(i)))
561 .transpose()?;
562
563 match (cell, resolution) {
564 (Some(c), Some(r)) => {
565 for x in c.children(r) {
566 builder.values().append_value(u64::from(x));
567 }
568 builder.append(true);
569 }
570 _ => builder.append_null(),
571 }
572 }
573
574 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
575 }
576}
577
578#[derive(Clone, Debug, Default, Display)]
580#[display("{}", self.name())]
581pub struct H3CellToChildrenSize;
582
583impl Function for H3CellToChildrenSize {
584 fn name(&self) -> &str {
585 "h3_cell_to_children_size"
586 }
587
588 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
589 Ok(DataType::UInt64)
590 }
591
592 fn signature(&self) -> Signature {
593 signature_of_cell_and_resolution()
594 }
595
596 fn invoke_with_args(
597 &self,
598 args: ScalarFunctionArgs,
599 ) -> datafusion_common::Result<ColumnarValue> {
600 calculate_cell_child_property(self.name(), args, |cell, resolution| {
601 Some(cell.children_count(resolution))
602 })
603 }
604}
605
606#[derive(Clone, Debug, Default, Display)]
608#[display("{}", self.name())]
609pub struct H3CellToChildPos;
610
611impl Function for H3CellToChildPos {
612 fn name(&self) -> &str {
613 "h3_cell_to_child_pos"
614 }
615
616 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
617 Ok(DataType::UInt64)
618 }
619
620 fn signature(&self) -> Signature {
621 signature_of_cell_and_resolution()
622 }
623
624 fn invoke_with_args(
625 &self,
626 args: ScalarFunctionArgs,
627 ) -> datafusion_common::Result<ColumnarValue> {
628 calculate_cell_child_property(self.name(), args, |cell, resolution| {
629 cell.child_position(resolution)
630 })
631 }
632}
633
634fn calculate_cell_child_property<F>(
635 name: &str,
636 args: ScalarFunctionArgs,
637 calculator: F,
638) -> datafusion_common::Result<ColumnarValue>
639where
640 F: Fn(CellIndex, Resolution) -> Option<u64>,
641{
642 let args = ColumnarValue::values_to_arrays(&args.args)?;
643 let [cells, resolutions] = utils::take_function_args(name, args)?;
644 let resolutions = cast::<UInt8Type>(&resolutions)?;
645 let resolutions = resolutions.as_primitive::<UInt8Type>();
646
647 let mut builder = UInt64Builder::with_capacity(cells.len());
648 for i in 0..cells.len() {
649 let cell = ScalarValue::try_from_array(&cells, i).and_then(cell_from_value)?;
650 let resolution = resolutions
651 .is_valid(i)
652 .then(|| value_to_resolution(resolutions.value(i)))
653 .transpose()?;
654 let v = match (cell, resolution) {
655 (Some(c), Some(r)) => calculator(c, r),
656 _ => None,
657 };
658 builder.append_option(v);
659 }
660
661 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
662}
663
664#[derive(Clone, Debug, Default, Display)]
666#[display("{}", self.name())]
667pub struct H3ChildPosToCell;
668
669impl Function for H3ChildPosToCell {
670 fn name(&self) -> &str {
671 "h3_child_pos_to_cell"
672 }
673
674 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
675 Ok(DataType::UInt64)
676 }
677
678 fn signature(&self) -> Signature {
679 let mut signatures =
680 Vec::with_capacity(POSITION_TYPES.len() * CELL_TYPES.len() * RESOLUTION_TYPES.len());
681 for position_type in POSITION_TYPES {
682 for cell_type in CELL_TYPES.as_slice() {
683 for resolution_type in RESOLUTION_TYPES {
684 signatures.push(TypeSignature::Exact(vec![
685 position_type.clone(),
686 cell_type.clone(),
687 resolution_type.clone(),
688 ]));
689 }
690 }
691 }
692 Signature::one_of(signatures, Volatility::Stable)
693 }
694
695 fn invoke_with_args(
696 &self,
697 args: ScalarFunctionArgs,
698 ) -> datafusion_common::Result<ColumnarValue> {
699 let args = ColumnarValue::values_to_arrays(&args.args)?;
700 let [pos_vec, cell_vec, res_vec] = utils::take_function_args(self.name(), args)?;
701 let resolutions = cast::<UInt8Type>(&res_vec)?;
702 let resolutions = resolutions.as_primitive::<UInt8Type>();
703
704 let size = cell_vec.len();
705 let mut builder = UInt64Builder::with_capacity(size);
706
707 for i in 0..size {
708 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
709 let pos = ScalarValue::try_from_array(&pos_vec, i).and_then(value_to_position)?;
710 let resolution = resolutions
711 .is_valid(i)
712 .then(|| value_to_resolution(resolutions.value(i)))
713 .transpose()?;
714 let result = match (cell, resolution) {
715 (Some(c), Some(r)) => c.child_at(pos, r).map(u64::from),
716 _ => None,
717 };
718 builder.append_option(result);
719 }
720
721 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
722 }
723}
724
725fn cast<T: ArrowPrimitiveType>(array: &ArrayRef) -> datafusion_common::Result<ArrayRef> {
726 let x = compute::cast_with_options(
727 array.as_ref(),
728 &T::DATA_TYPE,
729 &compute::CastOptions {
730 safe: false,
731 ..Default::default()
732 },
733 )?;
734 Ok(x)
735}
736
737#[derive(Clone, Debug, Default, Display)]
739#[display("{}", self.name())]
740pub struct H3GridDisk;
741
742impl Function for H3GridDisk {
743 fn name(&self) -> &str {
744 "h3_grid_disk"
745 }
746
747 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
748 Ok(DataType::List(Arc::new(Field::new(
749 "item",
750 DataType::UInt64,
751 true,
752 ))))
753 }
754
755 fn signature(&self) -> Signature {
756 signature_of_cell_and_distance()
757 }
758
759 fn invoke_with_args(
760 &self,
761 args: ScalarFunctionArgs,
762 ) -> datafusion_common::Result<ColumnarValue> {
763 let args = ColumnarValue::values_to_arrays(&args.args)?;
764 let [cell_vec, k_vec] = utils::take_function_args(self.name(), args)?;
765
766 let size = cell_vec.len();
767 let mut builder = ListBuilder::new(UInt64Builder::new());
768
769 for i in 0..size {
770 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
771 let k = ScalarValue::try_from_array(&k_vec, i).and_then(value_to_distance)?;
772
773 if let Some(cell) = cell {
774 for x in cell.grid_disk::<Vec<_>>(k) {
775 builder.values().append_value(u64::from(x));
776 }
777 builder.append(true);
778 } else {
779 builder.append_null();
780 }
781 }
782
783 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
784 }
785}
786
787#[derive(Clone, Debug, Default, Display)]
789#[display("{}", self.name())]
790pub struct H3GridDiskDistances;
791
792impl Function for H3GridDiskDistances {
793 fn name(&self) -> &str {
794 "h3_grid_disk_distances"
795 }
796
797 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
798 Ok(DataType::List(Arc::new(Field::new(
799 "item",
800 DataType::UInt64,
801 true,
802 ))))
803 }
804
805 fn signature(&self) -> Signature {
806 signature_of_cell_and_distance()
807 }
808
809 fn invoke_with_args(
810 &self,
811 args: ScalarFunctionArgs,
812 ) -> datafusion_common::Result<ColumnarValue> {
813 let args = ColumnarValue::values_to_arrays(&args.args)?;
814 let [cell_vec, k_vec] = utils::take_function_args(self.name(), args)?;
815
816 let size = cell_vec.len();
817 let mut builder = ListBuilder::new(UInt64Builder::new());
818
819 for i in 0..size {
820 let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?;
821 let k = ScalarValue::try_from_array(&k_vec, i).and_then(value_to_distance)?;
822
823 if let Some(cell) = cell {
824 for (x, _) in cell.grid_disk_distances::<Vec<_>>(k) {
825 builder.values().append_value(u64::from(x));
826 }
827 builder.append(true);
828 } else {
829 builder.append_null();
830 }
831 }
832
833 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
834 }
835}
836
837#[derive(Clone, Debug, Default, Display)]
839#[display("{}", self.name())]
840pub struct H3GridDistance;
841
842impl Function for H3GridDistance {
843 fn name(&self) -> &str {
844 "h3_grid_distance"
845 }
846 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
847 Ok(DataType::Int32)
848 }
849
850 fn signature(&self) -> Signature {
851 signature_of_double_cells()
852 }
853
854 fn invoke_with_args(
855 &self,
856 args: ScalarFunctionArgs,
857 ) -> datafusion_common::Result<ColumnarValue> {
858 let args = ColumnarValue::values_to_arrays(&args.args)?;
859 let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?;
860
861 let size = cell_this_vec.len();
862 let mut builder = Int32Builder::with_capacity(size);
863
864 for i in 0..size {
865 let cell_this =
866 ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?;
867 let cell_that =
868 ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?;
869 let result = match (cell_this, cell_that) {
870 (Some(cell_this), Some(cell_that)) => {
871 let dist = cell_this
872 .grid_distance(cell_that)
873 .map_err(|e| {
874 BoxedError::new(PlainError::new(
875 format!("H3 error: {}", e),
876 StatusCode::EngineExecuteQuery,
877 ))
878 })
879 .context(error::ExecuteSnafu)?;
880 Some(dist)
881 }
882 _ => None,
883 };
884
885 builder.append_option(result);
886 }
887
888 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
889 }
890}
891
892#[derive(Clone, Debug, Default, Display)]
894#[display("{}", self.name())]
895pub struct H3GridPathCells;
896
897impl Function for H3GridPathCells {
898 fn name(&self) -> &str {
899 "h3_grid_path_cells"
900 }
901
902 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
903 Ok(DataType::List(Arc::new(Field::new(
904 "item",
905 DataType::UInt64,
906 true,
907 ))))
908 }
909
910 fn signature(&self) -> Signature {
911 signature_of_double_cells()
912 }
913
914 fn invoke_with_args(
915 &self,
916 args: ScalarFunctionArgs,
917 ) -> datafusion_common::Result<ColumnarValue> {
918 let args = ColumnarValue::values_to_arrays(&args.args)?;
919 let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?;
920
921 let size = cell_this_vec.len();
922 let mut builder = ListBuilder::new(UInt64Builder::new());
923
924 for i in 0..size {
925 let cell_this =
926 ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?;
927 let cell_that =
928 ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?;
929 match (cell_this, cell_that) {
930 (Some(cell_this), Some(cell_that)) => {
931 let cells = cell_this
932 .grid_path_cells(cell_that)
933 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))?;
934 for cell in cells {
935 let cell = cell
936 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))?;
937 builder.values().append_value(u64::from(cell));
938 }
939 builder.append(true);
940 }
941 _ => builder.append_null(),
942 };
943 }
944
945 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
946 }
947}
948
949#[derive(Clone, Debug, Default, Display)]
951#[display("{}", self.name())]
952pub struct H3CellContains;
953
954impl Function for H3CellContains {
955 fn name(&self) -> &str {
956 "h3_cells_contains"
957 }
958
959 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
960 Ok(DataType::Boolean)
961 }
962
963 fn signature(&self) -> Signature {
964 let multi_cell_types = vec![
965 DataType::new_list(DataType::Int64, true),
966 DataType::new_list(DataType::UInt64, true),
967 DataType::new_list(DataType::Utf8, true),
968 DataType::Utf8,
969 ];
970
971 let mut signatures = Vec::with_capacity(multi_cell_types.len() * CELL_TYPES.len());
972 for multi_cell_type in &multi_cell_types {
973 for cell_type in CELL_TYPES.as_slice() {
974 signatures.push(TypeSignature::Exact(vec![
975 multi_cell_type.clone(),
976 cell_type.clone(),
977 ]));
978 }
979 }
980
981 Signature::one_of(signatures, Volatility::Stable)
982 }
983
984 fn invoke_with_args(
985 &self,
986 args: ScalarFunctionArgs,
987 ) -> datafusion_common::Result<ColumnarValue> {
988 let args = ColumnarValue::values_to_arrays(&args.args)?;
989 let [cells_vec, cell_this_vec] = utils::take_function_args(self.name(), args)?;
990
991 let size = cell_this_vec.len();
992 let mut builder = BooleanBuilder::with_capacity(size);
993
994 for i in 0..size {
995 let cells = ScalarValue::try_from_array(&cells_vec, i).and_then(cells_from_value)?;
996 let cell_this =
997 ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?;
998 let mut result = None;
999 if let (cells, Some(cell_this)) = (cells, cell_this) {
1000 result = Some(false);
1001
1002 for cell_that in cells.iter() {
1003 let resolution = cell_that.resolution();
1006 if let Some(cell_this_parent) = cell_this.parent(resolution)
1007 && cell_this_parent == *cell_that
1008 {
1009 result = Some(true);
1010 break;
1011 }
1012 }
1013 }
1014
1015 builder.append_option(result);
1016 }
1017
1018 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1019 }
1020}
1021
1022#[derive(Clone, Debug, Default, Display)]
1024#[display("{}", self.name())]
1025pub struct H3CellDistanceSphereKm;
1026
1027impl Function for H3CellDistanceSphereKm {
1028 fn name(&self) -> &str {
1029 "h3_distance_sphere_km"
1030 }
1031 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
1032 Ok(DataType::Float64)
1033 }
1034
1035 fn signature(&self) -> Signature {
1036 signature_of_double_cells()
1037 }
1038
1039 fn invoke_with_args(
1040 &self,
1041 args: ScalarFunctionArgs,
1042 ) -> datafusion_common::Result<ColumnarValue> {
1043 let args = ColumnarValue::values_to_arrays(&args.args)?;
1044 let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?;
1045
1046 let size = cell_this_vec.len();
1047 let mut builder = Float64Builder::with_capacity(size);
1048
1049 for i in 0..size {
1050 let cell_this =
1051 ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?;
1052 let cell_that =
1053 ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?;
1054 let result = match (cell_this, cell_that) {
1055 (Some(cell_this), Some(cell_that)) => {
1056 let centroid_this = LatLng::from(cell_this);
1057 let centroid_that = LatLng::from(cell_that);
1058
1059 Some(centroid_this.distance_km(centroid_that))
1060 }
1061 _ => None,
1062 };
1063
1064 builder.append_option(result);
1065 }
1066
1067 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1068 }
1069}
1070
1071#[derive(Clone, Debug, Default, Display)]
1073#[display("{}", self.name())]
1074pub struct H3CellDistanceEuclideanDegree;
1075
1076impl H3CellDistanceEuclideanDegree {
1077 fn distance(centroid_this: LatLng, centroid_that: LatLng) -> f64 {
1078 ((centroid_this.lat() - centroid_that.lat()).powi(2)
1079 + (centroid_this.lng() - centroid_that.lng()).powi(2))
1080 .sqrt()
1081 }
1082}
1083
1084impl Function for H3CellDistanceEuclideanDegree {
1085 fn name(&self) -> &str {
1086 "h3_distance_degree"
1087 }
1088 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
1089 Ok(DataType::Float64)
1090 }
1091
1092 fn signature(&self) -> Signature {
1093 signature_of_double_cells()
1094 }
1095
1096 fn invoke_with_args(
1097 &self,
1098 args: ScalarFunctionArgs,
1099 ) -> datafusion_common::Result<ColumnarValue> {
1100 let args = ColumnarValue::values_to_arrays(&args.args)?;
1101 let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?;
1102
1103 let size = cell_this_vec.len();
1104 let mut builder = Float64Builder::with_capacity(size);
1105
1106 for i in 0..size {
1107 let cell_this =
1108 ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?;
1109 let cell_that =
1110 ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?;
1111 let result = match (cell_this, cell_that) {
1112 (Some(cell_this), Some(cell_that)) => {
1113 let centroid_this = LatLng::from(cell_this);
1114 let centroid_that = LatLng::from(cell_that);
1115
1116 let dist = Self::distance(centroid_this, centroid_that);
1117 Some(dist)
1118 }
1119 _ => None,
1120 };
1121
1122 builder.append_option(result);
1123 }
1124
1125 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1126 }
1127}
1128
1129fn value_to_resolution(r: u8) -> datafusion_common::Result<Resolution> {
1130 Resolution::try_from(r).map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1131}
1132
1133macro_rules! ensure_then_coerce {
1134 ($compare:expr, $coerce:expr) => {{
1135 if !$compare {
1136 return Err(datafusion_common::DataFusionError::Execution(
1137 "Argument was outside of acceptable range".to_string(),
1138 ));
1139 }
1140 Ok($coerce)
1141 }};
1142}
1143
1144fn value_to_position(v: ScalarValue) -> datafusion_common::Result<u64> {
1145 match v {
1146 ScalarValue::Int8(Some(v)) => ensure_then_coerce!(v >= 0, v as u64),
1147 ScalarValue::Int16(Some(v)) => ensure_then_coerce!(v >= 0, v as u64),
1148 ScalarValue::Int32(Some(v)) => ensure_then_coerce!(v >= 0, v as u64),
1149 ScalarValue::Int64(Some(v)) => ensure_then_coerce!(v >= 0, v as u64),
1150 ScalarValue::UInt8(Some(v)) => Ok(v as u64),
1151 ScalarValue::UInt16(Some(v)) => Ok(v as u64),
1152 ScalarValue::UInt32(Some(v)) => Ok(v as u64),
1153 ScalarValue::UInt64(Some(v)) => Ok(v),
1154 _ => unreachable!(),
1155 }
1156}
1157
1158fn value_to_distance(v: ScalarValue) -> datafusion_common::Result<u32> {
1159 match v {
1160 ScalarValue::Int8(Some(v)) => ensure_then_coerce!(v >= 0, v as u32),
1161 ScalarValue::Int16(Some(v)) => ensure_then_coerce!(v >= 0, v as u32),
1162 ScalarValue::Int32(Some(v)) => ensure_then_coerce!(v >= 0, v as u32),
1163 ScalarValue::Int64(Some(v)) => ensure_then_coerce!(v >= 0, v as u32),
1164 ScalarValue::UInt8(Some(v)) => Ok(v as u32),
1165 ScalarValue::UInt16(Some(v)) => Ok(v as u32),
1166 ScalarValue::UInt32(Some(v)) => Ok(v),
1167 ScalarValue::UInt64(Some(v)) => Ok(v as u32),
1168 _ => unreachable!(),
1169 }
1170}
1171
1172fn signature_of_cell() -> Signature {
1173 let mut signatures = Vec::with_capacity(CELL_TYPES.len());
1174 for cell_type in CELL_TYPES.as_slice() {
1175 signatures.push(TypeSignature::Exact(vec![cell_type.clone()]));
1176 }
1177
1178 Signature::one_of(signatures, Volatility::Stable)
1179}
1180
1181fn signature_of_double_cells() -> Signature {
1182 let mut signatures = Vec::with_capacity(CELL_TYPES.len() * CELL_TYPES.len());
1183 for cell_type in CELL_TYPES.as_slice() {
1184 for cell_type2 in CELL_TYPES.as_slice() {
1185 signatures.push(TypeSignature::Exact(vec![
1186 cell_type.clone(),
1187 cell_type2.clone(),
1188 ]));
1189 }
1190 }
1191
1192 Signature::one_of(signatures, Volatility::Stable)
1193}
1194
1195fn signature_of_cell_and_resolution() -> Signature {
1196 let mut signatures = Vec::with_capacity(CELL_TYPES.len() * RESOLUTION_TYPES.len());
1197 for cell_type in CELL_TYPES.as_slice() {
1198 for resolution_type in RESOLUTION_TYPES {
1199 signatures.push(TypeSignature::Exact(vec![
1200 cell_type.clone(),
1201 resolution_type.clone(),
1202 ]));
1203 }
1204 }
1205 Signature::one_of(signatures, Volatility::Stable)
1206}
1207
1208fn signature_of_cell_and_distance() -> Signature {
1209 let mut signatures = Vec::with_capacity(CELL_TYPES.len() * DISTANCE_TYPES.len());
1210 for cell_type in CELL_TYPES.as_slice() {
1211 for distance_type in DISTANCE_TYPES {
1212 signatures.push(TypeSignature::Exact(vec![
1213 cell_type.clone(),
1214 distance_type.clone(),
1215 ]));
1216 }
1217 }
1218 Signature::one_of(signatures, Volatility::Stable)
1219}
1220
1221fn cell_from_value(v: ScalarValue) -> datafusion_common::Result<Option<CellIndex>> {
1222 match v {
1223 ScalarValue::Int64(Some(v)) => Some(CellIndex::try_from(v as u64)),
1224 ScalarValue::UInt64(Some(v)) => Some(CellIndex::try_from(v)),
1225 ScalarValue::Utf8(Some(s)) => Some(CellIndex::from_str(&s)),
1226 _ => None,
1227 }
1228 .transpose()
1229 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1230}
1231
1232fn cells_from_value(v: ScalarValue) -> datafusion_common::Result<Vec<CellIndex>> {
1238 match v {
1239 ScalarValue::List(list) => match list.value_type() {
1240 DataType::Int64 => list
1241 .values()
1242 .as_primitive::<Int64Type>()
1243 .iter()
1244 .map(|v| {
1245 if let Some(v) = v {
1246 CellIndex::try_from(v as u64)
1247 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1248 } else {
1249 Err(DataFusionError::Execution(
1250 "Invalid data type in array".to_string(),
1251 ))
1252 }
1253 })
1254 .collect::<datafusion_common::Result<Vec<CellIndex>>>(),
1255 DataType::UInt64 => list
1256 .values()
1257 .as_primitive::<UInt64Type>()
1258 .iter()
1259 .map(|v| {
1260 if let Some(v) = v {
1261 CellIndex::try_from(v)
1262 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1263 } else {
1264 Err(DataFusionError::Execution(
1265 "Invalid data type in array".to_string(),
1266 ))
1267 }
1268 })
1269 .collect::<datafusion_common::Result<Vec<CellIndex>>>(),
1270 DataType::Utf8 => list
1271 .values()
1272 .as_string::<i32>()
1273 .iter()
1274 .map(|v| {
1275 if let Some(v) = v {
1276 CellIndex::from_str(v)
1277 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1278 } else {
1279 Err(DataFusionError::Execution(
1280 "Invalid data type in array".to_string(),
1281 ))
1282 }
1283 })
1284 .collect::<datafusion_common::Result<Vec<CellIndex>>>(),
1285 _ => Ok(vec![]),
1286 },
1287 ScalarValue::Utf8(Some(csv)) => {
1288 let str_seq = csv.split(',');
1289 str_seq
1290 .map(|v| {
1291 CellIndex::from_str(v.trim())
1292 .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))
1293 })
1294 .collect::<datafusion_common::Result<Vec<CellIndex>>>()
1295 }
1296 _ => Ok(vec![]),
1297 }
1298}
1299
1300#[cfg(test)]
1301mod tests {
1302 use super::*;
1303
1304 #[test]
1305 fn test_h3_euclidean_distance() {
1306 let point_this = LatLng::new(42.3521, -72.1235).expect("incorrect lat lng");
1307 let point_that = LatLng::new(42.45, -72.1260).expect("incorrect lat lng");
1308
1309 let dist = H3CellDistanceEuclideanDegree::distance(point_this, point_that);
1310 assert_eq!(dist, 0.09793191512474639);
1311 }
1312}