common_function/
helper.rs1use api::v1::meta::ResolveStrategy;
16use common_query::error::{
17 InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, TypeSignature, Volatility};
20use datatypes::arrow::datatypes::DataType;
21use datatypes::prelude::ConcreteDataType;
22use datatypes::types::cast::cast;
23use datatypes::value::ValueRef;
24use snafu::{OptionExt, ResultExt};
25
26pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
28 let mut sigs = Vec::with_capacity(args1.len() * args2.len());
29
30 for arg1 in &args1 {
31 for arg2 in &args2 {
32 sigs.push(TypeSignature::Exact(vec![arg1.clone(), arg2.clone()]));
33 }
34 }
35
36 Signature::one_of(sigs, Volatility::Immutable)
37}
38
39pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
41 cast((*value).into(), &ConcreteDataType::uint64_datatype())
42 .context(InvalidInputTypeSnafu {
43 err_msg: format!(
44 "Failed to cast input into uint64, actual type: {:#?}",
45 value.data_type(),
46 ),
47 })
48 .map(|v| v.as_u64())
49}
50
51pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
53 cast((*value).into(), &ConcreteDataType::uint32_datatype())
54 .context(InvalidInputTypeSnafu {
55 err_msg: format!(
56 "Failed to cast input into uint32, actual type: {:#?}",
57 value.data_type(),
58 ),
59 })
60 .map(|v| v.as_u64().map(|v| v as u32))
61}
62
63pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
65 ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
66 err_msg: format!("Invalid resolve strategy: {}", strategy),
67 })
68}
69
/// Returns the default parallelism value (`64`).
pub fn default_parallelism() -> u32 {
    const DEFAULT_PARALLELISM: u32 = 64;

    DEFAULT_PARALLELISM
}
74
75pub fn default_resolve_strategy() -> ResolveStrategy {
77 ResolveStrategy::UseLatest
78}
79
80pub fn get_string_from_params<'a>(
85 params: &'a [ValueRef<'a>],
86 index: usize,
87 fn_name: &'a str,
88) -> Result<&'a str> {
89 let ValueRef::String(s) = ¶ms[index] else {
90 return UnsupportedInputDataTypeSnafu {
91 function: fn_name,
92 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
93 }
94 .fail();
95 };
96 Ok(s)
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// A known strategy name parses into the matching enum variant.
    #[test]
    fn test_parse_resolve_strategy() {
        let parsed = parse_resolve_strategy("UseLatest").unwrap();

        assert_eq!(parsed, ResolveStrategy::UseLatest);
    }
}