common_function/admin/
flush_compact_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use api::v1::region::{compact_request, StrictWindow};
18use common_error::ext::BoxedError;
19use common_macro::admin_fn;
20use common_query::error::{
21    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
22    UnsupportedInputDataTypeSnafu,
23};
24use common_query::prelude::{Signature, Volatility};
25use common_telemetry::info;
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::{ensure, ResultExt};
30use table::requests::{CompactTableRequest, FlushTableRequest};
31
32use crate::handlers::TableMutationHandlerRef;
33
/// Compaction type name selecting strict window compaction (full name).
/// `parse_compact_type` compares it against the user-supplied type ignoring ASCII case.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias for the strict window compaction type; `parse_compact_type` treats it
/// exactly like [`COMPACT_TYPE_STRICT_WINDOW`].
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
38
39#[admin_fn(
40    name = FlushTableFunction,
41    display_name = flush_table,
42    sig_fn = flush_signature,
43    ret = uint64
44)]
45pub(crate) async fn flush_table(
46    table_mutation_handler: &TableMutationHandlerRef,
47    query_ctx: &QueryContextRef,
48    params: &[ValueRef<'_>],
49) -> Result<Value> {
50    ensure!(
51        params.len() == 1,
52        InvalidFuncArgsSnafu {
53            err_msg: format!(
54                "The length of the args is not correct, expect 1, have: {}",
55                params.len()
56            ),
57        }
58    );
59
60    let ValueRef::String(table_name) = params[0] else {
61        return UnsupportedInputDataTypeSnafu {
62            function: "flush_table",
63            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
64        }
65        .fail();
66    };
67
68    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
69        .map_err(BoxedError::new)
70        .context(TableMutationSnafu)?;
71
72    let affected_rows = table_mutation_handler
73        .flush(
74            FlushTableRequest {
75                catalog_name,
76                schema_name,
77                table_name,
78            },
79            query_ctx.clone(),
80        )
81        .await?;
82
83    Ok(Value::from(affected_rows as u64))
84}
85
86#[admin_fn(
87    name = CompactTableFunction,
88    display_name = compact_table,
89    sig_fn = compact_signature,
90    ret = uint64
91)]
92pub(crate) async fn compact_table(
93    table_mutation_handler: &TableMutationHandlerRef,
94    query_ctx: &QueryContextRef,
95    params: &[ValueRef<'_>],
96) -> Result<Value> {
97    let request = parse_compact_params(params, query_ctx)?;
98    info!("Compact table request: {:?}", request);
99
100    let affected_rows = table_mutation_handler
101        .compact(request, query_ctx.clone())
102        .await?;
103
104    Ok(Value::from(affected_rows as u64))
105}
106
107fn flush_signature() -> Signature {
108    Signature::uniform(
109        1,
110        vec![ConcreteDataType::string_datatype()],
111        Volatility::Immutable,
112    )
113}
114
115fn compact_signature() -> Signature {
116    Signature::variadic(
117        vec![ConcreteDataType::string_datatype()],
118        Volatility::Immutable,
119    )
120}
121
122/// Parses `compact_table` UDF parameters. This function accepts following combinations:
123/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
124/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
125/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
126fn parse_compact_params(
127    params: &[ValueRef<'_>],
128    query_ctx: &QueryContextRef,
129) -> Result<CompactTableRequest> {
130    ensure!(
131        !params.is_empty(),
132        InvalidFuncArgsSnafu {
133            err_msg: "Args cannot be empty",
134        }
135    );
136
137    let (table_name, compact_type) = match params {
138        [ValueRef::String(table_name)] => (
139            table_name,
140            compact_request::Options::Regular(Default::default()),
141        ),
142        [ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
143            let compact_type = parse_compact_type(compact_ty_str, None)?;
144            (table_name, compact_type)
145        }
146
147        [ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
148        {
149            let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
150            (table_name, compact_type)
151        }
152        _ => {
153            return UnsupportedInputDataTypeSnafu {
154                function: "compact_table",
155                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
156            }
157            .fail()
158        }
159    };
160
161    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
162        .map_err(BoxedError::new)
163        .context(TableMutationSnafu)?;
164
165    Ok(CompactTableRequest {
166        catalog_name,
167        schema_name,
168        table_name,
169        compact_options: compact_type,
170    })
171}
172
173/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose,
174/// otherwise choose regular (TWCS) compaction.
175fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
176    if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
177        | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
178    {
179        let window_seconds = option
180            .map(|v| {
181                i64::from_str(v).map_err(|_| {
182                    InvalidFuncArgsSnafu {
183                        err_msg: format!(
184                            "Compact window is expected to be a valid number, provided: {}",
185                            v
186                        ),
187                    }
188                    .build()
189                })
190            })
191            .transpose()?
192            .unwrap_or(0);
193
194        Ok(compact_request::Options::StrictWindow(StrictWindow {
195            window_seconds,
196        }))
197    } else {
198        Ok(compact_request::Options::Regular(Default::default()))
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_query::prelude::TypeSignature;
    use datatypes::vectors::{StringVector, UInt64Vector};
    use session::context::QueryContext;

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    /// Generates the common test suite for a single-string-argument table admin function:
    /// metadata checks, the missing-handler error, and a successful call against the mock
    /// function context.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let f = $func;
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        ConcreteDataType::uint64_datatype(),
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     Signature {
                                         type_signature: TypeSignature::Uniform(1, valid_types),
                                         volatility: Volatility::Immutable
                                     } if valid_types == vec![ConcreteDataType::string_datatype()]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let f = $func;

                    let args = vec!["test"];

                    let args = args
                        .into_iter()
                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
                        .collect::<Vec<_>>();

                    let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
                    assert_eq!(
                        "Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let f = $func;

                    let args = vec!["test"];

                    let args = args
                        .into_iter()
                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
                        .collect::<Vec<_>>();

                    let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

                    let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
                    assert_eq!(expect, result);
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Builds the request expected for table `table` in the default catalog and schema.
    fn expected_request(compact_options: Options) -> CompactTableRequest {
        CompactTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table".to_string(),
            compact_options,
        }
    }

    /// Wraps plain string arguments into the `ValueRef`s the parser consumes.
    fn to_value_refs<'a>(args: &[&'a str]) -> Vec<ValueRef<'a>> {
        args.iter().copied().map(ValueRef::String).collect()
    }

    /// Asserts that every argument list parses into its paired expected request.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (args, expected) in cases {
            let params = to_value_refs(args);

            assert_eq!(
                expected,
                &parse_compact_params(&params, &QueryContext::arc()).unwrap(),
                "unexpected request for args: {:?}",
                args
            );
        }
    }

    /// Asserts that parsing the given argument list fails.
    fn assert_parse_fails(args: &[&str]) {
        let params = to_value_refs(args);

        assert!(
            parse_compact_params(&params, &QueryContext::arc()).is_err(),
            "expected an error for args: {:?}",
            args
        );
    }

    #[test]
    fn test_parse_compact_params() {
        let regular = || expected_request(Options::Regular(Default::default()));
        let strict_window = |window_seconds| {
            expected_request(Options::StrictWindow(StrictWindow { window_seconds }))
        };

        let schema_qualified = format!("{}.table", DEFAULT_SCHEMA_NAME);
        let fully_qualified = format!("{}.{}.table", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);

        check_parse_compact_params(&[
            // Table name only, in every supported qualification form.
            (&["table"], regular()),
            (&[schema_qualified.as_str()], regular()),
            (&[fully_qualified.as_str()], regular()),
            // Explicit compaction type without options.
            (&["table", "regular"], regular()),
            (&["table", "strict_window"], strict_window(0)),
            // Compaction type with options; regular compaction ignores them.
            (&["table", "strict_window", "3600"], strict_window(3600)),
            (&["table", "regular", "abcd"], regular()),
            (&["table", "swcs", "120"], strict_window(120)),
            // Type names are matched ignoring ASCII case.
            (&["table", "STRICT_WINDOW", "60"], strict_window(60)),
            (&["table", "SWCS"], strict_window(0)),
        ]);

        // A non-numeric window is rejected for strict window compaction.
        assert_parse_fails(&["table", "strict_window", "abc"]);
        assert_parse_fails(&["a.b.table", "strict_window", "abc"]);

        // Unsupported argument counts are rejected.
        assert_parse_fails(&[]);
        assert_parse_fails(&["table", "strict_window", "3600", "extra"]);
    }
}