common_function/admin/
flush_compact_region.rs1use common_macro::admin_fn;
16use common_query::error::{
17 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, Volatility};
20use datatypes::data_type::DataType;
21use datatypes::prelude::*;
22use session::context::QueryContextRef;
23use snafu::ensure;
24use store_api::storage::RegionId;
25
26use crate::handlers::TableMutationHandlerRef;
27use crate::helper::cast_u64;
28
29macro_rules! define_region_function {
30 ($name: expr, $display_name_str: expr, $display_name: ident) => {
31 #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
33 pub(crate) async fn $display_name(
34 table_mutation_handler: &TableMutationHandlerRef,
35 query_ctx: &QueryContextRef,
36 params: &[ValueRef<'_>],
37 ) -> Result<Value> {
38 ensure!(
39 params.len() == 1,
40 InvalidFuncArgsSnafu {
41 err_msg: format!(
42 "The length of the args is not correct, expect 1, have: {}",
43 params.len()
44 ),
45 }
46 );
47
48 let Some(region_id) = cast_u64(¶ms[0])? else {
49 return UnsupportedInputDataTypeSnafu {
50 function: stringify!($display_name_str),
51 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
52 }
53 .fail();
54 };
55
56 let affected_rows = table_mutation_handler
57 .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
58 .await?;
59
60 Ok(Value::from(affected_rows as u64))
61 }
62 };
63}
64
65define_region_function!(FlushRegionFunction, flush_region, flush_region);
66
67define_region_function!(CompactRegionFunction, compact_region, compact_region);
68
69fn signature() -> Signature {
70 Signature::uniform(
71 1,
72 ConcreteDataType::numerics()
73 .into_iter()
74 .map(|dt| dt.as_arrow_type())
75 .collect(),
76 Volatility::Immutable,
77 )
78}
79
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::UInt64Array;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Arrow types the generated functions are expected to accept.
    fn numeric_arrow_types() -> Vec<DataType> {
        // Arrow's `DataType` owns the plain name in this module, so the
        // crate's `DataType` trait is imported anonymously just to make
        // `as_arrow_type` callable.
        use datatypes::data_type::DataType as _;

        ConcreteDataType::numerics()
            .into_iter()
            .map(|dt| dt.as_arrow_type())
            .collect()
    }

    /// Invocation arguments carrying a one-row `UInt64` array holding the
    /// region id `99`.
    fn region_id_args() -> datafusion::logical_expr::ScalarFunctionArgs {
        datafusion::logical_expr::ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(UInt64Array::from(vec![99])))],
            arg_fields: vec![Arc::new(Field::new("arg_0", DataType::UInt64, false))],
            return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Generates three tests for one region function: metadata checks, the
    /// error raised with a default `FunctionContext`, and the value returned
    /// with a mock `FunctionContext`.
    macro_rules! define_region_function_test {
        ($name: ident, $func: ident) => {
            paste::paste! {
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let f = factory.provide(FunctionContext::mock());

                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap());

                    let expected_types = numeric_arrow_types();
                    assert!(matches!(
                        f.signature(),
                        datafusion_expr::Signature {
                            type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                            volatility: datafusion_expr::Volatility::Immutable,
                            ..
                        } if valid_types == &expected_types
                    ));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    // The default context is expected to fail with the
                    // "Missing TableMutationHandler" error asserted below.
                    let provider = factory.provide(FunctionContext::default());
                    let f = provider.as_async().unwrap();

                    let err = f
                        .invoke_async_with_args(region_id_args())
                        .await
                        .unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        err.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let f = provider.as_async().unwrap();

                    let output = f
                        .invoke_async_with_args(region_id_args())
                        .await
                        .unwrap();

                    // The mock context is expected to yield 42, whichever
                    // columnar form the result comes back in.
                    match output {
                        ColumnarValue::Array(array) => {
                            let values = array.as_any().downcast_ref::<UInt64Array>().unwrap();
                            assert_eq!(values.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        };
    }

    define_region_function_test!(flush_region, FlushRegionFunction);

    define_region_function_test!(compact_region, CompactRegionFunction);
}