common_function/
flush_flow.rs1use common_error::ext::BoxedError;
16use common_macro::admin_fn;
17use common_query::error::{
18 ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
19 UnsupportedInputDataTypeSnafu,
20};
21use common_query::prelude::Signature;
22use datafusion::logical_expr::Volatility;
23use datatypes::value::{Value, ValueRef};
24use session::context::QueryContextRef;
25use snafu::{ensure, ResultExt};
26use sql::parser::ParserContext;
27use store_api::storage::ConcreteDataType;
28
29use crate::handlers::FlowServiceHandlerRef;
30
31fn flush_signature() -> Signature {
32 Signature::uniform(
33 1,
34 vec![ConcreteDataType::string_datatype()],
35 Volatility::Immutable,
36 )
37}
38
39#[admin_fn(
40 name = FlushFlowFunction,
41 display_name = flush_flow,
42 sig_fn = flush_signature,
43 ret = uint64
44)]
45pub(crate) async fn flush_flow(
46 flow_service_handler: &FlowServiceHandlerRef,
47 query_ctx: &QueryContextRef,
48 params: &[ValueRef<'_>],
49) -> Result<Value> {
50 let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
51
52 let res = flow_service_handler
53 .flush(&catalog_name, &flow_name, query_ctx.clone())
54 .await?;
55 let affected_rows = res.affected_rows;
56
57 Ok(Value::from(affected_rows))
58}
59
60fn parse_flush_flow(
61 params: &[ValueRef<'_>],
62 query_ctx: &QueryContextRef,
63) -> Result<(String, String)> {
64 ensure!(
65 params.len() == 1,
66 InvalidFuncArgsSnafu {
67 err_msg: format!(
68 "The length of the args is not correct, expect 1, have: {}",
69 params.len()
70 ),
71 }
72 );
73
74 let ValueRef::String(flow_name) = params[0] else {
75 return UnsupportedInputDataTypeSnafu {
76 function: "flush_flow",
77 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
78 }
79 .fail();
80 };
81 let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
82 .map_err(BoxedError::new)
83 .context(ExecuteSnafu)?;
84
85 let (catalog_name, flow_name) = match &obj_name.0[..] {
86 [flow_name] => (
87 query_ctx.current_catalog().to_string(),
88 flow_name.value.clone(),
89 ),
90 [catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
91 _ => {
92 return InvalidFuncArgsSnafu {
93 err_msg: format!(
94 "expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
95 obj_name
96 ),
97 }
98 .fail()
99 }
100 };
101 Ok((catalog_name, flow_name))
102}
103
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::StringVector;
    use session::context::QueryContext;

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    /// The generated function exposes the expected name, return type and signature.
    #[test]
    fn test_flush_flow_metadata() {
        let func = FlushFlowFunction;

        assert_eq!("flush_flow", func.name());

        let return_type = func.return_type(&[]).unwrap();
        assert_eq!(ConcreteDataType::uint64_datatype(), return_type);

        let expected_signature = Signature::uniform(
            1,
            vec![ConcreteDataType::string_datatype()],
            Volatility::Immutable,
        );
        assert_eq!(func.signature(), expected_signature);
    }

    /// Evaluating with a default context (no flow service handler) must fail
    /// with the dedicated "missing handler" error.
    #[tokio::test]
    async fn test_missing_flow_service() {
        let func = FlushFlowFunction;

        let columns = ["flow_name"]
            .iter()
            .map(|name| Arc::new(StringVector::from_slice(&[*name])) as _)
            .collect::<Vec<_>>();

        let err = func
            .eval(FunctionContext::default(), &columns)
            .await
            .unwrap_err();
        assert_eq!("Missing FlowServiceHandler, not expected", err.to_string());
    }

    /// Both the bare and the catalog-qualified flow name forms are accepted.
    #[test]
    fn test_parse_flow_args() {
        let testcases = [
            ("flow_name", ("greptime", "flow_name")),
            ("catalog.flow_name", ("catalog", "flow_name")),
        ];

        for &(input, expected) in &testcases {
            let params = [ValueRef::String(input)];

            let (catalog, flow) = parse_flush_flow(&params, &QueryContext::arc()).unwrap();
            assert_eq!(expected, (catalog.as_str(), flow.as_str()));
        }
    }
}