1use api::v1::meta::reconcile_request::Target;
16use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy};
17use arrow::datatypes::DataType as ArrowDataType;
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu,
23};
24use common_telemetry::info;
25use datafusion_expr::{Signature, TypeSignature, Volatility};
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::ResultExt;
30
31use crate::handlers::ProcedureServiceHandlerRef;
32use crate::helper::parse_resolve_strategy;
33
/// Function name reported in the unsupported-input-type error raised by `reconcile_table`.
const FN_NAME: &str = "reconcile_table";
35
36#[admin_fn(
45 name = ReconcileTableFunction,
46 display_name = reconcile_table,
47 sig_fn = signature,
48 ret = string
49)]
50pub(crate) async fn reconcile_table(
51 procedure_service_handler: &ProcedureServiceHandlerRef,
52 query_ctx: &QueryContextRef,
53 params: &[ValueRef<'_>],
54) -> Result<Value> {
55 let (table_name, resolve_strategy) = match params {
56 [ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest),
57 [
58 ValueRef::String(table_name),
59 ValueRef::String(resolve_strategy),
60 ] => (table_name, parse_resolve_strategy(resolve_strategy)?),
61 _ => {
62 return UnsupportedInputDataTypeSnafu {
63 function: FN_NAME,
64 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
65 }
66 .fail();
67 }
68 };
69 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
70 .map_err(BoxedError::new)
71 .context(TableMutationSnafu)?;
72 info!(
73 "Reconciling table: {} with resolve_strategy: {:?}",
74 format_full_table_name(&catalog_name, &schema_name, &table_name),
75 resolve_strategy
76 );
77 let pid = procedure_service_handler
78 .reconcile(ReconcileRequest {
79 target: Some(Target::ReconcileTable(ReconcileTable {
80 catalog_name,
81 schema_name,
82 table_name,
83 resolve_strategy: resolve_strategy as i32,
84 })),
85 ..Default::default()
86 })
87 .await?;
88 match pid {
89 Some(pid) => Ok(Value::from(pid)),
90 None => Ok(Value::Null),
91 }
92}
93
94fn signature() -> Signature {
95 Signature::one_of(
96 vec![
97 TypeSignature::Exact(vec![ArrowDataType::Utf8]),
99 TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]),
101 ],
102 Volatility::Immutable,
103 )
104}
105
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use crate::admin::reconcile_table::ReconcileTableFunction;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Builds single-row function arguments with one non-nullable Utf8 column
    /// (named `arg_0`, `arg_1`, ...) per entry of `values`.
    fn utf8_args(values: &[&str]) -> datafusion::logical_expr::ScalarFunctionArgs {
        let args = values
            .iter()
            .map(|value| ColumnarValue::Array(Arc::new(StringArray::from(vec![*value]))))
            .collect();
        let arg_fields = (0..values.len())
            .map(|idx| Arc::new(Field::new(format!("arg_{idx}"), DataType::Utf8, false)))
            .collect();

        datafusion::logical_expr::ScalarFunctionArgs {
            args,
            arg_fields,
            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Asserts that `result` carries the procedure id `"test_pid"`, whether it
    /// comes back as an array or as a scalar.
    fn assert_test_pid(result: ColumnarValue) {
        match result {
            ColumnarValue::Array(array) => {
                let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                assert_eq!(result_array.value(0), "test_pid");
            }
            ColumnarValue::Scalar(scalar) => {
                assert_eq!(
                    scalar,
                    datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
                );
            }
        }
    }

    #[tokio::test]
    async fn test_reconcile_table() {
        common_telemetry::init_default_ut_logging();

        // Both supported arities are expected to yield the mocked procedure id.
        let accepted: [&[&str]; 2] = [&["test"], &["test", "UseMetasrv"]];
        for values in accepted {
            let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
            let provider = factory.provide(FunctionContext::mock());
            let f = provider.as_async().unwrap();

            let result = f.invoke_async_with_args(utf8_args(values)).await.unwrap();
            assert_test_pid(result);
        }

        // A third argument matches no supported parameter shape and must fail.
        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
        let provider = factory.provide(FunctionContext::mock());
        let f = provider.as_async().unwrap();

        let _err = f
            .invoke_async_with_args(utf8_args(&["test", "UseMetasrv", "10"]))
            .await
            .unwrap_err();
    }
}