common_function/admin/
flush_compact_region.rs1use common_macro::admin_fn;
16use common_query::error::{
17 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, Volatility};
20use datatypes::data_type::DataType;
21use datatypes::prelude::*;
22use session::context::QueryContextRef;
23use snafu::ensure;
24use store_api::storage::RegionId;
25
26use crate::handlers::TableMutationHandlerRef;
27use crate::helper::cast_u64;
28
29macro_rules! define_region_function {
30 ($name: expr, $display_name_str: expr, $display_name: ident) => {
31 #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
33 pub(crate) async fn $display_name(
34 table_mutation_handler: &TableMutationHandlerRef,
35 query_ctx: &QueryContextRef,
36 params: &[ValueRef<'_>],
37 ) -> Result<Value> {
38 ensure!(
39 params.len() == 1,
40 InvalidFuncArgsSnafu {
41 err_msg: format!(
42 "The length of the args is not correct, expect 1, have: {}",
43 params.len()
44 ),
45 }
46 );
47
48 let Some(region_id) = cast_u64(¶ms[0])? else {
49 return UnsupportedInputDataTypeSnafu {
50 function: stringify!($display_name_str),
51 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
52 }
53 .fail();
54 };
55
56 let affected_rows = table_mutation_handler
57 .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
58 .await?;
59
60 Ok(Value::from(affected_rows as u64))
61 }
62 };
63}
64
65define_region_function!(FlushRegionFunction, flush_region, flush_region);
66
67define_region_function!(CompactRegionFunction, compact_region, compact_region);
68
69fn signature() -> Signature {
70 Signature::uniform(
71 1,
72 ConcreteDataType::numerics()
73 .into_iter()
74 .map(|dt| dt.as_arrow_type())
75 .collect(),
76 Volatility::Immutable,
77 )
78}
79
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::UInt64Array;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Builds one-row call arguments: a single non-nullable `UInt64` column
    /// named `arg_0` holding the value `99`, with a nullable `UInt64` return
    /// field named `result`.
    fn single_region_id_args() -> datafusion::logical_expr::ScalarFunctionArgs {
        let region_ids = UInt64Array::from(vec![99]);
        datafusion::logical_expr::ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(region_ids))],
            arg_fields: vec![Arc::new(Field::new("arg_0", DataType::UInt64, false))],
            return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Generates three tests for one region function: metadata checks, the
    /// error raised with a default (handler-less) context, and the result
    /// produced with the mock context.
    macro_rules! define_region_function_test {
        ($name: ident, $func: ident) => {
            paste::paste! {
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let udf = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), udf.name());
                    assert_eq!(DataType::UInt64, udf.return_type(&[]).unwrap());

                    // The trait is imported only inside the closure so that
                    // `DataType` keeps naming the arrow enum elsewhere here.
                    let expected_types = ConcreteDataType::numerics()
                        .into_iter()
                        .map(|dt| {
                            use datatypes::data_type::DataType;
                            dt.as_arrow_type()
                        })
                        .collect::<Vec<_>>();
                    assert!(matches!(
                        udf.signature(),
                        datafusion_expr::Signature {
                            type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                            volatility: datafusion_expr::Volatility::Immutable
                        } if valid_types == &expected_types
                    ));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::default());
                    let async_udf = provider.as_async().unwrap();

                    let err = async_udf
                        .invoke_async_with_args(single_region_id_args())
                        .await
                        .unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        err.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let async_udf = provider.as_async().unwrap();

                    let output = async_udf
                        .invoke_async_with_args(single_region_id_args())
                        .await
                        .unwrap();

                    // Either representation must carry 42, the value this
                    // test expects when run against the mock context.
                    match output {
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                        ColumnarValue::Array(array) => {
                            let affected = array.as_any().downcast_ref::<UInt64Array>().unwrap();
                            assert_eq!(affected.value(0), 42u64);
                        }
                    }
                }
            }
        };
    }

    define_region_function_test!(flush_region, FlushRegionFunction);

    define_region_function_test!(compact_region, CompactRegionFunction);
}