1use std::time::Duration;
16
17use common_error::ext::BoxedError;
18use common_macro::admin_fn;
19use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest};
20use common_query::error::{
21 InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu,
22 UnsupportedInputDataTypeSnafu,
23};
24use datafusion_expr::{Signature, TypeSignature, Volatility};
25use datatypes::arrow::datatypes::DataType as ArrowDataType;
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use snafu::{ResultExt, ensure};
29
30use crate::handlers::ProcedureServiceHandlerRef;
31use crate::helper::cast_u64;
32
/// Timeout attached to every GC request built in this module.
const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60);

/// Value of `full_file_listing` used when the caller does not pass the flag.
const DEFAULT_FULL_FILE_LISTING: bool = false;
35
36#[admin_fn(
37 name = GcRegionsFunction,
38 display_name = gc_regions,
39 sig_fn = gc_regions_signature,
40 ret = uint64
41)]
42pub(crate) async fn gc_regions(
43 procedure_service_handler: &ProcedureServiceHandlerRef,
44 _ctx: &QueryContextRef,
45 params: &[ValueRef<'_>],
46) -> Result<Value> {
47 let (region_ids, full_file_listing) = parse_gc_regions_params(params)?;
48
49 let resp = procedure_service_handler
50 .gc_regions(GcRegionsRequest {
51 region_ids,
52 full_file_listing,
53 timeout: DEFAULT_GC_TIMEOUT,
54 })
55 .await?;
56
57 Ok(Value::from(resp.processed_regions))
58}
59
60#[admin_fn(
61 name = GcTableFunction,
62 display_name = gc_table,
63 sig_fn = gc_table_signature,
64 ret = uint64
65)]
66pub(crate) async fn gc_table(
67 procedure_service_handler: &ProcedureServiceHandlerRef,
68 query_ctx: &QueryContextRef,
69 params: &[ValueRef<'_>],
70) -> Result<Value> {
71 let (catalog_name, schema_name, table_name, full_file_listing) =
72 parse_gc_table_params(params, query_ctx)?;
73
74 let resp = procedure_service_handler
75 .gc_table(GcTableRequest {
76 catalog_name,
77 schema_name,
78 table_name,
79 full_file_listing,
80 timeout: DEFAULT_GC_TIMEOUT,
81 })
82 .await?;
83
84 Ok(Value::from(resp.processed_regions))
85}
86
87fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec<u64>, bool)> {
88 ensure!(
89 !params.is_empty(),
90 InvalidFuncArgsSnafu {
91 err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
92 .to_string(),
93 }
94 );
95
96 let (full_file_listing, region_params) = match params.last() {
97 Some(ValueRef::Boolean(value)) => (*value, ¶ms[..params.len() - 1]),
98 _ => (DEFAULT_FULL_FILE_LISTING, params),
99 };
100
101 ensure!(
102 !region_params.is_empty(),
103 InvalidFuncArgsSnafu {
104 err_msg: "The length of the args is not correct, expect at least 1 region id"
105 .to_string(),
106 }
107 );
108
109 let mut region_ids = Vec::with_capacity(region_params.len());
110 for param in region_params {
111 let Some(region_id) = cast_u64(param)? else {
112 return UnsupportedInputDataTypeSnafu {
113 function: "gc_regions",
114 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
115 }
116 .fail();
117 };
118 region_ids.push(region_id);
119 }
120
121 Ok((region_ids, full_file_listing))
122}
123
124fn parse_gc_table_params(
125 params: &[ValueRef<'_>],
126 query_ctx: &QueryContextRef,
127) -> Result<(String, String, String, bool)> {
128 ensure!(
129 matches!(params.len(), 1 | 2),
130 InvalidFuncArgsSnafu {
131 err_msg: format!(
132 "The length of the args is not correct, expect 1 or 2, have: {}",
133 params.len()
134 ),
135 }
136 );
137
138 let ValueRef::String(table_name) = params[0] else {
139 return UnsupportedInputDataTypeSnafu {
140 function: "gc_table",
141 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
142 }
143 .fail();
144 };
145
146 let full_file_listing = if params.len() == 2 {
147 let ValueRef::Boolean(value) = params[1] else {
148 return UnsupportedInputDataTypeSnafu {
149 function: "gc_table",
150 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
151 }
152 .fail();
153 };
154 value
155 } else {
156 DEFAULT_FULL_FILE_LISTING
157 };
158
159 let (catalog_name, schema_name, table_name) =
160 session::table_name::table_name_to_full_name(table_name, query_ctx)
161 .map_err(BoxedError::new)
162 .context(TableMutationSnafu)?;
163
164 Ok((catalog_name, schema_name, table_name, full_file_listing))
165}
166
167fn gc_regions_signature() -> Signature {
168 Signature::variadic_any(Volatility::Immutable)
169}
170
171fn gc_table_signature() -> Signature {
172 Signature::one_of(
173 vec![
174 TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]),
175 TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]),
176 ],
177 Volatility::Immutable,
178 )
179}
180
#[cfg(test)]
mod tests {
    use session::context::QueryContext;

    use super::*;

    /// A trailing boolean is taken as the `full_file_listing` flag.
    #[test]
    fn test_parse_gc_regions_params_with_full_file_listing() {
        let params = vec![
            ValueRef::UInt64(1),
            ValueRef::UInt64(2),
            ValueRef::Boolean(true),
        ];
        let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();

        assert_eq!(region_ids, vec![1, 2]);
        assert!(full_file_listing);
    }

    /// Without a trailing boolean the flag takes its default, and narrower integer
    /// types are accepted as region ids.
    #[test]
    fn test_parse_gc_regions_params_default_full_file_listing() {
        let params = vec![ValueRef::UInt64(1), ValueRef::UInt32(2)];
        let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();

        assert_eq!(region_ids, vec![1, 2]);
        assert!(!full_file_listing);
    }

    /// At least one region id is required, with or without the flag.
    #[test]
    fn test_parse_gc_regions_params_rejects_missing_region_ids() {
        // No arguments at all.
        assert!(parse_gc_regions_params(&[]).is_err());
        // A lone boolean is consumed as the flag, leaving no region ids.
        assert!(parse_gc_regions_params(&[ValueRef::Boolean(true)]).is_err());
    }

    /// A qualified table name plus the flag is resolved against the query context.
    #[test]
    fn test_parse_gc_table_params_with_full_file_listing() {
        let params = vec![ValueRef::String("public.t"), ValueRef::Boolean(true)];
        let (catalog, schema, table, full_file_listing) =
            parse_gc_table_params(&params, &QueryContext::arc()).unwrap();

        assert_eq!(catalog, "greptime");
        assert_eq!(schema, "public");
        assert_eq!(table, "t");
        assert!(full_file_listing);
    }

    /// Omitting the flag falls back to the default value.
    #[test]
    fn test_parse_gc_table_params_default_full_file_listing() {
        let params = vec![ValueRef::String("public.t")];
        let (_, _, table, full_file_listing) =
            parse_gc_table_params(&params, &QueryContext::arc()).unwrap();

        assert_eq!(table, "t");
        assert!(!full_file_listing);
    }

    /// Wrong arity and wrong argument types are both rejected.
    #[test]
    fn test_parse_gc_table_params_rejects_invalid_args() {
        let query_ctx = QueryContext::arc();

        // Wrong number of arguments.
        assert!(parse_gc_table_params(&[], &query_ctx).is_err());
        let too_many = [
            ValueRef::String("t"),
            ValueRef::Boolean(true),
            ValueRef::Boolean(false),
        ];
        assert!(parse_gc_table_params(&too_many, &query_ctx).is_err());

        // Wrong argument types.
        assert!(parse_gc_table_params(&[ValueRef::UInt64(1)], &query_ctx).is_err());
        let bad_flag = [ValueRef::String("t"), ValueRef::UInt64(1)];
        assert!(parse_gc_table_params(&bad_flag, &query_ctx).is_err());
    }
}