common_function/admin/
reconcile_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::reconcile_request::Target;
16use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy};
17use arrow::datatypes::DataType as ArrowDataType;
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22    MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu,
23};
24use common_telemetry::info;
25use datafusion_expr::{Signature, TypeSignature, Volatility};
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::ResultExt;
30
31use crate::handlers::ProcedureServiceHandlerRef;
32use crate::helper::parse_resolve_strategy;
33
34const FN_NAME: &str = "reconcile_table";
35
36/// A function to reconcile a table.
37/// Returns the procedure id if success.
38///
39/// - `reconcile_table(table_name)`.
40/// - `reconcile_table(table_name, resolve_strategy)`.
41///
42/// The parameters:
43/// - `table_name`:  the table name
44#[admin_fn(
45    name = ReconcileTableFunction,
46    display_name = reconcile_table,
47    sig_fn = signature,
48    ret = string
49)]
50pub(crate) async fn reconcile_table(
51    procedure_service_handler: &ProcedureServiceHandlerRef,
52    query_ctx: &QueryContextRef,
53    params: &[ValueRef<'_>],
54) -> Result<Value> {
55    let (table_name, resolve_strategy) = match params {
56        [ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest),
57        [
58            ValueRef::String(table_name),
59            ValueRef::String(resolve_strategy),
60        ] => (table_name, parse_resolve_strategy(resolve_strategy)?),
61        _ => {
62            return UnsupportedInputDataTypeSnafu {
63                function: FN_NAME,
64                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65            }
66            .fail();
67        }
68    };
69    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70        .map_err(BoxedError::new)
71        .context(TableMutationSnafu)?;
72    info!(
73        "Reconciling table: {} with resolve_strategy: {:?}",
74        format_full_table_name(&catalog_name, &schema_name, &table_name),
75        resolve_strategy
76    );
77    let pid = procedure_service_handler
78        .reconcile(ReconcileRequest {
79            target: Some(Target::ReconcileTable(ReconcileTable {
80                catalog_name,
81                schema_name,
82                table_name,
83                resolve_strategy: resolve_strategy as i32,
84            })),
85            ..Default::default()
86        })
87        .await?;
88    match pid {
89        Some(pid) => Ok(Value::from(pid)),
90        None => Ok(Value::Null),
91    }
92}
93
94fn signature() -> Signature {
95    Signature::one_of(
96        vec![
97            // reconcile_table(table_name)
98            TypeSignature::Exact(vec![ArrowDataType::Utf8]),
99            // reconcile_table(table_name, resolve_strategy)
100            TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]),
101        ],
102        Volatility::Immutable,
103    )
104}
105
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use crate::admin::reconcile_table::ReconcileTableFunction;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Builds a fresh `reconcile_table` function from the mock context and
    /// invokes it with one single-row `Utf8` column per entry of `inputs`.
    async fn invoke(inputs: &[&str]) -> datafusion_common::Result<ColumnarValue> {
        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
        let provider = factory.provide(FunctionContext::mock());
        let f = provider.as_async().unwrap();

        let args = inputs
            .iter()
            .map(|input| ColumnarValue::Array(Arc::new(StringArray::from(vec![*input]))))
            .collect();
        let arg_fields = (0..inputs.len())
            .map(|idx| Arc::new(Field::new(format!("arg_{}", idx), DataType::Utf8, false)))
            .collect();

        f.invoke_async_with_args(datafusion::logical_expr::ScalarFunctionArgs {
            args,
            arg_fields,
            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        })
        .await
    }

    /// Asserts that `result` carries the procedure id `test_pid`, whether it
    /// comes back as an array or as a scalar.
    fn assert_pid(result: ColumnarValue) {
        match result {
            ColumnarValue::Array(array) => {
                let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                assert_eq!(result_array.value(0), "test_pid");
            }
            ColumnarValue::Scalar(scalar) => {
                assert_eq!(
                    scalar,
                    datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
                );
            }
        }
    }

    #[tokio::test]
    async fn test_reconcile_table() {
        common_telemetry::init_default_ut_logging();

        // reconcile_table(table_name)
        assert_pid(invoke(&["test"]).await.unwrap());

        // reconcile_table(table_name, resolve_strategy)
        assert_pid(invoke(&["test", "UseMetasrv"]).await.unwrap());

        // unsupported input data type: three arguments match no accepted form
        let _err = invoke(&["test", "UseMetasrv", "10"]).await.unwrap_err();
        // Note: Error type is DataFusionError at this level, not common_query::Error
    }
}