common_function/admin/flush_compact_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::str::FromStr;
16
17use api::v1::region::{StrictWindow, compact_request};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23    UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ResultExt, ensure};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
/// Long name of the strict window compaction type; `parse_compact_type`
/// compares it ASCII case-insensitively.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias ("swcs") of the strict window compaction type; `parse_compact_type`
/// compares it ASCII case-insensitively.
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
39
40#[admin_fn(
41    name = FlushTableFunction,
42    display_name = flush_table,
43    sig_fn = flush_signature,
44    ret = uint64
45)]
46pub(crate) async fn flush_table(
47    table_mutation_handler: &TableMutationHandlerRef,
48    query_ctx: &QueryContextRef,
49    params: &[ValueRef<'_>],
50) -> Result<Value> {
51    ensure!(
52        params.len() == 1,
53        InvalidFuncArgsSnafu {
54            err_msg: format!(
55                "The length of the args is not correct, expect 1, have: {}",
56                params.len()
57            ),
58        }
59    );
60
61    let ValueRef::String(table_name) = params[0] else {
62        return UnsupportedInputDataTypeSnafu {
63            function: "flush_table",
64            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65        }
66        .fail();
67    };
68
69    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70        .map_err(BoxedError::new)
71        .context(TableMutationSnafu)?;
72
73    let affected_rows = table_mutation_handler
74        .flush(
75            FlushTableRequest {
76                catalog_name,
77                schema_name,
78                table_name,
79            },
80            query_ctx.clone(),
81        )
82        .await?;
83
84    Ok(Value::from(affected_rows as u64))
85}
86
87#[admin_fn(
88    name = CompactTableFunction,
89    display_name = compact_table,
90    sig_fn = compact_signature,
91    ret = uint64
92)]
93pub(crate) async fn compact_table(
94    table_mutation_handler: &TableMutationHandlerRef,
95    query_ctx: &QueryContextRef,
96    params: &[ValueRef<'_>],
97) -> Result<Value> {
98    let request = parse_compact_params(params, query_ctx)?;
99    info!("Compact table request: {:?}", request);
100
101    let affected_rows = table_mutation_handler
102        .compact(request, query_ctx.clone())
103        .await?;
104
105    Ok(Value::from(affected_rows as u64))
106}
107
108fn flush_signature() -> Signature {
109    Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
110}
111
112fn compact_signature() -> Signature {
113    Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
114}
115
116/// Parses `compact_table` UDF parameters. This function accepts following combinations:
117/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
118/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
119/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
120fn parse_compact_params(
121    params: &[ValueRef<'_>],
122    query_ctx: &QueryContextRef,
123) -> Result<CompactTableRequest> {
124    ensure!(
125        !params.is_empty(),
126        InvalidFuncArgsSnafu {
127            err_msg: "Args cannot be empty",
128        }
129    );
130
131    let (table_name, compact_type) = match params {
132        [ValueRef::String(table_name)] => (
133            table_name,
134            compact_request::Options::Regular(Default::default()),
135        ),
136        [
137            ValueRef::String(table_name),
138            ValueRef::String(compact_ty_str),
139        ] => {
140            let compact_type = parse_compact_type(compact_ty_str, None)?;
141            (table_name, compact_type)
142        }
143
144        [
145            ValueRef::String(table_name),
146            ValueRef::String(compact_ty_str),
147            ValueRef::String(options_str),
148        ] => {
149            let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
150            (table_name, compact_type)
151        }
152        _ => {
153            return UnsupportedInputDataTypeSnafu {
154                function: "compact_table",
155                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
156            }
157            .fail();
158        }
159    };
160
161    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
162        .map_err(BoxedError::new)
163        .context(TableMutationSnafu)?;
164
165    Ok(CompactTableRequest {
166        catalog_name,
167        schema_name,
168        table_name,
169        compact_options: compact_type,
170    })
171}
172
173/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose,
174/// otherwise choose regular (TWCS) compaction.
175fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
176    if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
177        | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
178    {
179        let window_seconds = option
180            .map(|v| {
181                i64::from_str(v).map_err(|_| {
182                    InvalidFuncArgsSnafu {
183                        err_msg: format!(
184                            "Compact window is expected to be a valid number, provided: {}",
185                            v
186                        ),
187                    }
188                    .build()
189                })
190            })
191            .transpose()?
192            .unwrap_or(0);
193
194        Ok(compact_request::Options::StrictWindow(StrictWindow {
195            window_seconds,
196        }))
197    } else {
198        Ok(compact_request::Options::Regular(Default::default()))
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datafusion_expr::ColumnarValue;
    use session::context::QueryContext;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    // Generates three tests for a single-string-argument admin table function:
    // - its name, return type and signature;
    // - the error raised when no table mutation handler is available;
    // - a successful invocation against the mock function context.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let f = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        DataType::UInt64,
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     datafusion_expr::Signature {
                                         type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                                         volatility: datafusion_expr::Volatility::Immutable
                                     } if valid_types == &vec![ArrowDataType::Utf8]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::default());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap();

                    match result {
                        ColumnarValue::Array(array) => {
                            let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
                            assert_eq!(result_array.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Asserts that each list of string params parses into the expected request.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (params, expected) in cases {
            let params = params
                .iter()
                .map(|s| ValueRef::String(s))
                .collect::<Vec<_>>();

            assert_eq!(
                expected,
                &parse_compact_params(&params, &QueryContext::arc()).unwrap()
            );
        }
    }

    #[test]
    fn test_parse_compact_params() {
        check_parse_compact_params(&[
            (
                &["table"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &[&format!(
                    "{}.{}.table",
                    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
                )],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &["table", "regular"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &["table", "strict_window"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
                },
            ),
            (
                &["table", "strict_window", "3600"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow {
                        window_seconds: 3600,
                    }),
                },
            ),
            (
                &["table", "regular", "abcd"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &["table", "swcs", "120"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow {
                        window_seconds: 120,
                    }),
                },
            ),
        ]);

        assert!(
            parse_compact_params(
                &["table", "strict_window", "abc"]
                    .into_iter()
                    .map(ValueRef::String)
                    .collect::<Vec<_>>(),
                &QueryContext::arc(),
            )
            .is_err()
        );

        assert!(
            parse_compact_params(
                &["a.b.table", "strict_window", "abc"]
                    .into_iter()
                    .map(ValueRef::String)
                    .collect::<Vec<_>>(),
                &QueryContext::arc(),
            )
            .is_err()
        );
    }

    /// Compaction type names are matched ASCII case-insensitively.
    #[test]
    fn test_parse_compact_params_type_case_insensitive() {
        check_parse_compact_params(&[
            (
                &["table", "SWCS", "60"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                },
            ),
            (
                &["table", "Strict_Window"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
                },
            ),
        ]);
    }

    /// Empty argument lists and argument lists longer than three are rejected.
    #[test]
    fn test_parse_compact_params_invalid_arity() {
        assert!(parse_compact_params(&[], &QueryContext::arc()).is_err());

        assert!(
            parse_compact_params(
                &["table", "strict_window", "3600", "extra"]
                    .into_iter()
                    .map(ValueRef::String)
                    .collect::<Vec<_>>(),
                &QueryContext::arc(),
            )
            .is_err()
        );
    }
}