1use std::str::FromStr;
16
17use api::v1::region::{compact_request, StrictWindow};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23 UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ensure, ResultExt};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
35const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
37const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
39
40#[admin_fn(
41 name = FlushTableFunction,
42 display_name = flush_table,
43 sig_fn = flush_signature,
44 ret = uint64
45)]
46pub(crate) async fn flush_table(
47 table_mutation_handler: &TableMutationHandlerRef,
48 query_ctx: &QueryContextRef,
49 params: &[ValueRef<'_>],
50) -> Result<Value> {
51 ensure!(
52 params.len() == 1,
53 InvalidFuncArgsSnafu {
54 err_msg: format!(
55 "The length of the args is not correct, expect 1, have: {}",
56 params.len()
57 ),
58 }
59 );
60
61 let ValueRef::String(table_name) = params[0] else {
62 return UnsupportedInputDataTypeSnafu {
63 function: "flush_table",
64 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65 }
66 .fail();
67 };
68
69 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70 .map_err(BoxedError::new)
71 .context(TableMutationSnafu)?;
72
73 let affected_rows = table_mutation_handler
74 .flush(
75 FlushTableRequest {
76 catalog_name,
77 schema_name,
78 table_name,
79 },
80 query_ctx.clone(),
81 )
82 .await?;
83
84 Ok(Value::from(affected_rows as u64))
85}
86
87#[admin_fn(
88 name = CompactTableFunction,
89 display_name = compact_table,
90 sig_fn = compact_signature,
91 ret = uint64
92)]
93pub(crate) async fn compact_table(
94 table_mutation_handler: &TableMutationHandlerRef,
95 query_ctx: &QueryContextRef,
96 params: &[ValueRef<'_>],
97) -> Result<Value> {
98 let request = parse_compact_params(params, query_ctx)?;
99 info!("Compact table request: {:?}", request);
100
101 let affected_rows = table_mutation_handler
102 .compact(request, query_ctx.clone())
103 .await?;
104
105 Ok(Value::from(affected_rows as u64))
106}
107
108fn flush_signature() -> Signature {
109 Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
110}
111
112fn compact_signature() -> Signature {
113 Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
114}
115
116fn parse_compact_params(
121 params: &[ValueRef<'_>],
122 query_ctx: &QueryContextRef,
123) -> Result<CompactTableRequest> {
124 ensure!(
125 !params.is_empty(),
126 InvalidFuncArgsSnafu {
127 err_msg: "Args cannot be empty",
128 }
129 );
130
131 let (table_name, compact_type) = match params {
132 [ValueRef::String(table_name)] => (
133 table_name,
134 compact_request::Options::Regular(Default::default()),
135 ),
136 [ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
137 let compact_type = parse_compact_type(compact_ty_str, None)?;
138 (table_name, compact_type)
139 }
140
141 [ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
142 {
143 let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
144 (table_name, compact_type)
145 }
146 _ => {
147 return UnsupportedInputDataTypeSnafu {
148 function: "compact_table",
149 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
150 }
151 .fail()
152 }
153 };
154
155 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
156 .map_err(BoxedError::new)
157 .context(TableMutationSnafu)?;
158
159 Ok(CompactTableRequest {
160 catalog_name,
161 schema_name,
162 table_name,
163 compact_options: compact_type,
164 })
165}
166
167fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
170 if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
171 | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
172 {
173 let window_seconds = option
174 .map(|v| {
175 i64::from_str(v).map_err(|_| {
176 InvalidFuncArgsSnafu {
177 err_msg: format!(
178 "Compact window is expected to be a valid number, provided: {}",
179 v
180 ),
181 }
182 .build()
183 })
184 })
185 .transpose()?
186 .unwrap_or(0);
187
188 Ok(compact_request::Options::StrictWindow(StrictWindow {
189 window_seconds,
190 }))
191 } else {
192 Ok(compact_request::Options::Regular(Default::default()))
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use std::sync::Arc;
199
200 use api::v1::region::compact_request::Options;
201 use arrow::array::StringArray;
202 use arrow::datatypes::{DataType, Field};
203 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
204 use datafusion_expr::ColumnarValue;
205 use session::context::QueryContext;
206
207 use super::*;
208 use crate::function::FunctionContext;
209 use crate::function_factory::ScalarFunctionFactory;
210
211 macro_rules! define_table_function_test {
212 ($name: ident, $func: ident) => {
213 paste::paste!{
214 #[test]
215 fn [<test_ $name _misc>]() {
216 let factory: ScalarFunctionFactory = $func::factory().into();
217 let f = factory.provide(FunctionContext::mock());
218 assert_eq!(stringify!($name), f.name());
219 assert_eq!(
220 DataType::UInt64,
221 f.return_type(&[]).unwrap()
222 );
223 assert!(matches!(f.signature(),
224 datafusion_expr::Signature {
225 type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
226 volatility: datafusion_expr::Volatility::Immutable
227 } if valid_types == &vec![ArrowDataType::Utf8]));
228 }
229
230 #[tokio::test]
231 async fn [<test_ $name _missing_table_mutation>]() {
232 let factory: ScalarFunctionFactory = $func::factory().into();
233 let provider = factory.provide(FunctionContext::default());
234 let f = provider.as_async().unwrap();
235
236 let func_args = datafusion::logical_expr::ScalarFunctionArgs {
237 args: vec![
238 ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
239 ],
240 arg_fields: vec![
241 Arc::new(Field::new("arg_0", DataType::Utf8, false)),
242 ],
243 return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
244 number_rows: 1,
245 config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
246 };
247 let result = f.invoke_async_with_args(func_args).await.unwrap_err();
248 assert_eq!(
249 "Execution error: Handler error: Missing TableMutationHandler, not expected",
250 result.to_string()
251 );
252 }
253
254 #[tokio::test]
255 async fn [<test_ $name>]() {
256 let factory: ScalarFunctionFactory = $func::factory().into();
257 let provider = factory.provide(FunctionContext::mock());
258 let f = provider.as_async().unwrap();
259
260 let func_args = datafusion::logical_expr::ScalarFunctionArgs {
261 args: vec![
262 ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
263 ],
264 arg_fields: vec![
265 Arc::new(Field::new("arg_0", DataType::Utf8, false)),
266 ],
267 return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
268 number_rows: 1,
269 config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
270 };
271 let result = f.invoke_async_with_args(func_args).await.unwrap();
272
273 match result {
274 ColumnarValue::Array(array) => {
275 let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
276 assert_eq!(result_array.value(0), 42u64);
277 }
278 ColumnarValue::Scalar(scalar) => {
279 assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
280 }
281 }
282 }
283 }
284 }
285 }
286
287 define_table_function_test!(flush_table, FlushTableFunction);
288
289 fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
290 for (params, expected) in cases {
291 let params = params
292 .iter()
293 .map(|s| ValueRef::String(s))
294 .collect::<Vec<_>>();
295
296 assert_eq!(
297 expected,
298 &parse_compact_params(¶ms, &QueryContext::arc()).unwrap()
299 );
300 }
301 }
302
303 #[test]
304 fn test_parse_compact_params() {
305 check_parse_compact_params(&[
306 (
307 &["table"],
308 CompactTableRequest {
309 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
310 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
311 table_name: "table".to_string(),
312 compact_options: Options::Regular(Default::default()),
313 },
314 ),
315 (
316 &[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
317 CompactTableRequest {
318 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
319 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
320 table_name: "table".to_string(),
321 compact_options: Options::Regular(Default::default()),
322 },
323 ),
324 (
325 &[&format!(
326 "{}.{}.table",
327 DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
328 )],
329 CompactTableRequest {
330 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
331 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
332 table_name: "table".to_string(),
333 compact_options: Options::Regular(Default::default()),
334 },
335 ),
336 (
337 &["table", "regular"],
338 CompactTableRequest {
339 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
340 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
341 table_name: "table".to_string(),
342 compact_options: Options::Regular(Default::default()),
343 },
344 ),
345 (
346 &["table", "strict_window"],
347 CompactTableRequest {
348 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
349 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
350 table_name: "table".to_string(),
351 compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
352 },
353 ),
354 (
355 &["table", "strict_window", "3600"],
356 CompactTableRequest {
357 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
358 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
359 table_name: "table".to_string(),
360 compact_options: Options::StrictWindow(StrictWindow {
361 window_seconds: 3600,
362 }),
363 },
364 ),
365 (
366 &["table", "regular", "abcd"],
367 CompactTableRequest {
368 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
369 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
370 table_name: "table".to_string(),
371 compact_options: Options::Regular(Default::default()),
372 },
373 ),
374 (
375 &["table", "swcs", "120"],
376 CompactTableRequest {
377 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
378 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
379 table_name: "table".to_string(),
380 compact_options: Options::StrictWindow(StrictWindow {
381 window_seconds: 120,
382 }),
383 },
384 ),
385 ]);
386
387 assert!(parse_compact_params(
388 &["table", "strict_window", "abc"]
389 .into_iter()
390 .map(ValueRef::String)
391 .collect::<Vec<_>>(),
392 &QueryContext::arc(),
393 )
394 .is_err());
395
396 assert!(parse_compact_params(
397 &["a.b.table", "strict_window", "abc"]
398 .into_iter()
399 .map(ValueRef::String)
400 .collect::<Vec<_>>(),
401 &QueryContext::arc(),
402 )
403 .is_err());
404 }
405}