common_function/
helper.rs1use api::v1::meta::ResolveStrategy;
16use common_query::error::{
17    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, TypeSignature, Volatility};
20use datatypes::arrow::datatypes::DataType;
21use datatypes::prelude::ConcreteDataType;
22use datatypes::types::cast::cast;
23use datatypes::value::ValueRef;
24use snafu::{OptionExt, ResultExt};
25
26pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
28    let mut sigs = Vec::with_capacity(args1.len() * args2.len());
29
30    for arg1 in &args1 {
31        for arg2 in &args2 {
32            sigs.push(TypeSignature::Exact(vec![arg1.clone(), arg2.clone()]));
33        }
34    }
35
36    Signature::one_of(sigs, Volatility::Immutable)
37}
38
39pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
41    cast(value.clone().into(), &ConcreteDataType::uint64_datatype())
42        .context(InvalidInputTypeSnafu {
43            err_msg: format!(
44                "Failed to cast input into uint64, actual type: {:#?}",
45                value.data_type(),
46            ),
47        })
48        .map(|v| v.as_u64())
49}
50
51pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
53    cast(value.clone().into(), &ConcreteDataType::uint32_datatype())
54        .context(InvalidInputTypeSnafu {
55            err_msg: format!(
56                "Failed to cast input into uint32, actual type: {:#?}",
57                value.data_type(),
58            ),
59        })
60        .map(|v| v.as_u64().map(|v| v as u32))
61}
62
63pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
65    ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
66        err_msg: format!("Invalid resolve strategy: {}", strategy),
67    })
68}
69
/// Returns the default parallelism value, `64`.
pub fn default_parallelism() -> u32 {
    const DEFAULT_PARALLELISM: u32 = 64;
    DEFAULT_PARALLELISM
}
74
75pub fn default_resolve_strategy() -> ResolveStrategy {
77    ResolveStrategy::UseLatest
78}
79
80pub fn get_string_from_params<'a>(
85    params: &'a [ValueRef<'a>],
86    index: usize,
87    fn_name: &'a str,
88) -> Result<&'a str> {
89    let ValueRef::String(s) = ¶ms[index] else {
90        return UnsupportedInputDataTypeSnafu {
91            function: fn_name,
92            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
93        }
94        .fail();
95    };
96    Ok(s)
97}
98
/// Dispatches on a timestamp `DataType` and expands `$body` with a caller-chosen
/// metavariable bound to the matching Arrow timestamp type.
///
/// The second argument has the shape `|$T| body`: the caller passes a literal `$` followed by
/// an identifier, and every `$T` inside `body` is replaced by one of `TimestampSecondType`,
/// `TimestampMillisecondType`, `TimestampMicrosecondType` or `TimestampNanosecondType`
/// according to the time unit of `$data_type`.
///
/// The expansion evaluates to `Ok(body)` for the four timestamp units and to
/// `Err(DataFusionError::Execution(..))` for any other data type. The call site must have
/// `DataType` in scope; note that `$data_type` is expanded a second time when building the
/// error message.
macro_rules! with_match_timestamp_types {
    ($data_type:expr, | $_t:tt $T:ident | $body:tt) => {{
        use datafusion_common::arrow::datatypes::{
            TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
            TimestampSecondType,
        };
        use datafusion_common::DataFusionError;

        // Local helper macro: `$_t` carries the caller's literal `$`, so this rule reads as
        // `( $T:ident ) => { body }` and substitutes the concrete type into the body.
        macro_rules! __with_ty__ {
            ( $_t $T:ident ) => {
                $body
            };
        }

        match $data_type {
            DataType::Timestamp(TimeUnit::Second, _) => {
                Ok(__with_ty__! { TimestampSecondType })
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                Ok(__with_ty__! { TimestampMillisecondType })
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                Ok(__with_ty__! { TimestampMicrosecondType })
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                Ok(__with_ty__! { TimestampNanosecondType })
            }
            _ => Err(DataFusionError::Execution(format!(
                "not expected data type: '{}'",
                $data_type
            ))),
        }
    }};
}
131
132pub(crate) use with_match_timestamp_types;
133
#[cfg(test)]
mod tests {
    use super::*;

    // A recognised strategy name must parse to the corresponding enum variant.
    #[test]
    fn test_parse_resolve_strategy() {
        let parsed = parse_resolve_strategy("UseLatest").unwrap();
        assert_eq!(parsed, ResolveStrategy::UseLatest);
    }
}