1use api::v1::meta::reconcile_request::Target;
16use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy};
17use arrow::datatypes::DataType as ArrowDataType;
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu,
23};
24use common_telemetry::info;
25use datafusion_expr::{Signature, TypeSignature, Volatility};
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::ResultExt;
30
31use crate::handlers::ProcedureServiceHandlerRef;
32use crate::helper::parse_resolve_strategy;
33
/// Function name reported in the unsupported-input-data-type error raised by `reconcile_table`.
const FN_NAME: &str = "reconcile_table";
35
36#[admin_fn(
45 name = ReconcileTableFunction,
46 display_name = reconcile_table,
47 sig_fn = signature,
48 ret = string
49)]
50pub(crate) async fn reconcile_table(
51 procedure_service_handler: &ProcedureServiceHandlerRef,
52 query_ctx: &QueryContextRef,
53 params: &[ValueRef<'_>],
54) -> Result<Value> {
55 let (table_name, resolve_strategy) = match params {
56 [ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest),
57 [ValueRef::String(table_name), ValueRef::String(resolve_strategy)] => {
58 (table_name, parse_resolve_strategy(resolve_strategy)?)
59 }
60 _ => {
61 return UnsupportedInputDataTypeSnafu {
62 function: FN_NAME,
63 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
64 }
65 .fail()
66 }
67 };
68 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
69 .map_err(BoxedError::new)
70 .context(TableMutationSnafu)?;
71 info!(
72 "Reconciling table: {} with resolve_strategy: {:?}",
73 format_full_table_name(&catalog_name, &schema_name, &table_name),
74 resolve_strategy
75 );
76 let pid = procedure_service_handler
77 .reconcile(ReconcileRequest {
78 target: Some(Target::ReconcileTable(ReconcileTable {
79 catalog_name,
80 schema_name,
81 table_name,
82 resolve_strategy: resolve_strategy as i32,
83 })),
84 ..Default::default()
85 })
86 .await?;
87 match pid {
88 Some(pid) => Ok(Value::from(pid)),
89 None => Ok(Value::Null),
90 }
91}
92
93fn signature() -> Signature {
94 Signature::one_of(
95 vec![
96 TypeSignature::Exact(vec![ArrowDataType::Utf8]),
98 TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]),
100 ],
101 Volatility::Immutable,
102 )
103}
104
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::ColumnarValue;

    use crate::admin::reconcile_table::ReconcileTableFunction;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    /// Builds single-row call arguments with one non-nullable UTF-8 column per entry of
    /// `values`, named `arg_0`, `arg_1`, ... and a nullable UTF-8 return field.
    fn utf8_args(values: &[&str]) -> datafusion::logical_expr::ScalarFunctionArgs {
        datafusion::logical_expr::ScalarFunctionArgs {
            args: values
                .iter()
                .map(|value| ColumnarValue::Array(Arc::new(StringArray::from(vec![*value]))))
                .collect(),
            arg_fields: (0..values.len())
                .map(|idx| Arc::new(Field::new(format!("arg_{idx}"), DataType::Utf8, false)))
                .collect(),
            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
            number_rows: 1,
            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
        }
    }

    /// Asserts that the function result carries the procedure id `"test_pid"`,
    /// whether it comes back as an array or as a scalar.
    fn assert_test_pid(result: ColumnarValue) {
        match result {
            ColumnarValue::Array(array) => {
                let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                assert_eq!(result_array.value(0), "test_pid");
            }
            ColumnarValue::Scalar(scalar) => {
                assert_eq!(
                    scalar,
                    datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
                );
            }
        }
    }

    #[tokio::test]
    async fn test_reconcile_table() {
        common_telemetry::init_default_ut_logging();

        // Both supported arities succeed: table name only, then table name plus strategy.
        let accepted: [&[&str]; 2] = [&["test"], &["test", "UseMetasrv"]];
        for values in accepted {
            let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
            let provider = factory.provide(FunctionContext::mock());
            let f = provider.as_async().unwrap();

            let result = f.invoke_async_with_args(utf8_args(values)).await.unwrap();
            assert_test_pid(result);
        }

        // A third argument is not part of any supported form and must be rejected.
        let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into();
        let provider = factory.provide(FunctionContext::mock());
        let f = provider.as_async().unwrap();

        let _err = f
            .invoke_async_with_args(utf8_args(&["test", "UseMetasrv", "10"]))
            .await
            .unwrap_err();
    }
}