common_function/admin/
flush_compact_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_macro::admin_fn;
16use common_query::error::{
17    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
18};
19use datafusion_expr::{Signature, Volatility};
20use datatypes::data_type::DataType;
21use datatypes::prelude::*;
22use session::context::QueryContextRef;
23use snafu::ensure;
24use store_api::storage::RegionId;
25
26use crate::handlers::TableMutationHandlerRef;
27use crate::helper::cast_u64;
28
/// Defines an admin function that forwards a single region id to the
/// [`TableMutationHandlerRef`] method of the same name.
///
/// Arguments:
/// * `$name` - passed to `admin_fn` as `name`.
/// * `$display_name_str` - passed to `admin_fn` as `display_name`; its source
///   text is also reported in the "unsupported input data type" error.
/// * `$display_name` - identifier of the generated `async fn` and of the
///   handler method it calls.
///
/// The generated function:
/// * fails with `InvalidFuncArgs` unless exactly one argument is given;
/// * fails with `UnsupportedInputDataType` when `cast_u64` returns `None`
///   for that argument;
/// * otherwise calls the handler with `RegionId::from_u64(..)` and returns the
///   affected row count as an unsigned 64-bit `Value`.
macro_rules! define_region_function {
    ($name: expr, $display_name_str: expr, $display_name: ident) => {
        // A `///` comment here is lowered to a string-literal `#[doc]`
        // attribute, and metavariables are not substituted inside string
        // literals. Build the doc text with `concat!` so the real function
        // name shows up instead of the literal `$display_name`.
        #[doc = concat!("A function to ", stringify!($display_name), ".")]
        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = signature, ret = uint64)]
        pub(crate) async fn $display_name(
            table_mutation_handler: &TableMutationHandlerRef,
            query_ctx: &QueryContextRef,
            params: &[ValueRef<'_>],
        ) -> Result<Value> {
            // Exactly one argument (the region id) is accepted.
            ensure!(
                params.len() == 1,
                InvalidFuncArgsSnafu {
                    err_msg: format!(
                        "The length of the args is not correct, expect 1, have: {}",
                        params.len()
                    ),
                }
            );

            // `cast_u64` yields `None` for values it cannot treat as a `u64`;
            // report the offending argument types in that case.
            let Some(region_id) = cast_u64(&params[0])? else {
                return UnsupportedInputDataTypeSnafu {
                    function: stringify!($display_name_str),
                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
                }
                .fail();
            };

            let affected_rows = table_mutation_handler
                .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
                .await?;

            Ok(Value::from(affected_rows as u64))
        }
    };
}
64
65define_region_function!(FlushRegionFunction, flush_region, flush_region);
66
67define_region_function!(CompactRegionFunction, compact_region, compact_region);
68
69fn signature() -> Signature {
70    Signature::uniform(
71        1,
72        ConcreteDataType::numerics()
73            .into_iter()
74            .map(|dt| dt.as_arrow_type())
75            .collect(),
76        Volatility::Immutable,
77    )
78}
79
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::UInt64Array;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Arrow types of every numeric `ConcreteDataType`, i.e. the type list the
    /// functions' signature is expected to accept.
    fn numeric_arrow_types() -> Vec<DataType> {
        // `DataType` already names the Arrow type in this module, so the
        // trait that provides `as_arrow_type` is imported anonymously.
        use datatypes::data_type::DataType as _;

        ConcreteDataType::numerics()
            .into_iter()
            .map(|numeric| numeric.as_arrow_type())
            .collect()
    }

    /// Builds the call arguments used by the async tests: one non-nullable
    /// `UInt64` column holding the single value `99`.
    fn region_id_args() -> datafusion::logical_expr::ScalarFunctionArgs {
        let region_ids = UInt64Array::from(vec![99]);

        datafusion::logical_expr::ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(region_ids))],
            arg_fields: vec![Arc::new(Field::new("arg_0", DataType::UInt64, false))],
            return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Generates three tests for one admin function: metadata checks
    /// (`_misc`), the error raised with a default `FunctionContext`
    /// (`_missing_table_mutation`), and a successful call against the mock
    /// context.
    macro_rules! define_region_function_test {
        ($name: ident, $func: ident) => {
            paste::paste! {
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let function = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), function.name());
                    assert_eq!(DataType::UInt64, function.return_type(&[]).unwrap());

                    let expected_types = numeric_arrow_types();
                    assert!(matches!(
                        function.signature(),
                        datafusion_expr::Signature {
                            type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                            volatility: datafusion_expr::Volatility::Immutable,
                            ..
                        } if valid_types == &expected_types
                    ));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    // The default context is expected to fail with a
                    // missing-handler error (asserted below).
                    let provider = factory.provide(FunctionContext::default());
                    let function = provider.as_async().unwrap();

                    let error = function
                        .invoke_async_with_args(region_id_args())
                        .await
                        .unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        error.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let function = provider.as_async().unwrap();

                    let output = function
                        .invoke_async_with_args(region_id_args())
                        .await
                        .unwrap();

                    // The mock context is expected to report 42 affected
                    // rows, whichever columnar form comes back.
                    match output {
                        ColumnarValue::Array(rows) => {
                            let rows = rows.as_any().downcast_ref::<UInt64Array>().unwrap();
                            assert_eq!(rows.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(value) => {
                            assert_eq!(value, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        };
    }

    define_region_function_test!(flush_region, FlushRegionFunction);

    define_region_function_test!(compact_region, CompactRegionFunction);
}