common_function/admin/
reconcile_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::reconcile_request::Target;
16use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy};
17use arrow::datatypes::DataType as ArrowDataType;
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22    MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu,
23};
24use common_telemetry::info;
25use datafusion_expr::{Signature, TypeSignature, Volatility};
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::ResultExt;
30
31use crate::handlers::ProcedureServiceHandlerRef;
32use crate::helper::parse_resolve_strategy;
33
34const FN_NAME: &str = "reconcile_table";
35
36/// A function to reconcile a table.
37/// Returns the procedure id if success.
38///
39/// - `reconcile_table(table_name)`.
40/// - `reconcile_table(table_name, resolve_strategy)`.
41///
42/// The parameters:
43/// - `table_name`:  the table name
44#[admin_fn(
45    name = ReconcileTableFunction,
46    display_name = reconcile_table,
47    sig_fn = signature,
48    ret = string
49)]
50pub(crate) async fn reconcile_table(
51    procedure_service_handler: &ProcedureServiceHandlerRef,
52    query_ctx: &QueryContextRef,
53    params: &[ValueRef<'_>],
54) -> Result<Value> {
55    let (table_name, resolve_strategy) = match params {
56        [ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest),
57        [ValueRef::String(table_name), ValueRef::String(resolve_strategy)] => {
58            (table_name, parse_resolve_strategy(resolve_strategy)?)
59        }
60        _ => {
61            return UnsupportedInputDataTypeSnafu {
62                function: FN_NAME,
63                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
64            }
65            .fail()
66        }
67    };
68    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
69        .map_err(BoxedError::new)
70        .context(TableMutationSnafu)?;
71    info!(
72        "Reconciling table: {} with resolve_strategy: {:?}",
73        format_full_table_name(&catalog_name, &schema_name, &table_name),
74        resolve_strategy
75    );
76    let pid = procedure_service_handler
77        .reconcile(ReconcileRequest {
78            target: Some(Target::ReconcileTable(ReconcileTable {
79                catalog_name,
80                schema_name,
81                table_name,
82                resolve_strategy: resolve_strategy as i32,
83            })),
84            ..Default::default()
85        })
86        .await?;
87    match pid {
88        Some(pid) => Ok(Value::from(pid)),
89        None => Ok(Value::Null),
90    }
91}
92
93fn signature() -> Signature {
94    Signature::one_of(
95        vec![
96            // reconcile_table(table_name)
97            TypeSignature::Exact(vec![ArrowDataType::Utf8]),
98            // reconcile_table(table_name, resolve_strategy)
99            TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]),
100        ],
101        Volatility::Immutable,
102    )
103}
104
105#[cfg(test)]
106mod tests {
107    use std::sync::Arc;
108
109    use arrow::array::StringArray;
110    use arrow::datatypes::{DataType, Field};
111    use datafusion_expr::ColumnarValue;
112
113    use crate::admin::reconcile_table::ReconcileTableFunction;
114    use crate::function::FunctionContext;
115    use crate::function_factory::ScalarFunctionFactory;
116
117    #[tokio::test]
118    async fn test_reconcile_table() {
119        common_telemetry::init_default_ut_logging();
120
121        // reconcile_table(table_name)
122        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
123        let provider = factory.provide(FunctionContext::mock());
124        let f = provider.as_async().unwrap();
125
126        let func_args = datafusion::logical_expr::ScalarFunctionArgs {
127            args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![
128                "test",
129            ])))],
130            arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))],
131            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
132            number_rows: 1,
133            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
134        };
135        let result = f.invoke_async_with_args(func_args).await.unwrap();
136        match result {
137            ColumnarValue::Array(array) => {
138                let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
139                assert_eq!(result_array.value(0), "test_pid");
140            }
141            ColumnarValue::Scalar(scalar) => {
142                assert_eq!(
143                    scalar,
144                    datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
145                );
146            }
147        }
148
149        // reconcile_table(table_name, resolve_strategy)
150        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
151        let provider = factory.provide(FunctionContext::mock());
152        let f = provider.as_async().unwrap();
153
154        let func_args = datafusion::logical_expr::ScalarFunctionArgs {
155            args: vec![
156                ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
157                ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseMetasrv"]))),
158            ],
159            arg_fields: vec![
160                Arc::new(Field::new("arg_0", DataType::Utf8, false)),
161                Arc::new(Field::new("arg_1", DataType::Utf8, false)),
162            ],
163            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
164            number_rows: 1,
165            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
166        };
167        let result = f.invoke_async_with_args(func_args).await.unwrap();
168        match result {
169            ColumnarValue::Array(array) => {
170                let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
171                assert_eq!(result_array.value(0), "test_pid");
172            }
173            ColumnarValue::Scalar(scalar) => {
174                assert_eq!(
175                    scalar,
176                    datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
177                );
178            }
179        }
180
181        // unsupported input data type
182        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
183        let provider = factory.provide(FunctionContext::mock());
184        let f = provider.as_async().unwrap();
185
186        let func_args = datafusion::logical_expr::ScalarFunctionArgs {
187            args: vec![
188                ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
189                ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseMetasrv"]))),
190                ColumnarValue::Array(Arc::new(StringArray::from(vec!["10"]))),
191            ],
192            arg_fields: vec![
193                Arc::new(Field::new("arg_0", DataType::Utf8, false)),
194                Arc::new(Field::new("arg_1", DataType::Utf8, false)),
195                Arc::new(Field::new("arg_2", DataType::Utf8, false)),
196            ],
197            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
198            number_rows: 1,
199            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
200        };
201        let _err = f.invoke_async_with_args(func_args).await.unwrap_err();
202        // Note: Error type is DataFusionError at this level, not common_query::Error
203    }
204}