1use std::str::FromStr;
16
17use api::v1::region::{compact_request, StrictWindow};
18use common_error::ext::BoxedError;
19use common_macro::admin_fn;
20use common_query::error::{
21 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
22 UnsupportedInputDataTypeSnafu,
23};
24use common_query::prelude::{Signature, Volatility};
25use common_telemetry::info;
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use session::table_name::table_name_to_full_name;
29use snafu::{ensure, ResultExt};
30use table::requests::{CompactTableRequest, FlushTableRequest};
31
32use crate::handlers::TableMutationHandlerRef;
33
/// Compaction type name that selects strict-window compaction
/// (compared case-insensitively by `parse_compact_type`).
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias for the strict-window compaction type; `parse_compact_type`
/// accepts it interchangeably with `"strict_window"`.
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
38
39#[admin_fn(
40 name = FlushTableFunction,
41 display_name = flush_table,
42 sig_fn = flush_signature,
43 ret = uint64
44)]
45pub(crate) async fn flush_table(
46 table_mutation_handler: &TableMutationHandlerRef,
47 query_ctx: &QueryContextRef,
48 params: &[ValueRef<'_>],
49) -> Result<Value> {
50 ensure!(
51 params.len() == 1,
52 InvalidFuncArgsSnafu {
53 err_msg: format!(
54 "The length of the args is not correct, expect 1, have: {}",
55 params.len()
56 ),
57 }
58 );
59
60 let ValueRef::String(table_name) = params[0] else {
61 return UnsupportedInputDataTypeSnafu {
62 function: "flush_table",
63 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
64 }
65 .fail();
66 };
67
68 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
69 .map_err(BoxedError::new)
70 .context(TableMutationSnafu)?;
71
72 let affected_rows = table_mutation_handler
73 .flush(
74 FlushTableRequest {
75 catalog_name,
76 schema_name,
77 table_name,
78 },
79 query_ctx.clone(),
80 )
81 .await?;
82
83 Ok(Value::from(affected_rows as u64))
84}
85
86#[admin_fn(
87 name = CompactTableFunction,
88 display_name = compact_table,
89 sig_fn = compact_signature,
90 ret = uint64
91)]
92pub(crate) async fn compact_table(
93 table_mutation_handler: &TableMutationHandlerRef,
94 query_ctx: &QueryContextRef,
95 params: &[ValueRef<'_>],
96) -> Result<Value> {
97 let request = parse_compact_params(params, query_ctx)?;
98 info!("Compact table request: {:?}", request);
99
100 let affected_rows = table_mutation_handler
101 .compact(request, query_ctx.clone())
102 .await?;
103
104 Ok(Value::from(affected_rows as u64))
105}
106
107fn flush_signature() -> Signature {
108 Signature::uniform(
109 1,
110 vec![ConcreteDataType::string_datatype()],
111 Volatility::Immutable,
112 )
113}
114
115fn compact_signature() -> Signature {
116 Signature::variadic(
117 vec![ConcreteDataType::string_datatype()],
118 Volatility::Immutable,
119 )
120}
121
122fn parse_compact_params(
127 params: &[ValueRef<'_>],
128 query_ctx: &QueryContextRef,
129) -> Result<CompactTableRequest> {
130 ensure!(
131 !params.is_empty(),
132 InvalidFuncArgsSnafu {
133 err_msg: "Args cannot be empty",
134 }
135 );
136
137 let (table_name, compact_type) = match params {
138 [ValueRef::String(table_name)] => (
139 table_name,
140 compact_request::Options::Regular(Default::default()),
141 ),
142 [ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
143 let compact_type = parse_compact_type(compact_ty_str, None)?;
144 (table_name, compact_type)
145 }
146
147 [ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
148 {
149 let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
150 (table_name, compact_type)
151 }
152 _ => {
153 return UnsupportedInputDataTypeSnafu {
154 function: "compact_table",
155 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
156 }
157 .fail()
158 }
159 };
160
161 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
162 .map_err(BoxedError::new)
163 .context(TableMutationSnafu)?;
164
165 Ok(CompactTableRequest {
166 catalog_name,
167 schema_name,
168 table_name,
169 compact_options: compact_type,
170 })
171}
172
173fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
176 if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
177 | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
178 {
179 let window_seconds = option
180 .map(|v| {
181 i64::from_str(v).map_err(|_| {
182 InvalidFuncArgsSnafu {
183 err_msg: format!(
184 "Compact window is expected to be a valid number, provided: {}",
185 v
186 ),
187 }
188 .build()
189 })
190 })
191 .transpose()?
192 .unwrap_or(0);
193
194 Ok(compact_request::Options::StrictWindow(StrictWindow {
195 window_seconds,
196 }))
197 } else {
198 Ok(compact_request::Options::Regular(Default::default()))
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_query::prelude::TypeSignature;
    use datatypes::vectors::{StringVector, UInt64Vector};
    use session::context::QueryContext;

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    // Generates three tests for a single-string-argument table admin function:
    // - `test_<name>_misc`: name, return type and signature checks;
    // - `test_<name>_missing_table_mutation`: evaluation with a default context must fail
    //   with the "Missing TableMutationHandler" error;
    // - `test_<name>`: evaluation with the mock context, which is expected to yield 42.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let f = $func;
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        ConcreteDataType::uint64_datatype(),
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     Signature {
                                         type_signature: TypeSignature::Uniform(1, valid_types),
                                         volatility: Volatility::Immutable
                                     } if valid_types == vec![ConcreteDataType::string_datatype()]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let f = $func;

                    let args = vec!["test"];

                    let args = args
                        .into_iter()
                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
                        .collect::<Vec<_>>();

                    let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
                    assert_eq!(
                        "Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let f = $func;


                    let args = vec!["test"];

                    let args = args
                        .into_iter()
                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
                        .collect::<Vec<_>>();

                    let result = f.eval(FunctionContext::mock(), &args).await.unwrap();

                    let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
                    assert_eq!(expect, result);
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Runs `parse_compact_params` on each string-argument list and asserts that the parsed
    /// request equals the expected one. Panics if parsing fails.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (params, expected) in cases {
            let params = params
                .iter()
                .map(|s| ValueRef::String(s))
                .collect::<Vec<_>>();

            assert_eq!(
                expected,
                &parse_compact_params(&params, &QueryContext::arc()).unwrap()
            );
        }
    }

    #[test]
    fn test_parse_compact_params() {
        check_parse_compact_params(&[
            // Bare, schema-qualified and fully-qualified table names all resolve to the
            // default catalog/schema with regular compaction.
            (
                &["table"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &[&format!(
                    "{}.{}.table",
                    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
                )],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            (
                &["table", "regular"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            // Strict window without an explicit window falls back to 0 seconds.
            (
                &["table", "strict_window"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
                },
            ),
            (
                &["table", "strict_window", "3600"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow {
                        window_seconds: 3600,
                    }),
                },
            ),
            // Options are ignored for regular compaction, even when not numeric.
            (
                &["table", "regular", "abcd"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::Regular(Default::default()),
                },
            ),
            // The short alias behaves like `strict_window`.
            (
                &["table", "swcs", "120"],
                CompactTableRequest {
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "table".to_string(),
                    compact_options: Options::StrictWindow(StrictWindow {
                        window_seconds: 120,
                    }),
                },
            ),
        ]);

        // A non-numeric window for strict-window compaction must be rejected.
        assert!(parse_compact_params(
            &["table", "strict_window", "abc"]
                .into_iter()
                .map(ValueRef::String)
                .collect::<Vec<_>>(),
            &QueryContext::arc(),
        )
        .is_err());

        assert!(parse_compact_params(
            &["a.b.table", "strict_window", "abc"]
                .into_iter()
                .map(ValueRef::String)
                .collect::<Vec<_>>(),
            &QueryContext::arc(),
        )
        .is_err());
    }
}
390}