common_function/admin/
flush_compact_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_macro::admin_fn;
16use common_query::error::{
17    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, Volatility};
20use datatypes::data_type::DataType;
21use datatypes::prelude::*;
22use session::context::QueryContextRef;
23use snafu::ensure;
24use store_api::storage::RegionId;
25
26use crate::handlers::TableMutationHandlerRef;
27use crate::helper::cast_u64;
28
29macro_rules! define_region_function {
30    ($name: expr, $display_name_str: expr, $display_name: ident) => {
31        /// A function to $display_name
32        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
33        pub(crate) async fn $display_name(
34            table_mutation_handler: &TableMutationHandlerRef,
35            query_ctx: &QueryContextRef,
36            params: &[ValueRef<'_>],
37        ) -> Result<Value> {
38            ensure!(
39                params.len() == 1,
40                InvalidFuncArgsSnafu {
41                    err_msg: format!(
42                        "The length of the args is not correct, expect 1, have: {}",
43                        params.len()
44                    ),
45                }
46            );
47
48            let Some(region_id) = cast_u64(&params[0])? else {
49                return UnsupportedInputDataTypeSnafu {
50                    function: stringify!($display_name_str),
51                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
52                }
53                .fail();
54            };
55
56            let affected_rows = table_mutation_handler
57                .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
58                .await?;
59
60            Ok(Value::from(affected_rows as u64))
61        }
62    };
63}
64
65define_region_function!(FlushRegionFunction, flush_region, flush_region);
66
67define_region_function!(CompactRegionFunction, compact_region, compact_region);
68
69fn signature() -> Signature {
70    Signature::uniform(
71        1,
72        ConcreteDataType::numerics()
73            .into_iter()
74            .map(|dt| dt.as_arrow_type())
75            .collect(),
76        Volatility::Immutable,
77    )
78}
79
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::UInt64Array;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Builds the call arguments shared by the async tests: a single-row `UInt64` column
    /// holding region id `99`, a non-nullable argument field and a nullable `UInt64`
    /// return field.
    fn single_region_args() -> datafusion::logical_expr::ScalarFunctionArgs {
        let region_ids = UInt64Array::from(vec![99]);
        let arg_field = Field::new("arg_0", DataType::UInt64, false);
        let return_field = Field::new("result", DataType::UInt64, true);

        datafusion::logical_expr::ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(region_ids))],
            arg_fields: vec![Arc::new(arg_field)],
            return_field: Arc::new(return_field),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Generates three tests for one region admin function:
    /// - `test_<name>_misc`: name, return type and signature metadata;
    /// - `test_<name>_missing_table_mutation`: error raised with the default context;
    /// - `test_<name>`: successful call with the mock context.
    macro_rules! define_region_function_test {
        ($name: ident, $func: ident) => {
            paste::paste! {
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let udf = factory.provide(FunctionContext::mock());

                    assert_eq!(stringify!($name), udf.name());
                    assert_eq!(DataType::UInt64, udf.return_type(&[]).unwrap());

                    // The arrow `DataType` enum imported above hides the `datatypes`
                    // trait of the same name, so the trait is brought into scope locally
                    // for `as_arrow_type`.
                    let expected_types: Vec<_> = ConcreteDataType::numerics()
                        .into_iter()
                        .map(|dt| {
                            use datatypes::data_type::DataType;
                            dt.as_arrow_type()
                        })
                        .collect();

                    assert!(matches!(
                        udf.signature(),
                        datafusion_expr::Signature {
                            type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                            volatility: datafusion_expr::Volatility::Immutable
                        } if valid_types == &expected_types
                    ));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    // The default context is expected to carry no table mutation handler.
                    let provider = factory.provide(FunctionContext::default());
                    let async_udf = provider.as_async().unwrap();

                    let err = async_udf
                        .invoke_async_with_args(single_region_args())
                        .await
                        .unwrap_err();

                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        err.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let async_udf = provider.as_async().unwrap();

                    let output = async_udf
                        .invoke_async_with_args(single_region_args())
                        .await
                        .unwrap();

                    // With the mock context the call is expected to yield 42, whether it
                    // comes back as an array or as a scalar.
                    match output {
                        ColumnarValue::Array(values) => {
                            let values = values.as_any().downcast_ref::<UInt64Array>().unwrap();
                            assert_eq!(values.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(value) => {
                            assert_eq!(value, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        };
    }

    define_region_function_test!(flush_region, FlushRegionFunction);

    define_region_function_test!(compact_region, CompactRegionFunction);
}