1use std::str::FromStr;
16
17use api::v1::region::{StrictWindow, compact_request};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23 UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ResultExt, ensure};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
35const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
37const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
39
40#[admin_fn(
41 name = FlushTableFunction,
42 display_name = flush_table,
43 sig_fn = flush_signature,
44 ret = uint64
45)]
46pub(crate) async fn flush_table(
47 table_mutation_handler: &TableMutationHandlerRef,
48 query_ctx: &QueryContextRef,
49 params: &[ValueRef<'_>],
50) -> Result<Value> {
51 ensure!(
52 params.len() == 1,
53 InvalidFuncArgsSnafu {
54 err_msg: format!(
55 "The length of the args is not correct, expect 1, have: {}",
56 params.len()
57 ),
58 }
59 );
60
61 let ValueRef::String(table_name) = params[0] else {
62 return UnsupportedInputDataTypeSnafu {
63 function: "flush_table",
64 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65 }
66 .fail();
67 };
68
69 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70 .map_err(BoxedError::new)
71 .context(TableMutationSnafu)?;
72
73 let affected_rows = table_mutation_handler
74 .flush(
75 FlushTableRequest {
76 catalog_name,
77 schema_name,
78 table_name,
79 },
80 query_ctx.clone(),
81 )
82 .await?;
83
84 Ok(Value::from(affected_rows as u64))
85}
86
87#[admin_fn(
88 name = CompactTableFunction,
89 display_name = compact_table,
90 sig_fn = compact_signature,
91 ret = uint64
92)]
93pub(crate) async fn compact_table(
94 table_mutation_handler: &TableMutationHandlerRef,
95 query_ctx: &QueryContextRef,
96 params: &[ValueRef<'_>],
97) -> Result<Value> {
98 let request = parse_compact_params(params, query_ctx)?;
99 info!("Compact table request: {:?}", request);
100
101 let affected_rows = table_mutation_handler
102 .compact(request, query_ctx.clone())
103 .await?;
104
105 Ok(Value::from(affected_rows as u64))
106}
107
108fn flush_signature() -> Signature {
109 Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
110}
111
112fn compact_signature() -> Signature {
113 Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
114}
115
116fn parse_compact_params(
121 params: &[ValueRef<'_>],
122 query_ctx: &QueryContextRef,
123) -> Result<CompactTableRequest> {
124 ensure!(
125 !params.is_empty(),
126 InvalidFuncArgsSnafu {
127 err_msg: "Args cannot be empty",
128 }
129 );
130
131 let (table_name, compact_type) = match params {
132 [ValueRef::String(table_name)] => (
133 table_name,
134 compact_request::Options::Regular(Default::default()),
135 ),
136 [
137 ValueRef::String(table_name),
138 ValueRef::String(compact_ty_str),
139 ] => {
140 let compact_type = parse_compact_type(compact_ty_str, None)?;
141 (table_name, compact_type)
142 }
143
144 [
145 ValueRef::String(table_name),
146 ValueRef::String(compact_ty_str),
147 ValueRef::String(options_str),
148 ] => {
149 let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
150 (table_name, compact_type)
151 }
152 _ => {
153 return UnsupportedInputDataTypeSnafu {
154 function: "compact_table",
155 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
156 }
157 .fail();
158 }
159 };
160
161 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
162 .map_err(BoxedError::new)
163 .context(TableMutationSnafu)?;
164
165 Ok(CompactTableRequest {
166 catalog_name,
167 schema_name,
168 table_name,
169 compact_options: compact_type,
170 })
171}
172
173fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
176 if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
177 | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
178 {
179 let window_seconds = option
180 .map(|v| {
181 i64::from_str(v).map_err(|_| {
182 InvalidFuncArgsSnafu {
183 err_msg: format!(
184 "Compact window is expected to be a valid number, provided: {}",
185 v
186 ),
187 }
188 .build()
189 })
190 })
191 .transpose()?
192 .unwrap_or(0);
193
194 Ok(compact_request::Options::StrictWindow(StrictWindow {
195 window_seconds,
196 }))
197 } else {
198 Ok(compact_request::Options::Regular(Default::default()))
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use std::sync::Arc;
205
206 use api::v1::region::compact_request::Options;
207 use arrow::array::StringArray;
208 use arrow::datatypes::{DataType, Field};
209 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
210 use datafusion_expr::ColumnarValue;
211 use session::context::QueryContext;
212
213 use super::*;
214 use crate::function::FunctionContext;
215 use crate::function_factory::ScalarFunctionFactory;
216
217 macro_rules! define_table_function_test {
218 ($name: ident, $func: ident) => {
219 paste::paste!{
220 #[test]
221 fn [<test_ $name _misc>]() {
222 let factory: ScalarFunctionFactory = $func::factory().into();
223 let f = factory.provide(FunctionContext::mock());
224 assert_eq!(stringify!($name), f.name());
225 assert_eq!(
226 DataType::UInt64,
227 f.return_type(&[]).unwrap()
228 );
229 assert!(matches!(f.signature(),
230 datafusion_expr::Signature {
231 type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
232 volatility: datafusion_expr::Volatility::Immutable
233 } if valid_types == &vec![ArrowDataType::Utf8]));
234 }
235
236 #[tokio::test]
237 async fn [<test_ $name _missing_table_mutation>]() {
238 let factory: ScalarFunctionFactory = $func::factory().into();
239 let provider = factory.provide(FunctionContext::default());
240 let f = provider.as_async().unwrap();
241
242 let func_args = datafusion::logical_expr::ScalarFunctionArgs {
243 args: vec![
244 ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
245 ],
246 arg_fields: vec![
247 Arc::new(Field::new("arg_0", DataType::Utf8, false)),
248 ],
249 return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
250 number_rows: 1,
251 config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
252 };
253 let result = f.invoke_async_with_args(func_args).await.unwrap_err();
254 assert_eq!(
255 "Execution error: Handler error: Missing TableMutationHandler, not expected",
256 result.to_string()
257 );
258 }
259
260 #[tokio::test]
261 async fn [<test_ $name>]() {
262 let factory: ScalarFunctionFactory = $func::factory().into();
263 let provider = factory.provide(FunctionContext::mock());
264 let f = provider.as_async().unwrap();
265
266 let func_args = datafusion::logical_expr::ScalarFunctionArgs {
267 args: vec![
268 ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
269 ],
270 arg_fields: vec![
271 Arc::new(Field::new("arg_0", DataType::Utf8, false)),
272 ],
273 return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
274 number_rows: 1,
275 config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
276 };
277 let result = f.invoke_async_with_args(func_args).await.unwrap();
278
279 match result {
280 ColumnarValue::Array(array) => {
281 let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
282 assert_eq!(result_array.value(0), 42u64);
283 }
284 ColumnarValue::Scalar(scalar) => {
285 assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
286 }
287 }
288 }
289 }
290 }
291 }
292
293 define_table_function_test!(flush_table, FlushTableFunction);
294
295 fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
296 for (params, expected) in cases {
297 let params = params
298 .iter()
299 .map(|s| ValueRef::String(s))
300 .collect::<Vec<_>>();
301
302 assert_eq!(
303 expected,
304 &parse_compact_params(¶ms, &QueryContext::arc()).unwrap()
305 );
306 }
307 }
308
309 #[test]
310 fn test_parse_compact_params() {
311 check_parse_compact_params(&[
312 (
313 &["table"],
314 CompactTableRequest {
315 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
316 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
317 table_name: "table".to_string(),
318 compact_options: Options::Regular(Default::default()),
319 },
320 ),
321 (
322 &[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
323 CompactTableRequest {
324 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
325 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
326 table_name: "table".to_string(),
327 compact_options: Options::Regular(Default::default()),
328 },
329 ),
330 (
331 &[&format!(
332 "{}.{}.table",
333 DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
334 )],
335 CompactTableRequest {
336 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
337 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
338 table_name: "table".to_string(),
339 compact_options: Options::Regular(Default::default()),
340 },
341 ),
342 (
343 &["table", "regular"],
344 CompactTableRequest {
345 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
346 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
347 table_name: "table".to_string(),
348 compact_options: Options::Regular(Default::default()),
349 },
350 ),
351 (
352 &["table", "strict_window"],
353 CompactTableRequest {
354 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
355 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
356 table_name: "table".to_string(),
357 compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
358 },
359 ),
360 (
361 &["table", "strict_window", "3600"],
362 CompactTableRequest {
363 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
364 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
365 table_name: "table".to_string(),
366 compact_options: Options::StrictWindow(StrictWindow {
367 window_seconds: 3600,
368 }),
369 },
370 ),
371 (
372 &["table", "regular", "abcd"],
373 CompactTableRequest {
374 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
375 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
376 table_name: "table".to_string(),
377 compact_options: Options::Regular(Default::default()),
378 },
379 ),
380 (
381 &["table", "swcs", "120"],
382 CompactTableRequest {
383 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
384 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
385 table_name: "table".to_string(),
386 compact_options: Options::StrictWindow(StrictWindow {
387 window_seconds: 120,
388 }),
389 },
390 ),
391 ]);
392
393 assert!(
394 parse_compact_params(
395 &["table", "strict_window", "abc"]
396 .into_iter()
397 .map(ValueRef::String)
398 .collect::<Vec<_>>(),
399 &QueryContext::arc(),
400 )
401 .is_err()
402 );
403
404 assert!(
405 parse_compact_params(
406 &["a.b.table", "strict_window", "abc"]
407 .into_iter()
408 .map(ValueRef::String)
409 .collect::<Vec<_>>(),
410 &QueryContext::arc(),
411 )
412 .is_err()
413 );
414 }
415}