1use std::str::FromStr;
16
17use api::v1::region::{StrictWindow, compact_request};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23 UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ResultExt, ensure};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
/// Compaction type name (matched case-insensitively) that selects strict-window compaction.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias of `COMPACT_TYPE_STRICT_WINDOW`, also matched case-insensitively.
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";

/// Parallelism used for a compaction request when no `parallelism` option is given.
const DEFAULT_COMPACTION_PARALLELISM: u32 = 1;
41
42#[admin_fn(
43 name = FlushTableFunction,
44 display_name = flush_table,
45 sig_fn = flush_signature,
46 ret = uint64
47)]
48pub(crate) async fn flush_table(
49 table_mutation_handler: &TableMutationHandlerRef,
50 query_ctx: &QueryContextRef,
51 params: &[ValueRef<'_>],
52) -> Result<Value> {
53 ensure!(
54 params.len() == 1,
55 InvalidFuncArgsSnafu {
56 err_msg: format!(
57 "The length of the args is not correct, expect 1, have: {}",
58 params.len()
59 ),
60 }
61 );
62
63 let ValueRef::String(table_name) = params[0] else {
64 return UnsupportedInputDataTypeSnafu {
65 function: "flush_table",
66 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
67 }
68 .fail();
69 };
70
71 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
72 .map_err(BoxedError::new)
73 .context(TableMutationSnafu)?;
74
75 let affected_rows = table_mutation_handler
76 .flush(
77 FlushTableRequest {
78 catalog_name,
79 schema_name,
80 table_name,
81 },
82 query_ctx.clone(),
83 )
84 .await?;
85
86 Ok(Value::from(affected_rows as u64))
87}
88
89#[admin_fn(
90 name = CompactTableFunction,
91 display_name = compact_table,
92 sig_fn = compact_signature,
93 ret = uint64
94)]
95pub(crate) async fn compact_table(
96 table_mutation_handler: &TableMutationHandlerRef,
97 query_ctx: &QueryContextRef,
98 params: &[ValueRef<'_>],
99) -> Result<Value> {
100 let request = parse_compact_request(params, query_ctx)?;
101 info!("Compact table request: {:?}", request);
102
103 let affected_rows = table_mutation_handler
104 .compact(request, query_ctx.clone())
105 .await?;
106
107 Ok(Value::from(affected_rows as u64))
108}
109
110fn flush_signature() -> Signature {
111 Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
112}
113
114fn compact_signature() -> Signature {
115 Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
116}
117
118fn parse_compact_request(
125 params: &[ValueRef<'_>],
126 query_ctx: &QueryContextRef,
127) -> Result<CompactTableRequest> {
128 ensure!(
129 !params.is_empty() && params.len() <= 3,
130 InvalidFuncArgsSnafu {
131 err_msg: format!(
132 "The length of the args is not correct, expect 1-4, have: {}",
133 params.len()
134 ),
135 }
136 );
137
138 let (table_name, compact_type, parallelism) = match params {
139 [ValueRef::String(table_name)] => (
141 table_name,
142 compact_request::Options::Regular(Default::default()),
143 DEFAULT_COMPACTION_PARALLELISM,
144 ),
145 [
147 ValueRef::String(table_name),
148 ValueRef::String(compact_ty_str),
149 ] => {
150 let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?;
151 (table_name, compact_type, parallelism)
152 }
153 [
155 ValueRef::String(table_name),
156 ValueRef::String(compact_ty_str),
157 ValueRef::String(options_str),
158 ] => {
159 let (compact_type, parallelism) =
160 parse_compact_options(compact_ty_str, Some(options_str))?;
161 (table_name, compact_type, parallelism)
162 }
163 _ => {
164 return UnsupportedInputDataTypeSnafu {
165 function: "compact_table",
166 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
167 }
168 .fail();
169 }
170 };
171
172 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
173 .map_err(BoxedError::new)
174 .context(TableMutationSnafu)?;
175
176 Ok(CompactTableRequest {
177 catalog_name,
178 schema_name,
179 table_name,
180 compact_options: compact_type,
181 parallelism,
182 })
183}
184
185fn parse_compact_options(
188 type_str: &str,
189 option: Option<&str>,
190) -> Result<(compact_request::Options, u32)> {
191 if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
192 | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
193 {
194 let Some(option_str) = option else {
195 return Ok((
196 compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }),
197 DEFAULT_COMPACTION_PARALLELISM,
198 ));
199 };
200
201 if let Ok(window_seconds) = i64::from_str(option_str) {
203 return Ok((
204 compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
205 DEFAULT_COMPACTION_PARALLELISM,
206 ));
207 };
208
209 let mut window_seconds = 0i64;
211 let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
212
213 let pairs: Vec<&str> = option_str.split(',').collect();
214 for pair in pairs {
215 let kv: Vec<&str> = pair.trim().split('=').collect();
216 if kv.len() != 2 {
217 return InvalidFuncArgsSnafu {
218 err_msg: format!("Invalid key-value pair: {}", pair.trim()),
219 }
220 .fail();
221 }
222
223 let key = kv[0].trim();
224 let value = kv[1].trim();
225
226 match key {
227 "window" | "window_seconds" => {
228 window_seconds = i64::from_str(value).map_err(|_| {
229 InvalidFuncArgsSnafu {
230 err_msg: format!("Invalid value for window: {}", value),
231 }
232 .build()
233 })?;
234 }
235 "parallelism" => {
236 parallelism = value.parse::<u32>().map_err(|_| {
237 InvalidFuncArgsSnafu {
238 err_msg: format!("Invalid value for parallelism: {}", value),
239 }
240 .build()
241 })?;
242 }
243 _ => {
244 return InvalidFuncArgsSnafu {
245 err_msg: format!("Unknown parameter: {}", key),
246 }
247 .fail();
248 }
249 }
250 }
251
252 Ok((
253 compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
254 parallelism,
255 ))
256 } else {
257 let Some(option_str) = option else {
259 return Ok((
260 compact_request::Options::Regular(Default::default()),
261 DEFAULT_COMPACTION_PARALLELISM,
262 ));
263 };
264
265 let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
266 let pairs: Vec<&str> = option_str.split(',').collect();
267 for pair in pairs {
268 let kv: Vec<&str> = pair.trim().split('=').collect();
269 if kv.len() != 2 {
270 return InvalidFuncArgsSnafu {
271 err_msg: format!("Invalid key-value pair: {}", pair.trim()),
272 }
273 .fail();
274 }
275
276 let key = kv[0].trim();
277 let value = kv[1].trim();
278
279 match key {
280 "parallelism" => {
281 parallelism = value.parse::<u32>().map_err(|_| {
282 InvalidFuncArgsSnafu {
283 err_msg: format!("Invalid value for parallelism: {}", value),
284 }
285 .build()
286 })?;
287 }
288 _ => {
289 return InvalidFuncArgsSnafu {
290 err_msg: format!("Unknown parameter: {}", key),
291 }
292 .fail();
293 }
294 }
295 }
296
297 Ok((
298 compact_request::Options::Regular(Default::default()),
299 parallelism,
300 ))
301 }
302}
303
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datafusion_expr::ColumnarValue;
    use session::context::QueryContext;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Generates tests for the admin function type `$func` registered under `$name`:
    /// its metadata (name, `UInt64` return type, single-`Utf8` signature), the error raised
    /// when no table mutation handler is available, and a call against the mock handler
    /// whose result is asserted to be 42.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let f = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        DataType::UInt64,
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     datafusion_expr::Signature {
                                         type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                                         volatility: datafusion_expr::Volatility::Immutable
                                     } if valid_types == &vec![ArrowDataType::Utf8]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::default());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap();

                    match result {
                        ColumnarValue::Array(array) => {
                            let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
                            assert_eq!(result_array.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Regular compaction options with default settings.
    fn regular() -> Options {
        Options::Regular(Default::default())
    }

    /// Strict-window compaction options with the given window.
    fn strict_window(window_seconds: i64) -> Options {
        Options::StrictWindow(StrictWindow { window_seconds })
    }

    /// The request expected for the unqualified table `table` in the default catalog and
    /// schema (which is how `QueryContext::arc()` resolves it).
    fn expected_request(compact_options: Options, parallelism: u32) -> CompactTableRequest {
        CompactTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table".to_string(),
            compact_options,
            parallelism,
        }
    }

    /// Asserts that each list of string arguments parses into the paired request.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (raw, expected) in cases {
            let params = raw
                .iter()
                .map(|s| ValueRef::String(s))
                .collect::<Vec<_>>();

            assert_eq!(
                expected,
                &parse_compact_request(&params, &QueryContext::arc()).unwrap(),
                "params: {:?}",
                raw
            );
        }
    }

    /// Asserts that each list of string arguments is rejected.
    fn check_parse_compact_params_err(cases: &[&[&str]]) {
        for raw in cases {
            let params = raw
                .iter()
                .map(|s| ValueRef::String(s))
                .collect::<Vec<_>>();

            assert!(
                parse_compact_request(&params, &QueryContext::arc()).is_err(),
                "params should be rejected: {:?}",
                raw
            );
        }
    }

    #[test]
    fn test_parse_compact_params() {
        let schema_qualified = format!("{}.table", DEFAULT_SCHEMA_NAME);
        let fully_qualified = format!("{}.{}.table", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);

        check_parse_compact_params(&[
            (&["table"], expected_request(regular(), 1)),
            (&[schema_qualified.as_str()], expected_request(regular(), 1)),
            (&[fully_qualified.as_str()], expected_request(regular(), 1)),
            (&["table", "regular"], expected_request(regular(), 1)),
            (
                &["table", "strict_window"],
                expected_request(strict_window(0), 1),
            ),
            (
                &["table", "strict_window", "3600"],
                expected_request(strict_window(3600), 1),
            ),
            (
                &["table", "swcs", "120"],
                expected_request(strict_window(120), 1),
            ),
            (
                &["table", "regular", "parallelism=4"],
                expected_request(regular(), 4),
            ),
            (
                &["table", "strict_window", "window=3600,parallelism=2"],
                expected_request(strict_window(3600), 2),
            ),
            (
                &["table", "strict_window", "window=3600"],
                expected_request(strict_window(3600), 1),
            ),
            (
                &["table", "strict_window", "window_seconds=7200"],
                expected_request(strict_window(7200), 1),
            ),
            (
                &["table", "strict_window", "window=1800"],
                expected_request(strict_window(1800), 1),
            ),
            (
                &["table", "regular", "parallelism=8"],
                expected_request(regular(), 8),
            ),
            // Compaction type names are matched case-insensitively.
            (
                &["table", "SWCS", "60"],
                expected_request(strict_window(60), 1),
            ),
            (
                &["table", "Strict_Window"],
                expected_request(strict_window(0), 1),
            ),
            // Whitespace around pairs, keys and values is ignored.
            (
                &["table", "swcs", " window = 60 , parallelism = 3 "],
                expected_request(strict_window(60), 3),
            ),
            // Any type other than strict window falls back to regular compaction.
            (
                &["table", "twcs", "parallelism=2"],
                expected_request(regular(), 2),
            ),
        ]);

        check_parse_compact_params_err(&[
            // Too few or too many arguments.
            &[],
            &["table", "regular", "options", "invalid"],
            &["table", "regular", "options", "4", "extra"],
            // Strict-window options that are neither an integer nor valid `key=value` pairs.
            &["table", "strict_window", "abc"],
            &["a.b.table", "strict_window", "abc"],
            &["table", "strict_window", "window"],
            &["table", "strict_window", "window=1=2"],
            &["table", "strict_window", "window=3600,"],
            // Unknown keys and unparsable values.
            &["table", "strict_window", "invalid_key=123"],
            &["table", "strict_window", "window=abc"],
            &["table", "strict_window", "parallelism=abc"],
            &["table", "regular", "abcd"],
            &["table", "regular", "window=3600"],
            &["table", "regular", "parallelism=-1"],
        ]);

        // Every argument must be a string.
        assert!(
            parse_compact_request(
                &[ValueRef::String("table"), ValueRef::Null],
                &QueryContext::arc(),
            )
            .is_err()
        );
    }
}