common_function/admin/
gc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_error::ext::BoxedError;
18use common_macro::admin_fn;
19use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest};
20use common_query::error::{
21    InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu,
22    UnsupportedInputDataTypeSnafu,
23};
24use datafusion_expr::{Signature, TypeSignature, Volatility};
25use datatypes::arrow::datatypes::DataType as ArrowDataType;
26use datatypes::prelude::*;
27use session::context::QueryContextRef;
28use snafu::{ResultExt, ensure};
29
30use crate::handlers::ProcedureServiceHandlerRef;
31use crate::helper::cast_u64;
32
/// Timeout placed in the `timeout` field of every GC request built in this file.
const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60);
/// Value used for `full_file_listing` when the caller does not pass the
/// optional boolean argument.
const DEFAULT_FULL_FILE_LISTING: bool = false;
35
36#[admin_fn(
37    name = GcRegionsFunction,
38    display_name = gc_regions,
39    sig_fn = gc_regions_signature,
40    ret = uint64
41)]
42pub(crate) async fn gc_regions(
43    procedure_service_handler: &ProcedureServiceHandlerRef,
44    _ctx: &QueryContextRef,
45    params: &[ValueRef<'_>],
46) -> Result<Value> {
47    let (region_ids, full_file_listing) = parse_gc_regions_params(params)?;
48
49    let resp = procedure_service_handler
50        .gc_regions(GcRegionsRequest {
51            region_ids,
52            full_file_listing,
53            timeout: DEFAULT_GC_TIMEOUT,
54        })
55        .await?;
56
57    Ok(Value::from(resp.processed_regions))
58}
59
60#[admin_fn(
61    name = GcTableFunction,
62    display_name = gc_table,
63    sig_fn = gc_table_signature,
64    ret = uint64
65)]
66pub(crate) async fn gc_table(
67    procedure_service_handler: &ProcedureServiceHandlerRef,
68    query_ctx: &QueryContextRef,
69    params: &[ValueRef<'_>],
70) -> Result<Value> {
71    let (catalog_name, schema_name, table_name, full_file_listing) =
72        parse_gc_table_params(params, query_ctx)?;
73
74    let resp = procedure_service_handler
75        .gc_table(GcTableRequest {
76            catalog_name,
77            schema_name,
78            table_name,
79            full_file_listing,
80            timeout: DEFAULT_GC_TIMEOUT,
81        })
82        .await?;
83
84    Ok(Value::from(resp.processed_regions))
85}
86
87fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec<u64>, bool)> {
88    ensure!(
89        !params.is_empty(),
90        InvalidFuncArgsSnafu {
91            err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
92                .to_string(),
93        }
94    );
95
96    let (full_file_listing, region_params) = match params.last() {
97        Some(ValueRef::Boolean(value)) => (*value, &params[..params.len() - 1]),
98        _ => (DEFAULT_FULL_FILE_LISTING, params),
99    };
100
101    ensure!(
102        !region_params.is_empty(),
103        InvalidFuncArgsSnafu {
104            err_msg: "The length of the args is not correct, expect at least 1 region id"
105                .to_string(),
106        }
107    );
108
109    let mut region_ids = Vec::with_capacity(region_params.len());
110    for param in region_params {
111        let Some(region_id) = cast_u64(param)? else {
112            return UnsupportedInputDataTypeSnafu {
113                function: "gc_regions",
114                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
115            }
116            .fail();
117        };
118        region_ids.push(region_id);
119    }
120
121    Ok((region_ids, full_file_listing))
122}
123
124fn parse_gc_table_params(
125    params: &[ValueRef<'_>],
126    query_ctx: &QueryContextRef,
127) -> Result<(String, String, String, bool)> {
128    ensure!(
129        matches!(params.len(), 1 | 2),
130        InvalidFuncArgsSnafu {
131            err_msg: format!(
132                "The length of the args is not correct, expect 1 or 2, have: {}",
133                params.len()
134            ),
135        }
136    );
137
138    let ValueRef::String(table_name) = params[0] else {
139        return UnsupportedInputDataTypeSnafu {
140            function: "gc_table",
141            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
142        }
143        .fail();
144    };
145
146    let full_file_listing = if params.len() == 2 {
147        let ValueRef::Boolean(value) = params[1] else {
148            return UnsupportedInputDataTypeSnafu {
149                function: "gc_table",
150                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
151            }
152            .fail();
153        };
154        value
155    } else {
156        DEFAULT_FULL_FILE_LISTING
157    };
158
159    let (catalog_name, schema_name, table_name) =
160        session::table_name::table_name_to_full_name(table_name, query_ctx)
161            .map_err(BoxedError::new)
162            .context(TableMutationSnafu)?;
163
164    Ok((catalog_name, schema_name, table_name, full_file_listing))
165}
166
167fn gc_regions_signature() -> Signature {
168    Signature::variadic_any(Volatility::Immutable)
169}
170
171fn gc_table_signature() -> Signature {
172    Signature::one_of(
173        vec![
174            TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]),
175            TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]),
176        ],
177        Volatility::Immutable,
178    )
179}
180
#[cfg(test)]
mod tests {
    use session::context::QueryContext;

    use super::*;

    /// A trailing boolean is consumed as the `full_file_listing` flag.
    #[test]
    fn test_parse_gc_regions_params_with_full_file_listing() {
        let args = [
            ValueRef::UInt64(1),
            ValueRef::UInt64(2),
            ValueRef::Boolean(true),
        ];

        let parsed = parse_gc_regions_params(&args).unwrap();

        assert_eq!(parsed, (vec![1, 2], true));
    }

    /// Without a trailing boolean the flag falls back to its default, and
    /// narrower unsigned integers are accepted as region ids.
    #[test]
    fn test_parse_gc_regions_params_default_full_file_listing() {
        let args = [ValueRef::UInt64(1), ValueRef::UInt32(2)];

        let parsed = parse_gc_regions_params(&args).unwrap();

        assert_eq!(parsed, (vec![1, 2], false));
    }

    /// A `schema.table` name is expanded to a full name and the flag is kept.
    #[test]
    fn test_parse_gc_table_params_with_full_file_listing() {
        let ctx = QueryContext::arc();
        let args = [ValueRef::String("public.t"), ValueRef::Boolean(true)];

        let (catalog, schema, table, full_file_listing) =
            parse_gc_table_params(&args, &ctx).unwrap();

        assert_eq!(
            (catalog.as_str(), schema.as_str(), table.as_str()),
            ("greptime", "public", "t")
        );
        assert!(full_file_listing);
    }
}
220}