common_function/admin/
flush_compact_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use api::v1::region::{StrictWindow, compact_request};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23    UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ResultExt, ensure};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
/// Parallelism used for a compaction when the caller does not pass `parallelism=N`.
const DEFAULT_COMPACTION_PARALLELISM: u32 = 1;

/// Compaction type name selecting strict window compaction (matched case-insensitively).
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias of `COMPACT_TYPE_STRICT_WINDOW` (matched case-insensitively).
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
41
42#[admin_fn(
43    name = FlushTableFunction,
44    display_name = flush_table,
45    sig_fn = flush_signature,
46    ret = uint64
47)]
48pub(crate) async fn flush_table(
49    table_mutation_handler: &TableMutationHandlerRef,
50    query_ctx: &QueryContextRef,
51    params: &[ValueRef<'_>],
52) -> Result<Value> {
53    ensure!(
54        params.len() == 1,
55        InvalidFuncArgsSnafu {
56            err_msg: format!(
57                "The length of the args is not correct, expect 1, have: {}",
58                params.len()
59            ),
60        }
61    );
62
63    let ValueRef::String(table_name) = params[0] else {
64        return UnsupportedInputDataTypeSnafu {
65            function: "flush_table",
66            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
67        }
68        .fail();
69    };
70
71    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
72        .map_err(BoxedError::new)
73        .context(TableMutationSnafu)?;
74
75    let affected_rows = table_mutation_handler
76        .flush(
77            FlushTableRequest {
78                catalog_name,
79                schema_name,
80                table_name,
81            },
82            query_ctx.clone(),
83        )
84        .await?;
85
86    Ok(Value::from(affected_rows as u64))
87}
88
89#[admin_fn(
90    name = CompactTableFunction,
91    display_name = compact_table,
92    sig_fn = compact_signature,
93    ret = uint64
94)]
95pub(crate) async fn compact_table(
96    table_mutation_handler: &TableMutationHandlerRef,
97    query_ctx: &QueryContextRef,
98    params: &[ValueRef<'_>],
99) -> Result<Value> {
100    let request = parse_compact_request(params, query_ctx)?;
101    info!("Compact table request: {:?}", request);
102
103    let affected_rows = table_mutation_handler
104        .compact(request, query_ctx.clone())
105        .await?;
106
107    Ok(Value::from(affected_rows as u64))
108}
109
110fn flush_signature() -> Signature {
111    Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
112}
113
114fn compact_signature() -> Signature {
115    Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
116}
117
118/// Parses `compact_table` UDF parameters. This function accepts following combinations:
119/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
120/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
121/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
122///   - For `twcs`, it accepts `parallelism=[N]` where N is an unsigned 32 bits number
123///   - For `swcs`, it accepts two numeric parameter: `parallelism` and `window`.
124fn parse_compact_request(
125    params: &[ValueRef<'_>],
126    query_ctx: &QueryContextRef,
127) -> Result<CompactTableRequest> {
128    ensure!(
129        !params.is_empty() && params.len() <= 3,
130        InvalidFuncArgsSnafu {
131            err_msg: format!(
132                "The length of the args is not correct, expect 1-4, have: {}",
133                params.len()
134            ),
135        }
136    );
137
138    let (table_name, compact_type, parallelism) = match params {
139        // 1. Only table name, strategy defaults to twcs and default parallelism.
140        [ValueRef::String(table_name)] => (
141            table_name,
142            compact_request::Options::Regular(Default::default()),
143            DEFAULT_COMPACTION_PARALLELISM,
144        ),
145        // 2. Both table name and strategy are provided.
146        [
147            ValueRef::String(table_name),
148            ValueRef::String(compact_ty_str),
149        ] => {
150            let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?;
151            (table_name, compact_type, parallelism)
152        }
153        // 3. Table name, strategy and strategy specific options
154        [
155            ValueRef::String(table_name),
156            ValueRef::String(compact_ty_str),
157            ValueRef::String(options_str),
158        ] => {
159            let (compact_type, parallelism) =
160                parse_compact_options(compact_ty_str, Some(options_str))?;
161            (table_name, compact_type, parallelism)
162        }
163        _ => {
164            return UnsupportedInputDataTypeSnafu {
165                function: "compact_table",
166                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
167            }
168            .fail();
169        }
170    };
171
172    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
173        .map_err(BoxedError::new)
174        .context(TableMutationSnafu)?;
175
176    Ok(CompactTableRequest {
177        catalog_name,
178        schema_name,
179        table_name,
180        compact_options: compact_type,
181        parallelism,
182    })
183}
184
185/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chosen,
186/// otherwise choose regular (TWCS) compaction.
187fn parse_compact_options(
188    type_str: &str,
189    option: Option<&str>,
190) -> Result<(compact_request::Options, u32)> {
191    if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
192        | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
193    {
194        let Some(option_str) = option else {
195            return Ok((
196                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }),
197                DEFAULT_COMPACTION_PARALLELISM,
198            ));
199        };
200
201        // For compatibility, accepts single number as window size.
202        if let Ok(window_seconds) = i64::from_str(option_str) {
203            return Ok((
204                compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
205                DEFAULT_COMPACTION_PARALLELISM,
206            ));
207        };
208
209        // Parse keyword arguments in forms: `key1=value1,key2=value2`
210        let mut window_seconds = 0i64;
211        let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
212
213        let pairs: Vec<&str> = option_str.split(',').collect();
214        for pair in pairs {
215            let kv: Vec<&str> = pair.trim().split('=').collect();
216            if kv.len() != 2 {
217                return InvalidFuncArgsSnafu {
218                    err_msg: format!("Invalid key-value pair: {}", pair.trim()),
219                }
220                .fail();
221            }
222
223            let key = kv[0].trim();
224            let value = kv[1].trim();
225
226            match key {
227                "window" | "window_seconds" => {
228                    window_seconds = i64::from_str(value).map_err(|_| {
229                        InvalidFuncArgsSnafu {
230                            err_msg: format!("Invalid value for window: {}", value),
231                        }
232                        .build()
233                    })?;
234                }
235                "parallelism" => {
236                    parallelism = value.parse::<u32>().map_err(|_| {
237                        InvalidFuncArgsSnafu {
238                            err_msg: format!("Invalid value for parallelism: {}", value),
239                        }
240                        .build()
241                    })?;
242                }
243                _ => {
244                    return InvalidFuncArgsSnafu {
245                        err_msg: format!("Unknown parameter: {}", key),
246                    }
247                    .fail();
248                }
249            }
250        }
251
252        Ok((
253            compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
254            parallelism,
255        ))
256    } else {
257        // TWCS strategy
258        let Some(option_str) = option else {
259            return Ok((
260                compact_request::Options::Regular(Default::default()),
261                DEFAULT_COMPACTION_PARALLELISM,
262            ));
263        };
264
265        let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
266        let pairs: Vec<&str> = option_str.split(',').collect();
267        for pair in pairs {
268            let kv: Vec<&str> = pair.trim().split('=').collect();
269            if kv.len() != 2 {
270                return InvalidFuncArgsSnafu {
271                    err_msg: format!("Invalid key-value pair: {}", pair.trim()),
272                }
273                .fail();
274            }
275
276            let key = kv[0].trim();
277            let value = kv[1].trim();
278
279            match key {
280                "parallelism" => {
281                    parallelism = value.parse::<u32>().map_err(|_| {
282                        InvalidFuncArgsSnafu {
283                            err_msg: format!("Invalid value for parallelism: {}", value),
284                        }
285                        .build()
286                    })?;
287                }
288                _ => {
289                    return InvalidFuncArgsSnafu {
290                        err_msg: format!("Unknown parameter: {}", key),
291                    }
292                    .fail();
293                }
294            }
295        }
296
297        Ok((
298            compact_request::Options::Regular(Default::default()),
299            parallelism,
300        ))
301    }
302}
303
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datafusion_expr::ColumnarValue;
    use session::context::QueryContext;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let f = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        DataType::UInt64,
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     datafusion_expr::Signature {
                                         type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                                         volatility: datafusion_expr::Volatility::Immutable
                                     } if valid_types == &vec![ArrowDataType::Utf8]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::default());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap();

                    match result {
                        ColumnarValue::Array(array) => {
                            let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
                            assert_eq!(result_array.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Regular (TWCS) compaction options with defaults.
    fn regular() -> Options {
        Options::Regular(Default::default())
    }

    /// Strict window compaction options with the given window size.
    fn strict_window(window_seconds: i64) -> Options {
        Options::StrictWindow(StrictWindow { window_seconds })
    }

    /// The request expected for table `table` in the default catalog and schema.
    fn expected_request(compact_options: Options, parallelism: u32) -> CompactTableRequest {
        CompactTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table".to_string(),
            compact_options,
            parallelism,
        }
    }

    /// Runs `parse_compact_request` on string arguments with a fresh query context.
    fn parse(params: &[&str]) -> Result<CompactTableRequest> {
        let params = params
            .iter()
            .map(|s| ValueRef::String(s))
            .collect::<Vec<_>>();
        parse_compact_request(&params, &QueryContext::arc())
    }

    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (params, expected) in cases {
            assert_eq!(expected, &parse(params).unwrap(), "params: {:?}", params);
        }
    }

    fn assert_parse_err(params: &[&str]) {
        assert!(
            parse(params).is_err(),
            "expected an error for params: {:?}",
            params
        );
    }

    #[test]
    fn test_parse_compact_params() {
        let schema_qualified = format!("{}.table", DEFAULT_SCHEMA_NAME);
        let fully_qualified = format!("{}.{}.table", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);

        check_parse_compact_params(&[
            // Table name only, possibly qualified.
            (&["table"], expected_request(regular(), 1)),
            (&[schema_qualified.as_str()], expected_request(regular(), 1)),
            (&[fully_qualified.as_str()], expected_request(regular(), 1)),
            // Table name and compaction type.
            (&["table", "regular"], expected_request(regular(), 1)),
            (&["table", "strict_window"], expected_request(strict_window(0), 1)),
            // Bare number as window size (compatibility form).
            (
                &["table", "strict_window", "3600"],
                expected_request(strict_window(3600), 1),
            ),
            (&["table", "swcs", "120"], expected_request(strict_window(120), 1)),
            // Keyword arguments.
            (
                &["table", "regular", "parallelism=4"],
                expected_request(regular(), 4),
            ),
            (
                &["table", "strict_window", "window=3600,parallelism=2"],
                expected_request(strict_window(3600), 2),
            ),
            (
                &["table", "strict_window", "window=3600"],
                expected_request(strict_window(3600), 1),
            ),
            (
                &["table", "strict_window", "window_seconds=7200"],
                expected_request(strict_window(7200), 1),
            ),
            (
                &["table", "strict_window", "window=1800"],
                expected_request(strict_window(1800), 1),
            ),
            (
                &["table", "regular", "parallelism=8"],
                expected_request(regular(), 8),
            ),
            // Compaction type is case-insensitive.
            (
                &["table", "STRICT_WINDOW", "60"],
                expected_request(strict_window(60), 1),
            ),
            // Whitespace around pairs, keys and values is ignored.
            (
                &["table", "SWCS", " window = 60 , parallelism = 3 "],
                expected_request(strict_window(60), 3),
            ),
            // Any type other than strict window falls back to regular compaction.
            (
                &["table", "twcs", "parallelism=2"],
                expected_request(regular(), 2),
            ),
        ]);

        // Invalid strict window options.
        assert_parse_err(&["table", "strict_window", "abc"]);
        assert_parse_err(&["a.b.table", "strict_window", "abc"]);

        // Wrong number of parameters.
        assert_parse_err(&[]);
        assert_parse_err(&["table", "regular", "options", "invalid"]);
        assert_parse_err(&["table", "regular", "options", "4", "extra"]);

        // Invalid keyword argument format.
        assert_parse_err(&["table", "strict_window", "window"]);
        assert_parse_err(&["table", "strict_window", "window=1=2"]);
        assert_parse_err(&["table", "regular", "abcd"]);

        // Unknown keywords, including window keys for regular compaction.
        assert_parse_err(&["table", "strict_window", "invalid_key=123"]);
        assert_parse_err(&["table", "regular", "window=3600"]);

        // Invalid window value.
        assert_parse_err(&["table", "strict_window", "window=abc"]);

        // Invalid parallelism values.
        assert_parse_err(&["table", "strict_window", "parallelism=abc"]);
        assert_parse_err(&["table", "regular", "parallelism=abc"]);
        assert_parse_err(&["table", "regular", "parallelism=-1"]);
    }
}