common_function/
helper.rs1use api::v1::meta::ResolveStrategy;
16use common_query::error::{
17 InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, TypeSignature, Volatility};
20use datatypes::arrow::datatypes::DataType;
21use datatypes::prelude::ConcreteDataType;
22use datatypes::types::cast::cast;
23use datatypes::value::ValueRef;
24use snafu::{OptionExt, ResultExt};
25
26pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
28 let mut sigs = Vec::with_capacity(args1.len() * args2.len());
29
30 for arg1 in &args1 {
31 for arg2 in &args2 {
32 sigs.push(TypeSignature::Exact(vec![arg1.clone(), arg2.clone()]));
33 }
34 }
35
36 Signature::one_of(sigs, Volatility::Immutable)
37}
38
39pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
41 cast(value.clone().into(), &ConcreteDataType::uint64_datatype())
42 .context(InvalidInputTypeSnafu {
43 err_msg: format!(
44 "Failed to cast input into uint64, actual type: {:#?}",
45 value.data_type(),
46 ),
47 })
48 .map(|v| v.as_u64())
49}
50
51pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
53 cast(value.clone().into(), &ConcreteDataType::uint32_datatype())
54 .context(InvalidInputTypeSnafu {
55 err_msg: format!(
56 "Failed to cast input into uint32, actual type: {:#?}",
57 value.data_type(),
58 ),
59 })
60 .map(|v| v.as_u64().map(|v| v as u32))
61}
62
63pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
65 ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
66 err_msg: format!("Invalid resolve strategy: {}", strategy),
67 })
68}
69
/// Returns the default parallelism, which is `64`.
pub fn default_parallelism() -> u32 {
    const DEFAULT_PARALLELISM: u32 = 64;
    DEFAULT_PARALLELISM
}
74
75pub fn default_resolve_strategy() -> ResolveStrategy {
77 ResolveStrategy::UseLatest
78}
79
80pub fn get_string_from_params<'a>(
85 params: &'a [ValueRef<'a>],
86 index: usize,
87 fn_name: &'a str,
88) -> Result<&'a str> {
89 let ValueRef::String(s) = ¶ms[index] else {
90 return UnsupportedInputDataTypeSnafu {
91 function: fn_name,
92 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
93 }
94 .fail();
95 };
96 Ok(s)
97}
98
/// Dispatches on a timestamp `DataType` and expands `$body` with a type metavariable
/// bound to the matching Arrow timestamp type.
///
/// The four `DataType::Timestamp` time units (second, millisecond, microsecond,
/// nanosecond) map to `TimestampSecondType`, `TimestampMillisecondType`,
/// `TimestampMicrosecondType` and `TimestampNanosecondType` respectively; the timezone
/// component is ignored. The expanded body is wrapped in `Ok(..)`. Any other data type
/// yields `Err(DataFusionError::Execution(..))`.
///
/// Between the pipes the caller supplies one token followed by an identifier; together
/// they form the metavariable of the inner helper macro (presumably written as `|$T|`
/// at the call site — confirm against callers).
///
/// `$data_type` is expanded twice: once as the match scrutinee and once in the error
/// message of the fallback arm. `DataType` must be in scope at the call site.
macro_rules! with_match_timestamp_types {
    ($data_type:expr, | $_t:tt $T:ident | $body:tt) => {{
        // Helper macro that substitutes the concrete timestamp type into `$body`.
        macro_rules! __with_ty__ {
            ( $_t $T:ident ) => {
                $body
            };
        }

        use datafusion_common::arrow::datatypes::{
            TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
            TimestampSecondType,
        };
        use datafusion_common::DataFusionError;

        match $data_type {
            DataType::Timestamp(TimeUnit::Second, _) => {
                Ok(__with_ty__! { TimestampSecondType })
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                Ok(__with_ty__! { TimestampMillisecondType })
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                Ok(__with_ty__! { TimestampMicrosecondType })
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                Ok(__with_ty__! { TimestampNanosecondType })
            }
            _ => {
                Err(DataFusionError::Execution(format!(
                    "not expected data type: '{}'",
                    $data_type
                )))
            }
        }
    }};
}
131
132pub(crate) use with_match_timestamp_types;
133
#[cfg(test)]
mod tests {
    use super::*;

    // A known strategy name must parse to the matching enum variant.
    #[test]
    fn test_parse_resolve_strategy() {
        let parsed = parse_resolve_strategy("UseLatest").unwrap();
        assert_eq!(parsed, ResolveStrategy::UseLatest);
    }
}