table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::HashMap;
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::s3::is_supported_in_s3;
23use common_query::AddColumnLocation;
24use common_time::range::TimestampRange;
25use common_time::TimeToLive;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::prelude::VectorRef;
28use datatypes::schema::{ColumnSchema, FulltextOptions, SkippingIndexOptions};
29use greptime_proto::v1::region::compact_request;
30use serde::{Deserialize, Serialize};
31use store_api::metric_engine_consts::{
32    is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
33};
34use store_api::mito_engine_options::is_mito_engine_option_key;
35use store_api::region_request::{SetRegionOption, UnsetRegionOption};
36
37use crate::error::{ParseTableOptionSnafu, Result};
38use crate::metadata::{TableId, TableVersion};
39use crate::table_reference::TableReference;
40
41pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
42pub const FILE_TABLE_LOCATION_KEY: &str = "location";
43pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
44pub const FILE_TABLE_FORMAT_KEY: &str = "format";
45
46pub const TABLE_DATA_MODEL: &str = "table_data_model";
47pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
48
49pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [
50    // common keys:
51    WRITE_BUFFER_SIZE_KEY,
52    TTL_KEY,
53    STORAGE_KEY,
54    COMMENT_KEY,
55    SKIP_WAL_KEY,
56    // file engine keys:
57    FILE_TABLE_LOCATION_KEY,
58    FILE_TABLE_FORMAT_KEY,
59    FILE_TABLE_PATTERN_KEY,
60    // metric engine keys:
61    PHYSICAL_TABLE_METADATA_KEY,
62    LOGICAL_TABLE_METADATA_KEY,
63    // table model info
64    TABLE_DATA_MODEL,
65];
66
67/// Returns true if the `key` is a valid key for any engine or storage.
68pub fn validate_table_option(key: &str) -> bool {
69    if is_supported_in_s3(key) {
70        return true;
71    }
72
73    if is_mito_engine_option_key(key) {
74        return true;
75    }
76
77    if is_metric_engine_option_key(key) {
78        return true;
79    }
80
81    VALID_TABLE_OPTION_KEYS.contains(&key)
82}
83
84#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct TableOptions {
87    /// Memtable size of memtable.
88    pub write_buffer_size: Option<ReadableSize>,
89    /// Time-to-live of table. Expired data will be automatically purged.
90    pub ttl: Option<TimeToLive>,
91    /// Skip wal write for this table.
92    pub skip_wal: bool,
93    /// Extra options that may not applicable to all table engines.
94    pub extra_options: HashMap<String, String>,
95}
96
97pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
98pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
99pub const STORAGE_KEY: &str = "storage";
100pub const COMMENT_KEY: &str = "comment";
101pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
102pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
103
104impl TableOptions {
105    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
106        iter: U,
107    ) -> Result<TableOptions> {
108        let mut options = TableOptions::default();
109
110        let kvs: HashMap<String, String> = iter
111            .into_iter()
112            .map(|(k, v)| (k.to_string(), v.to_string()))
113            .collect();
114
115        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
116            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
117                ParseTableOptionSnafu {
118                    key: WRITE_BUFFER_SIZE_KEY,
119                    value: write_buffer_size,
120                }
121                .build()
122            })?;
123            options.write_buffer_size = Some(size)
124        }
125
126        if let Some(ttl) = kvs.get(TTL_KEY) {
127            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
128                ParseTableOptionSnafu {
129                    key: TTL_KEY,
130                    value: ttl,
131                }
132                .build()
133            })?;
134            options.ttl = Some(ttl_value);
135        }
136
137        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
138            options.skip_wal = skip_wal.parse().map_err(|_| {
139                ParseTableOptionSnafu {
140                    key: SKIP_WAL_KEY,
141                    value: skip_wal,
142                }
143                .build()
144            })?;
145        }
146
147        options.extra_options = HashMap::from_iter(
148            kvs.into_iter()
149                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
150        );
151
152        Ok(options)
153    }
154}
155
156impl fmt::Display for TableOptions {
157    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
158        let mut key_vals = vec![];
159        if let Some(size) = self.write_buffer_size {
160            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
161        }
162
163        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
164            key_vals.push(format!("{}={}", TTL_KEY, ttl));
165        }
166
167        if self.skip_wal {
168            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
169        }
170
171        for (k, v) in &self.extra_options {
172            key_vals.push(format!("{}={}", k, v));
173        }
174
175        write!(f, "{}", key_vals.join(" "))
176    }
177}
178
179impl From<&TableOptions> for HashMap<String, String> {
180    fn from(opts: &TableOptions) -> Self {
181        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
182        if let Some(write_buffer_size) = opts.write_buffer_size {
183            let _ = res.insert(
184                WRITE_BUFFER_SIZE_KEY.to_string(),
185                write_buffer_size.to_string(),
186            );
187        }
188        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
189            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
190        }
191        res.extend(
192            opts.extra_options
193                .iter()
194                .map(|(k, v)| (k.clone(), v.clone())),
195        );
196        res
197    }
198}
199
200/// Alter table request
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct AlterTableRequest {
203    pub catalog_name: String,
204    pub schema_name: String,
205    pub table_name: String,
206    pub table_id: TableId,
207    pub alter_kind: AlterKind,
208    // None in standalone.
209    pub table_version: Option<TableVersion>,
210}
211
212/// Add column request
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct AddColumnRequest {
215    pub column_schema: ColumnSchema,
216    pub is_key: bool,
217    pub location: Option<AddColumnLocation>,
218    /// Add column if not exists.
219    pub add_if_not_exists: bool,
220}
221
222/// Change column datatype request
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct ModifyColumnTypeRequest {
225    pub column_name: String,
226    pub target_type: ConcreteDataType,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub enum AlterKind {
231    AddColumns {
232        columns: Vec<AddColumnRequest>,
233    },
234    DropColumns {
235        names: Vec<String>,
236    },
237    ModifyColumnTypes {
238        columns: Vec<ModifyColumnTypeRequest>,
239    },
240    RenameTable {
241        new_table_name: String,
242    },
243    SetTableOptions {
244        options: Vec<SetRegionOption>,
245    },
246    UnsetTableOptions {
247        keys: Vec<UnsetRegionOption>,
248    },
249    SetIndex {
250        options: SetIndexOptions,
251    },
252    UnsetIndex {
253        options: UnsetIndexOptions,
254    },
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub enum SetIndexOptions {
259    Fulltext {
260        column_name: String,
261        options: FulltextOptions,
262    },
263    Inverted {
264        column_name: String,
265    },
266    Skipping {
267        column_name: String,
268        options: SkippingIndexOptions,
269    },
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub enum UnsetIndexOptions {
274    Fulltext { column_name: String },
275    Inverted { column_name: String },
276    Skipping { column_name: String },
277}
278
279#[derive(Debug)]
280pub struct InsertRequest {
281    pub catalog_name: String,
282    pub schema_name: String,
283    pub table_name: String,
284    pub columns_values: HashMap<String, VectorRef>,
285}
286
287/// Delete (by primary key) request
288#[derive(Debug)]
289pub struct DeleteRequest {
290    pub catalog_name: String,
291    pub schema_name: String,
292    pub table_name: String,
293    /// Values of each column in this table's primary key and time index.
294    ///
295    /// The key is the column name, and the value is the column value.
296    pub key_column_values: HashMap<String, VectorRef>,
297}
298
299#[derive(Debug)]
300pub enum CopyDirection {
301    Export,
302    Import,
303}
304
305/// Copy table request
306#[derive(Debug)]
307pub struct CopyTableRequest {
308    pub catalog_name: String,
309    pub schema_name: String,
310    pub table_name: String,
311    pub location: String,
312    pub with: HashMap<String, String>,
313    pub connection: HashMap<String, String>,
314    pub pattern: Option<String>,
315    pub direction: CopyDirection,
316    pub timestamp_range: Option<TimestampRange>,
317    pub limit: Option<u64>,
318}
319
320#[derive(Debug, Clone, Default)]
321pub struct FlushTableRequest {
322    pub catalog_name: String,
323    pub schema_name: String,
324    pub table_name: String,
325}
326
327#[derive(Debug, Clone, PartialEq)]
328pub struct CompactTableRequest {
329    pub catalog_name: String,
330    pub schema_name: String,
331    pub table_name: String,
332    pub compact_options: compact_request::Options,
333}
334
335impl Default for CompactTableRequest {
336    fn default() -> Self {
337        Self {
338            catalog_name: Default::default(),
339            schema_name: Default::default(),
340            table_name: Default::default(),
341            compact_options: compact_request::Options::Regular(Default::default()),
342        }
343    }
344}
345
346/// Truncate table request
347#[derive(Debug, Clone, Serialize, Deserialize)]
348pub struct TruncateTableRequest {
349    pub catalog_name: String,
350    pub schema_name: String,
351    pub table_name: String,
352    pub table_id: TableId,
353}
354
355impl TruncateTableRequest {
356    pub fn table_ref(&self) -> TableReference {
357        TableReference {
358            catalog: &self.catalog_name,
359            schema: &self.schema_name,
360            table: &self.table_name,
361        }
362    }
363}
364
365#[derive(Debug, Clone, Default, Deserialize, Serialize)]
366pub struct CopyDatabaseRequest {
367    pub catalog_name: String,
368    pub schema_name: String,
369    pub location: String,
370    pub with: HashMap<String, String>,
371    pub connection: HashMap<String, String>,
372    pub time_range: Option<TimestampRange>,
373}
374
375#[derive(Debug, Clone, Default, Deserialize, Serialize)]
376pub struct CopyQueryToRequest {
377    pub location: String,
378    pub with: HashMap<String, String>,
379    pub connection: HashMap<String, String>,
380}
381
382#[cfg(test)]
383mod tests {
384    use std::time::Duration;
385
386    use super::*;
387
388    #[test]
389    fn test_validate_table_option() {
390        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
391        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
392        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
393        assert!(validate_table_option(TTL_KEY));
394        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
395        assert!(validate_table_option(STORAGE_KEY));
396        assert!(!validate_table_option("foo"));
397    }
398
399    #[test]
400    fn test_serialize_table_options() {
401        let options = TableOptions {
402            write_buffer_size: None,
403            ttl: Some(Duration::from_secs(1000).into()),
404            extra_options: HashMap::new(),
405            skip_wal: false,
406        };
407        let serialized = serde_json::to_string(&options).unwrap();
408        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
409        assert_eq!(options, deserialized);
410    }
411
412    #[test]
413    fn test_convert_hashmap_between_table_options() {
414        let options = TableOptions {
415            write_buffer_size: Some(ReadableSize::mb(128)),
416            ttl: Some(Duration::from_secs(1000).into()),
417            extra_options: HashMap::new(),
418            skip_wal: false,
419        };
420        let serialized_map = HashMap::from(&options);
421        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
422        assert_eq!(options, serialized);
423
424        let options = TableOptions {
425            write_buffer_size: None,
426            ttl: Default::default(),
427            extra_options: HashMap::new(),
428            skip_wal: false,
429        };
430        let serialized_map = HashMap::from(&options);
431        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
432        assert_eq!(options, serialized);
433
434        let options = TableOptions {
435            write_buffer_size: Some(ReadableSize::mb(128)),
436            ttl: Some(Duration::from_secs(1000).into()),
437            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
438            skip_wal: false,
439        };
440        let serialized_map = HashMap::from(&options);
441        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
442        assert_eq!(options, serialized);
443    }
444
445    #[test]
446    fn test_table_options_to_string() {
447        let options = TableOptions {
448            write_buffer_size: Some(ReadableSize::mb(128)),
449            ttl: Some(Duration::from_secs(1000).into()),
450            extra_options: HashMap::new(),
451            skip_wal: false,
452        };
453
454        assert_eq!(
455            "write_buffer_size=128.0MiB ttl=16m 40s",
456            options.to_string()
457        );
458
459        let options = TableOptions {
460            write_buffer_size: Some(ReadableSize::mb(128)),
461            ttl: Some(Duration::from_secs(1000).into()),
462            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
463            skip_wal: false,
464        };
465
466        assert_eq!(
467            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
468            options.to_string()
469        );
470
471        let options = TableOptions {
472            write_buffer_size: Some(ReadableSize::mb(128)),
473            ttl: Some(Duration::from_secs(1000).into()),
474            extra_options: HashMap::new(),
475            skip_wal: true,
476        };
477        assert_eq!(
478            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
479            options.to_string()
480        );
481    }
482}