common_function/
helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ResolveStrategy;
16use common_query::error::{
17    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, TypeSignature, Volatility};
20use datatypes::arrow::datatypes::DataType;
21use datatypes::prelude::ConcreteDataType;
22use datatypes::types::cast::cast;
23use datatypes::value::ValueRef;
24use snafu::{OptionExt, ResultExt};
25
26/// Create a function signature with oneof signatures of interleaving two arguments.
27pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
28    let mut sigs = Vec::with_capacity(args1.len() * args2.len());
29
30    for arg1 in &args1 {
31        for arg2 in &args2 {
32            sigs.push(TypeSignature::Exact(vec![arg1.clone(), arg2.clone()]));
33        }
34    }
35
36    Signature::one_of(sigs, Volatility::Immutable)
37}
38
39/// Cast a [`ValueRef`] to u64, returns `None` if fails
40pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
41    cast((*value).into(), &ConcreteDataType::uint64_datatype())
42        .context(InvalidInputTypeSnafu {
43            err_msg: format!(
44                "Failed to cast input into uint64, actual type: {:#?}",
45                value.data_type(),
46            ),
47        })
48        .map(|v| v.as_u64())
49}
50
51/// Cast a [`ValueRef`] to u32, returns `None` if fails
52pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
53    cast((*value).into(), &ConcreteDataType::uint32_datatype())
54        .context(InvalidInputTypeSnafu {
55            err_msg: format!(
56                "Failed to cast input into uint32, actual type: {:#?}",
57                value.data_type(),
58            ),
59        })
60        .map(|v| v.as_u64().map(|v| v as u32))
61}
62
63/// Parse a resolve strategy from a string.
64pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
65    ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
66        err_msg: format!("Invalid resolve strategy: {}", strategy),
67    })
68}
69
/// Parallelism used for reconcile operations when none is specified.
pub fn default_parallelism() -> u32 {
    const DEFAULT_PARALLELISM: u32 = 64;
    DEFAULT_PARALLELISM
}
74
75/// Default resolve strategy for reconcile operations.
76pub fn default_resolve_strategy() -> ResolveStrategy {
77    ResolveStrategy::UseLatest
78}
79
80/// Get the string value from the params.
81///
82/// # Errors
83/// Returns an error if the input type is not a string.
84pub fn get_string_from_params<'a>(
85    params: &'a [ValueRef<'a>],
86    index: usize,
87    fn_name: &'a str,
88) -> Result<&'a str> {
89    let ValueRef::String(s) = &params[index] else {
90        return UnsupportedInputDataTypeSnafu {
91            function: fn_name,
92            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
93        }
94        .fail();
95    };
96    Ok(s)
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// A known strategy name parses to the matching variant.
    #[test]
    fn test_parse_resolve_strategy() {
        assert_eq!(
            parse_resolve_strategy("UseLatest").unwrap(),
            ResolveStrategy::UseLatest
        );
    }

    /// A name that matches no strategy is rejected with an error instead of a default.
    #[test]
    fn test_parse_resolve_strategy_invalid() {
        assert!(parse_resolve_strategy("NoSuchResolveStrategy").is_err());
    }
}