common_function/admin/
flush_compact_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use api::v1::region::{compact_request, StrictWindow};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23    UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ensure, ResultExt};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
/// Compaction type name that selects strict window compaction. `parse_compact_type`
/// compares the user-supplied type string against it case-insensitively.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias of [`COMPACT_TYPE_STRICT_WINDOW`]; `parse_compact_type` accepts either
/// spelling to select strict window compaction.
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
39
40#[admin_fn(
41    name = FlushTableFunction,
42    display_name = flush_table,
43    sig_fn = flush_signature,
44    ret = uint64
45)]
46pub(crate) async fn flush_table(
47    table_mutation_handler: &TableMutationHandlerRef,
48    query_ctx: &QueryContextRef,
49    params: &[ValueRef<'_>],
50) -> Result<Value> {
51    ensure!(
52        params.len() == 1,
53        InvalidFuncArgsSnafu {
54            err_msg: format!(
55                "The length of the args is not correct, expect 1, have: {}",
56                params.len()
57            ),
58        }
59    );
60
61    let ValueRef::String(table_name) = params[0] else {
62        return UnsupportedInputDataTypeSnafu {
63            function: "flush_table",
64            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65        }
66        .fail();
67    };
68
69    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70        .map_err(BoxedError::new)
71        .context(TableMutationSnafu)?;
72
73    let affected_rows = table_mutation_handler
74        .flush(
75            FlushTableRequest {
76                catalog_name,
77                schema_name,
78                table_name,
79            },
80            query_ctx.clone(),
81        )
82        .await?;
83
84    Ok(Value::from(affected_rows as u64))
85}
86
87#[admin_fn(
88    name = CompactTableFunction,
89    display_name = compact_table,
90    sig_fn = compact_signature,
91    ret = uint64
92)]
93pub(crate) async fn compact_table(
94    table_mutation_handler: &TableMutationHandlerRef,
95    query_ctx: &QueryContextRef,
96    params: &[ValueRef<'_>],
97) -> Result<Value> {
98    let request = parse_compact_params(params, query_ctx)?;
99    info!("Compact table request: {:?}", request);
100
101    let affected_rows = table_mutation_handler
102        .compact(request, query_ctx.clone())
103        .await?;
104
105    Ok(Value::from(affected_rows as u64))
106}
107
108fn flush_signature() -> Signature {
109    Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
110}
111
112fn compact_signature() -> Signature {
113    Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
114}
115
116/// Parses `compact_table` UDF parameters. This function accepts following combinations:
117/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
118/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
119/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
120fn parse_compact_params(
121    params: &[ValueRef<'_>],
122    query_ctx: &QueryContextRef,
123) -> Result<CompactTableRequest> {
124    ensure!(
125        !params.is_empty(),
126        InvalidFuncArgsSnafu {
127            err_msg: "Args cannot be empty",
128        }
129    );
130
131    let (table_name, compact_type) = match params {
132        [ValueRef::String(table_name)] => (
133            table_name,
134            compact_request::Options::Regular(Default::default()),
135        ),
136        [ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
137            let compact_type = parse_compact_type(compact_ty_str, None)?;
138            (table_name, compact_type)
139        }
140
141        [ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
142        {
143            let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
144            (table_name, compact_type)
145        }
146        _ => {
147            return UnsupportedInputDataTypeSnafu {
148                function: "compact_table",
149                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
150            }
151            .fail()
152        }
153    };
154
155    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
156        .map_err(BoxedError::new)
157        .context(TableMutationSnafu)?;
158
159    Ok(CompactTableRequest {
160        catalog_name,
161        schema_name,
162        table_name,
163        compact_options: compact_type,
164    })
165}
166
167/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose,
168/// otherwise choose regular (TWCS) compaction.
169fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
170    if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
171        | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
172    {
173        let window_seconds = option
174            .map(|v| {
175                i64::from_str(v).map_err(|_| {
176                    InvalidFuncArgsSnafu {
177                        err_msg: format!(
178                            "Compact window is expected to be a valid number, provided: {}",
179                            v
180                        ),
181                    }
182                    .build()
183                })
184            })
185            .transpose()?
186            .unwrap_or(0);
187
188        Ok(compact_request::Options::StrictWindow(StrictWindow {
189            window_seconds,
190        }))
191    } else {
192        Ok(compact_request::Options::Regular(Default::default()))
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datafusion_expr::ColumnarValue;
    use session::context::QueryContext;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Generates three tests for a single-argument table admin function:
    /// - `test_<name>_misc`: checks name, return type and signature;
    /// - `test_<name>_missing_table_mutation`: invoking without a handler must fail;
    /// - `test_<name>`: invoking with the mock context must yield `42`.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let function = factory.provide(FunctionContext::mock());

                    assert_eq!(stringify!($name), function.name());
                    assert_eq!(
                        DataType::UInt64,
                        function.return_type(&[]).unwrap()
                    );

                    let signature = function.signature();
                    assert!(matches!(signature,
                                     datafusion_expr::Signature {
                                         type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                                         volatility: datafusion_expr::Volatility::Immutable
                                     } if valid_types == &vec![ArrowDataType::Utf8]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    // The default context is used here; the call is expected to fail with
                    // a missing handler error.
                    let provider = factory.provide(FunctionContext::default());
                    let function = provider.as_async().unwrap();

                    let table_names = StringArray::from(vec!["test"]);
                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![ColumnarValue::Array(Arc::new(table_names))],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };

                    let error = function.invoke_async_with_args(func_args).await.unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        error.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let function = provider.as_async().unwrap();

                    let table_names = StringArray::from(vec!["test"]);
                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![ColumnarValue::Array(Arc::new(table_names))],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };

                    let output = function.invoke_async_with_args(func_args).await.unwrap();

                    // Either output representation is accepted as long as it carries 42.
                    match output {
                        ColumnarValue::Array(column) => {
                            let rows = column.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
                            assert_eq!(rows.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(value) => {
                            assert_eq!(value, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Runs `parse_compact_params` on plain string arguments with a fresh query context.
    fn parse_str_params(params: &[&str]) -> Result<CompactTableRequest> {
        let values = params
            .iter()
            .copied()
            .map(ValueRef::String)
            .collect::<Vec<_>>();

        parse_compact_params(&values, &QueryContext::arc())
    }

    /// Builds the request expected for table `table` in the default catalog and schema.
    fn expected_request(compact_options: Options) -> CompactTableRequest {
        CompactTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table".to_string(),
            compact_options,
        }
    }

    /// Asserts that every argument list parses into its paired expected request.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (params, expected) in cases {
            assert_eq!(expected, &parse_str_params(params).unwrap());
        }
    }

    #[test]
    fn test_parse_compact_params() {
        let schema_qualified = format!("{}.table", DEFAULT_SCHEMA_NAME);
        let fully_qualified = format!(
            "{}.{}.table",
            DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
        );

        check_parse_compact_params(&[
            (
                &["table"],
                expected_request(Options::Regular(Default::default())),
            ),
            (
                &[&schema_qualified],
                expected_request(Options::Regular(Default::default())),
            ),
            (
                &[&fully_qualified],
                expected_request(Options::Regular(Default::default())),
            ),
            (
                &["table", "regular"],
                expected_request(Options::Regular(Default::default())),
            ),
            (
                &["table", "strict_window"],
                expected_request(Options::StrictWindow(StrictWindow { window_seconds: 0 })),
            ),
            (
                &["table", "strict_window", "3600"],
                expected_request(Options::StrictWindow(StrictWindow {
                    window_seconds: 3600,
                })),
            ),
            (
                // Options are ignored for regular compaction, so a non-numeric value is fine.
                &["table", "regular", "abcd"],
                expected_request(Options::Regular(Default::default())),
            ),
            (
                &["table", "swcs", "120"],
                expected_request(Options::StrictWindow(StrictWindow {
                    window_seconds: 120,
                })),
            ),
        ]);

        // A non-numeric window for strict window compaction must be rejected.
        assert!(parse_str_params(&["table", "strict_window", "abc"]).is_err());
        assert!(parse_str_params(&["a.b.table", "strict_window", "abc"]).is_err());
    }
}