common_function/
helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ResolveStrategy;
16use common_query::error::{
17    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, TypeSignature, Volatility};
20use datatypes::arrow::datatypes::DataType;
21use datatypes::prelude::ConcreteDataType;
22use datatypes::types::cast::cast;
23use datatypes::value::ValueRef;
24use snafu::{OptionExt, ResultExt};
25
26/// Create a function signature with oneof signatures of interleaving two arguments.
27pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
28    let mut sigs = Vec::with_capacity(args1.len() * args2.len());
29
30    for arg1 in &args1 {
31        for arg2 in &args2 {
32            sigs.push(TypeSignature::Exact(vec![arg1.clone(), arg2.clone()]));
33        }
34    }
35
36    Signature::one_of(sigs, Volatility::Immutable)
37}
38
39/// Cast a [`ValueRef`] to u64, returns `None` if fails
40pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
41    cast(value.clone().into(), &ConcreteDataType::uint64_datatype())
42        .context(InvalidInputTypeSnafu {
43            err_msg: format!(
44                "Failed to cast input into uint64, actual type: {:#?}",
45                value.data_type(),
46            ),
47        })
48        .map(|v| v.as_u64())
49}
50
51/// Cast a [`ValueRef`] to u32, returns `None` if fails
52pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
53    cast(value.clone().into(), &ConcreteDataType::uint32_datatype())
54        .context(InvalidInputTypeSnafu {
55            err_msg: format!(
56                "Failed to cast input into uint32, actual type: {:#?}",
57                value.data_type(),
58            ),
59        })
60        .map(|v| v.as_u64().map(|v| v as u32))
61}
62
63/// Parse a resolve strategy from a string.
64pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
65    ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
66        err_msg: format!("Invalid resolve strategy: {}", strategy),
67    })
68}
69
/// Returns the parallelism used for reconcile operations when none is given.
pub fn default_parallelism() -> u32 {
    const DEFAULT_PARALLELISM: u32 = 64;
    DEFAULT_PARALLELISM
}
74
75/// Default resolve strategy for reconcile operations.
76pub fn default_resolve_strategy() -> ResolveStrategy {
77    ResolveStrategy::UseLatest
78}
79
80/// Get the string value from the params.
81///
82/// # Errors
83/// Returns an error if the input type is not a string.
84pub fn get_string_from_params<'a>(
85    params: &'a [ValueRef<'a>],
86    index: usize,
87    fn_name: &'a str,
88) -> Result<&'a str> {
89    let ValueRef::String(s) = &params[index] else {
90        return UnsupportedInputDataTypeSnafu {
91            function: fn_name,
92            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
93        }
94        .fail();
95    };
96    Ok(s)
97}
98
/// Dispatches on an arrow timestamp `DataType` and expands `$body` with a type
/// identifier bound to the matching arrow timestamp type.
///
/// The invocation shape is `with_match_timestamp_types!(data_type, |$T| { body })`.
/// The caller writes a literal `$` followed by an identifier between the bars;
/// inside `body`, `$T` then names one of `TimestampSecondType`,
/// `TimestampMillisecondType`, `TimestampMicrosecondType` or
/// `TimestampNanosecondType`, chosen by the time unit of `data_type`.
///
/// The whole expansion is an expression evaluating to `Ok(body)` for the four
/// timestamp units, or to `Err(DataFusionError::Execution(..))` with the
/// message `not expected data type: '<data_type>'` for any other data type.
///
/// Notes:
/// - `$data_type` appears both as the `match` scrutinee and in the error
///   message, so the expression is expanded twice.
/// - `DataType` is referenced unqualified; presumably it must be in scope at
///   the call site — TODO confirm.
macro_rules! with_match_timestamp_types {
    // `$_t` captures the literal `$` token written by the caller so that the
    // inner macro below can declare its own metavariable named after `$T`.
    ($data_type:expr, | $_t:tt $T:ident | $body:tt) => {{
        // Helper macro: given a concrete type identifier, substitutes it for
        // the caller's metavariable inside `$body`.
        macro_rules! __with_ty__ {
            ( $_t $T:ident ) => {
                $body
            };
        }

        use datafusion_common::DataFusionError;
        use datafusion_common::arrow::datatypes::{
            TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
            TimestampSecondType,
        };

        // The timezone component of the timestamp type is ignored.
        match $data_type {
            DataType::Timestamp(TimeUnit::Second, _) => Ok(__with_ty__! { TimestampSecondType }),
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                Ok(__with_ty__! { TimestampMillisecondType })
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                Ok(__with_ty__! { TimestampMicrosecondType })
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                Ok(__with_ty__! { TimestampNanosecondType })
            }
            // Any non-timestamp data type is reported as an execution error.
            _ => Err(DataFusionError::Execution(format!(
                "not expected data type: '{}'",
                $data_type
            ))),
        }
    }};
}
131
132pub(crate) use with_match_timestamp_types;
133
#[cfg(test)]
mod tests {
    use super::*;

    /// A recognized strategy name parses to the matching variant.
    #[test]
    fn test_parse_resolve_strategy() {
        assert_eq!(
            parse_resolve_strategy("UseLatest").unwrap(),
            ResolveStrategy::UseLatest
        );
    }

    /// An unrecognized strategy name is reported as an error, not a panic.
    #[test]
    fn test_parse_resolve_strategy_invalid() {
        assert!(parse_resolve_strategy("not-a-strategy").is_err());
        assert!(parse_resolve_strategy("").is_err());
    }

    /// The defaults used by reconcile operations stay stable.
    #[test]
    fn test_defaults() {
        assert_eq!(default_parallelism(), 64);
        assert_eq!(default_resolve_strategy(), ResolveStrategy::UseLatest);
    }
}