table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::HashMap;
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::range::TimestampRange;
26use common_time::TimeToLive;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{ColumnSchema, FulltextOptions, SkippingIndexOptions};
30use greptime_proto::v1::region::compact_request;
31use serde::{Deserialize, Serialize};
32use store_api::metric_engine_consts::{
33    is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
34};
35use store_api::mito_engine_options::is_mito_engine_option_key;
36use store_api::region_request::{SetRegionOption, UnsetRegionOption};
37
38use crate::error::{ParseTableOptionSnafu, Result};
39use crate::metadata::{TableId, TableVersion};
40use crate::table_reference::TableReference;
41
42pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
43pub const FILE_TABLE_LOCATION_KEY: &str = "location";
44pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
45pub const FILE_TABLE_FORMAT_KEY: &str = "format";
46
47pub const TABLE_DATA_MODEL: &str = "table_data_model";
48pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
49
50pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [
51    // common keys:
52    WRITE_BUFFER_SIZE_KEY,
53    TTL_KEY,
54    STORAGE_KEY,
55    COMMENT_KEY,
56    SKIP_WAL_KEY,
57    // file engine keys:
58    FILE_TABLE_LOCATION_KEY,
59    FILE_TABLE_FORMAT_KEY,
60    FILE_TABLE_PATTERN_KEY,
61    // metric engine keys:
62    PHYSICAL_TABLE_METADATA_KEY,
63    LOGICAL_TABLE_METADATA_KEY,
64    // table model info
65    TABLE_DATA_MODEL,
66];
67
68/// Returns true if the `key` is a valid key for any engine or storage.
69pub fn validate_table_option(key: &str) -> bool {
70    if is_supported_in_s3(key) {
71        return true;
72    }
73
74    if is_supported_in_oss(key) {
75        return true;
76    }
77
78    if is_mito_engine_option_key(key) {
79        return true;
80    }
81
82    if is_metric_engine_option_key(key) {
83        return true;
84    }
85
86    VALID_TABLE_OPTION_KEYS.contains(&key)
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
90#[serde(default)]
91pub struct TableOptions {
92    /// Memtable size of memtable.
93    pub write_buffer_size: Option<ReadableSize>,
94    /// Time-to-live of table. Expired data will be automatically purged.
95    pub ttl: Option<TimeToLive>,
96    /// Skip wal write for this table.
97    pub skip_wal: bool,
98    /// Extra options that may not applicable to all table engines.
99    pub extra_options: HashMap<String, String>,
100}
101
102pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
103pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
104pub const STORAGE_KEY: &str = "storage";
105pub const COMMENT_KEY: &str = "comment";
106pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
107pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
108
109impl TableOptions {
110    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
111        iter: U,
112    ) -> Result<TableOptions> {
113        let mut options = TableOptions::default();
114
115        let kvs: HashMap<String, String> = iter
116            .into_iter()
117            .map(|(k, v)| (k.to_string(), v.to_string()))
118            .collect();
119
120        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
121            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
122                ParseTableOptionSnafu {
123                    key: WRITE_BUFFER_SIZE_KEY,
124                    value: write_buffer_size,
125                }
126                .build()
127            })?;
128            options.write_buffer_size = Some(size)
129        }
130
131        if let Some(ttl) = kvs.get(TTL_KEY) {
132            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
133                ParseTableOptionSnafu {
134                    key: TTL_KEY,
135                    value: ttl,
136                }
137                .build()
138            })?;
139            options.ttl = Some(ttl_value);
140        }
141
142        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
143            options.skip_wal = skip_wal.parse().map_err(|_| {
144                ParseTableOptionSnafu {
145                    key: SKIP_WAL_KEY,
146                    value: skip_wal,
147                }
148                .build()
149            })?;
150        }
151
152        options.extra_options = HashMap::from_iter(
153            kvs.into_iter()
154                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
155        );
156
157        Ok(options)
158    }
159}
160
161impl fmt::Display for TableOptions {
162    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
163        let mut key_vals = vec![];
164        if let Some(size) = self.write_buffer_size {
165            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
166        }
167
168        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
169            key_vals.push(format!("{}={}", TTL_KEY, ttl));
170        }
171
172        if self.skip_wal {
173            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
174        }
175
176        for (k, v) in &self.extra_options {
177            key_vals.push(format!("{}={}", k, v));
178        }
179
180        write!(f, "{}", key_vals.join(" "))
181    }
182}
183
184impl From<&TableOptions> for HashMap<String, String> {
185    fn from(opts: &TableOptions) -> Self {
186        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
187        if let Some(write_buffer_size) = opts.write_buffer_size {
188            let _ = res.insert(
189                WRITE_BUFFER_SIZE_KEY.to_string(),
190                write_buffer_size.to_string(),
191            );
192        }
193        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
194            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
195        }
196        res.extend(
197            opts.extra_options
198                .iter()
199                .map(|(k, v)| (k.clone(), v.clone())),
200        );
201        res
202    }
203}
204
205/// Alter table request
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct AlterTableRequest {
208    pub catalog_name: String,
209    pub schema_name: String,
210    pub table_name: String,
211    pub table_id: TableId,
212    pub alter_kind: AlterKind,
213    // None in standalone.
214    pub table_version: Option<TableVersion>,
215}
216
217/// Add column request
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct AddColumnRequest {
220    pub column_schema: ColumnSchema,
221    pub is_key: bool,
222    pub location: Option<AddColumnLocation>,
223    /// Add column if not exists.
224    pub add_if_not_exists: bool,
225}
226
227/// Change column datatype request
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct ModifyColumnTypeRequest {
230    pub column_name: String,
231    pub target_type: ConcreteDataType,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub enum AlterKind {
236    AddColumns {
237        columns: Vec<AddColumnRequest>,
238    },
239    DropColumns {
240        names: Vec<String>,
241    },
242    ModifyColumnTypes {
243        columns: Vec<ModifyColumnTypeRequest>,
244    },
245    RenameTable {
246        new_table_name: String,
247    },
248    SetTableOptions {
249        options: Vec<SetRegionOption>,
250    },
251    UnsetTableOptions {
252        keys: Vec<UnsetRegionOption>,
253    },
254    SetIndex {
255        options: SetIndexOptions,
256    },
257    UnsetIndex {
258        options: UnsetIndexOptions,
259    },
260    DropDefaults {
261        names: Vec<String>,
262    },
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum SetIndexOptions {
267    Fulltext {
268        column_name: String,
269        options: FulltextOptions,
270    },
271    Inverted {
272        column_name: String,
273    },
274    Skipping {
275        column_name: String,
276        options: SkippingIndexOptions,
277    },
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub enum UnsetIndexOptions {
282    Fulltext { column_name: String },
283    Inverted { column_name: String },
284    Skipping { column_name: String },
285}
286
287#[derive(Debug)]
288pub struct InsertRequest {
289    pub catalog_name: String,
290    pub schema_name: String,
291    pub table_name: String,
292    pub columns_values: HashMap<String, VectorRef>,
293}
294
295/// Delete (by primary key) request
296#[derive(Debug)]
297pub struct DeleteRequest {
298    pub catalog_name: String,
299    pub schema_name: String,
300    pub table_name: String,
301    /// Values of each column in this table's primary key and time index.
302    ///
303    /// The key is the column name, and the value is the column value.
304    pub key_column_values: HashMap<String, VectorRef>,
305}
306
307#[derive(Debug)]
308pub enum CopyDirection {
309    Export,
310    Import,
311}
312
313/// Copy table request
314#[derive(Debug)]
315pub struct CopyTableRequest {
316    pub catalog_name: String,
317    pub schema_name: String,
318    pub table_name: String,
319    pub location: String,
320    pub with: HashMap<String, String>,
321    pub connection: HashMap<String, String>,
322    pub pattern: Option<String>,
323    pub direction: CopyDirection,
324    pub timestamp_range: Option<TimestampRange>,
325    pub limit: Option<u64>,
326}
327
328#[derive(Debug, Clone, Default)]
329pub struct FlushTableRequest {
330    pub catalog_name: String,
331    pub schema_name: String,
332    pub table_name: String,
333}
334
335#[derive(Debug, Clone, PartialEq)]
336pub struct CompactTableRequest {
337    pub catalog_name: String,
338    pub schema_name: String,
339    pub table_name: String,
340    pub compact_options: compact_request::Options,
341}
342
343impl Default for CompactTableRequest {
344    fn default() -> Self {
345        Self {
346            catalog_name: Default::default(),
347            schema_name: Default::default(),
348            table_name: Default::default(),
349            compact_options: compact_request::Options::Regular(Default::default()),
350        }
351    }
352}
353
354/// Truncate table request
355#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct TruncateTableRequest {
357    pub catalog_name: String,
358    pub schema_name: String,
359    pub table_name: String,
360    pub table_id: TableId,
361}
362
363impl TruncateTableRequest {
364    pub fn table_ref(&self) -> TableReference {
365        TableReference {
366            catalog: &self.catalog_name,
367            schema: &self.schema_name,
368            table: &self.table_name,
369        }
370    }
371}
372
373#[derive(Debug, Clone, Default, Deserialize, Serialize)]
374pub struct CopyDatabaseRequest {
375    pub catalog_name: String,
376    pub schema_name: String,
377    pub location: String,
378    pub with: HashMap<String, String>,
379    pub connection: HashMap<String, String>,
380    pub time_range: Option<TimestampRange>,
381}
382
383#[derive(Debug, Clone, Default, Deserialize, Serialize)]
384pub struct CopyQueryToRequest {
385    pub location: String,
386    pub with: HashMap<String, String>,
387    pub connection: HashMap<String, String>,
388}
389
390#[cfg(test)]
391mod tests {
392    use std::time::Duration;
393
394    use super::*;
395
396    #[test]
397    fn test_validate_table_option() {
398        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
399        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
400        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
401        assert!(validate_table_option(TTL_KEY));
402        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
403        assert!(validate_table_option(STORAGE_KEY));
404        assert!(!validate_table_option("foo"));
405    }
406
407    #[test]
408    fn test_serialize_table_options() {
409        let options = TableOptions {
410            write_buffer_size: None,
411            ttl: Some(Duration::from_secs(1000).into()),
412            extra_options: HashMap::new(),
413            skip_wal: false,
414        };
415        let serialized = serde_json::to_string(&options).unwrap();
416        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
417        assert_eq!(options, deserialized);
418    }
419
420    #[test]
421    fn test_convert_hashmap_between_table_options() {
422        let options = TableOptions {
423            write_buffer_size: Some(ReadableSize::mb(128)),
424            ttl: Some(Duration::from_secs(1000).into()),
425            extra_options: HashMap::new(),
426            skip_wal: false,
427        };
428        let serialized_map = HashMap::from(&options);
429        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
430        assert_eq!(options, serialized);
431
432        let options = TableOptions {
433            write_buffer_size: None,
434            ttl: Default::default(),
435            extra_options: HashMap::new(),
436            skip_wal: false,
437        };
438        let serialized_map = HashMap::from(&options);
439        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
440        assert_eq!(options, serialized);
441
442        let options = TableOptions {
443            write_buffer_size: Some(ReadableSize::mb(128)),
444            ttl: Some(Duration::from_secs(1000).into()),
445            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
446            skip_wal: false,
447        };
448        let serialized_map = HashMap::from(&options);
449        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
450        assert_eq!(options, serialized);
451    }
452
453    #[test]
454    fn test_table_options_to_string() {
455        let options = TableOptions {
456            write_buffer_size: Some(ReadableSize::mb(128)),
457            ttl: Some(Duration::from_secs(1000).into()),
458            extra_options: HashMap::new(),
459            skip_wal: false,
460        };
461
462        assert_eq!(
463            "write_buffer_size=128.0MiB ttl=16m 40s",
464            options.to_string()
465        );
466
467        let options = TableOptions {
468            write_buffer_size: Some(ReadableSize::mb(128)),
469            ttl: Some(Duration::from_secs(1000).into()),
470            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
471            skip_wal: false,
472        };
473
474        assert_eq!(
475            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
476            options.to_string()
477        );
478
479        let options = TableOptions {
480            write_buffer_size: Some(ReadableSize::mb(128)),
481            ttl: Some(Duration::from_secs(1000).into()),
482            extra_options: HashMap::new(),
483            skip_wal: true,
484        };
485        assert_eq!(
486            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
487            options.to_string()
488        );
489    }
490}