common_function/
flush_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_macro::admin_fn;
17use common_query::error::{
18    ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
19    UnsupportedInputDataTypeSnafu,
20};
21use common_query::prelude::Signature;
22use datafusion::logical_expr::Volatility;
23use datatypes::value::{Value, ValueRef};
24use session::context::QueryContextRef;
25use snafu::{ensure, ResultExt};
26use sql::parser::ParserContext;
27use store_api::storage::ConcreteDataType;
28
29use crate::handlers::FlowServiceHandlerRef;
30
31fn flush_signature() -> Signature {
32    Signature::uniform(
33        1,
34        vec![ConcreteDataType::string_datatype()],
35        Volatility::Immutable,
36    )
37}
38
39#[admin_fn(
40    name = FlushFlowFunction,
41    display_name = flush_flow,
42    sig_fn = flush_signature,
43    ret = uint64
44)]
45pub(crate) async fn flush_flow(
46    flow_service_handler: &FlowServiceHandlerRef,
47    query_ctx: &QueryContextRef,
48    params: &[ValueRef<'_>],
49) -> Result<Value> {
50    let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
51
52    let res = flow_service_handler
53        .flush(&catalog_name, &flow_name, query_ctx.clone())
54        .await?;
55    let affected_rows = res.affected_rows;
56
57    Ok(Value::from(affected_rows))
58}
59
60fn parse_flush_flow(
61    params: &[ValueRef<'_>],
62    query_ctx: &QueryContextRef,
63) -> Result<(String, String)> {
64    ensure!(
65        params.len() == 1,
66        InvalidFuncArgsSnafu {
67            err_msg: format!(
68                "The length of the args is not correct, expect 1, have: {}",
69                params.len()
70            ),
71        }
72    );
73
74    let ValueRef::String(flow_name) = params[0] else {
75        return UnsupportedInputDataTypeSnafu {
76            function: "flush_flow",
77            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
78        }
79        .fail();
80    };
81    let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
82        .map_err(BoxedError::new)
83        .context(ExecuteSnafu)?;
84
85    let (catalog_name, flow_name) = match &obj_name.0[..] {
86        [flow_name] => (
87            query_ctx.current_catalog().to_string(),
88            flow_name.value.clone(),
89        ),
90        [catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
91        _ => {
92            return InvalidFuncArgsSnafu {
93                err_msg: format!(
94                    "expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
95                    obj_name
96                ),
97            }
98            .fail()
99        }
100    };
101    Ok((catalog_name, flow_name))
102}
103
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::StringVector;
    use session::context::QueryContext;

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    /// The generated function struct reports the expected name, return type
    /// and signature.
    #[test]
    fn test_flush_flow_metadata() {
        let func = FlushFlowFunction;

        assert_eq!("flush_flow", func.name());

        let return_type = func.return_type(&[]).unwrap();
        assert_eq!(ConcreteDataType::uint64_datatype(), return_type);

        let expected_signature = Signature::uniform(
            1,
            vec![ConcreteDataType::string_datatype()],
            Volatility::Immutable,
        );
        assert_eq!(func.signature(), expected_signature);
    }

    /// Evaluating with a default function context fails with the
    /// missing-handler error message.
    #[tokio::test]
    async fn test_missing_flow_service() {
        let func = FlushFlowFunction;

        let columns = ["flow_name"]
            .iter()
            .map(|name| Arc::new(StringVector::from_slice(&[*name])) as _)
            .collect::<Vec<_>>();

        let err = func
            .eval(FunctionContext::default(), &columns)
            .await
            .unwrap_err();
        assert_eq!("Missing FlowServiceHandler, not expected", err.to_string());
    }

    /// Both `<flow-name>` and `<catalog>.<flow-name>` forms are accepted.
    #[test]
    fn test_parse_flow_args() {
        let testcases = [
            ("flow_name", ("greptime", "flow_name")),
            ("catalog.flow_name", ("catalog", "flow_name")),
        ];
        let query_ctx = QueryContext::arc();

        for &(input, expected) in testcases.iter() {
            let params = [ValueRef::String(input)];

            let (catalog, flow) = parse_flush_flow(&params, &query_ctx).unwrap();
            assert_eq!(expected, (catalog.as_str(), flow.as_str()));
        }
    }
}