common_function/admin/
flush_compact_region.rs1use common_macro::admin_fn;
16use common_query::error::{
17 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use common_query::prelude::{Signature, Volatility};
20use datatypes::prelude::*;
21use session::context::QueryContextRef;
22use snafu::ensure;
23use store_api::storage::RegionId;
24
25use crate::handlers::TableMutationHandlerRef;
26use crate::helper::cast_u64;
27
28macro_rules! define_region_function {
29 ($name: expr, $display_name_str: expr, $display_name: ident) => {
30 #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
32 pub(crate) async fn $display_name(
33 table_mutation_handler: &TableMutationHandlerRef,
34 query_ctx: &QueryContextRef,
35 params: &[ValueRef<'_>],
36 ) -> Result<Value> {
37 ensure!(
38 params.len() == 1,
39 InvalidFuncArgsSnafu {
40 err_msg: format!(
41 "The length of the args is not correct, expect 1, have: {}",
42 params.len()
43 ),
44 }
45 );
46
47 let Some(region_id) = cast_u64(¶ms[0])? else {
48 return UnsupportedInputDataTypeSnafu {
49 function: stringify!($display_name_str),
50 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
51 }
52 .fail();
53 };
54
55 let affected_rows = table_mutation_handler
56 .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
57 .await?;
58
59 Ok(Value::from(affected_rows as u64))
60 }
61 };
62}
63
64define_region_function!(FlushRegionFunction, flush_region, flush_region);
65
66define_region_function!(CompactRegionFunction, compact_region, compact_region);
67
68fn signature() -> Signature {
69 Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
70}
71
72#[cfg(test)]
73mod tests {
74 use std::sync::Arc;
75
76 use common_query::prelude::TypeSignature;
77 use datatypes::vectors::UInt64Vector;
78
79 use super::*;
80 use crate::function::{AsyncFunction, FunctionContext};
81
82 macro_rules! define_region_function_test {
83 ($name: ident, $func: ident) => {
84 paste::paste! {
85 #[test]
86 fn [<test_ $name _misc>]() {
87 let f = $func;
88 assert_eq!(stringify!($name), f.name());
89 assert_eq!(
90 ConcreteDataType::uint64_datatype(),
91 f.return_type(&[]).unwrap()
92 );
93 assert!(matches!(f.signature(),
94 Signature {
95 type_signature: TypeSignature::Uniform(1, valid_types),
96 volatility: Volatility::Immutable
97 } if valid_types == ConcreteDataType::numerics()));
98 }
99
100 #[tokio::test]
101 async fn [<test_ $name _missing_table_mutation>]() {
102 let f = $func;
103
104 let args = vec![99];
105
106 let args = args
107 .into_iter()
108 .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
109 .collect::<Vec<_>>();
110
111 let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
112 assert_eq!(
113 "Missing TableMutationHandler, not expected",
114 result.to_string()
115 );
116 }
117
118 #[tokio::test]
119 async fn [<test_ $name>]() {
120 let f = $func;
121
122
123 let args = vec![99];
124
125 let args = args
126 .into_iter()
127 .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
128 .collect::<Vec<_>>();
129
130 let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
131
132 let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
133 assert_eq!(expect, result);
134 }
135 }
136 };
137 }
138
139 define_region_function_test!(flush_region, FlushRegionFunction);
140
141 define_region_function_test!(compact_region, CompactRegionFunction);
142}