common_function/admin/
flush_compact_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_macro::admin_fn;
16use common_query::error::{
17    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use common_query::prelude::{Signature, Volatility};
20use datatypes::prelude::*;
21use session::context::QueryContextRef;
22use snafu::ensure;
23use store_api::storage::RegionId;
24
25use crate::handlers::TableMutationHandlerRef;
26use crate::helper::cast_u64;
27
28macro_rules! define_region_function {
29    ($name: expr, $display_name_str: expr, $display_name: ident) => {
30        /// A function to $display_name
31        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
32        pub(crate) async fn $display_name(
33            table_mutation_handler: &TableMutationHandlerRef,
34            query_ctx: &QueryContextRef,
35            params: &[ValueRef<'_>],
36        ) -> Result<Value> {
37            ensure!(
38                params.len() == 1,
39                InvalidFuncArgsSnafu {
40                    err_msg: format!(
41                        "The length of the args is not correct, expect 1, have: {}",
42                        params.len()
43                    ),
44                }
45            );
46
47            let Some(region_id) = cast_u64(&params[0])? else {
48                return UnsupportedInputDataTypeSnafu {
49                    function: stringify!($display_name_str),
50                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
51                }
52                .fail();
53            };
54
55            let affected_rows = table_mutation_handler
56                .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
57                .await?;
58
59            Ok(Value::from(affected_rows as u64))
60        }
61    };
62}
63
64define_region_function!(FlushRegionFunction, flush_region, flush_region);
65
66define_region_function!(CompactRegionFunction, compact_region, compact_region);
67
68fn signature() -> Signature {
69    Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
70}
71
72#[cfg(test)]
73mod tests {
74    use std::sync::Arc;
75
76    use common_query::prelude::TypeSignature;
77    use datatypes::vectors::UInt64Vector;
78
79    use super::*;
80    use crate::function::{AsyncFunction, FunctionContext};
81
82    macro_rules! define_region_function_test {
83        ($name: ident, $func: ident) => {
84            paste::paste! {
85                #[test]
86                fn [<test_ $name _misc>]() {
87                    let f = $func;
88                    assert_eq!(stringify!($name), f.name());
89                    assert_eq!(
90                        ConcreteDataType::uint64_datatype(),
91                        f.return_type(&[]).unwrap()
92                    );
93                    assert!(matches!(f.signature(),
94                                     Signature {
95                                         type_signature: TypeSignature::Uniform(1, valid_types),
96                                         volatility: Volatility::Immutable
97                                     } if valid_types == ConcreteDataType::numerics()));
98                }
99
100                #[tokio::test]
101                async fn [<test_ $name _missing_table_mutation>]() {
102                    let f = $func;
103
104                    let args = vec![99];
105
106                    let args = args
107                        .into_iter()
108                        .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
109                        .collect::<Vec<_>>();
110
111                    let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
112                    assert_eq!(
113                        "Missing TableMutationHandler, not expected",
114                        result.to_string()
115                    );
116                }
117
118                #[tokio::test]
119                async fn [<test_ $name>]() {
120                    let f = $func;
121
122
123                    let args = vec![99];
124
125                    let args = args
126                        .into_iter()
127                        .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
128                        .collect::<Vec<_>>();
129
130                    let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
131
132                    let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
133                    assert_eq!(expect, result);
134                }
135            }
136        };
137    }
138
139    define_region_function_test!(flush_region, FlushRegionFunction);
140
141    define_region_function_test!(compact_region, CompactRegionFunction);
142}