Skip to main content

table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30    ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36    LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39    APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY, SST_FORMAT_KEY,
40    TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
41    is_mito_engine_option_key,
42};
43use store_api::region_request::{SetRegionOption, UnsetRegionOption};
44
45use crate::error::{ParseTableOptionSnafu, Result};
46use crate::metadata::{TableId, TableVersion};
47use crate::table_reference::TableReference;
48
49pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
50pub const FILE_TABLE_LOCATION_KEY: &str = "location";
51pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
52pub const FILE_TABLE_FORMAT_KEY: &str = "format";
53
54pub const TABLE_DATA_MODEL: &str = "table_data_model";
55pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
56
57pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
58pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
59
60pub const VALID_TABLE_OPTION_KEYS: [&str; 13] = [
61    // common keys:
62    WRITE_BUFFER_SIZE_KEY,
63    TTL_KEY,
64    STORAGE_KEY,
65    COMMENT_KEY,
66    SKIP_WAL_KEY,
67    SST_FORMAT_KEY,
68    // file engine keys:
69    FILE_TABLE_LOCATION_KEY,
70    FILE_TABLE_FORMAT_KEY,
71    FILE_TABLE_PATTERN_KEY,
72    // metric engine keys:
73    PHYSICAL_TABLE_METADATA_KEY,
74    LOGICAL_TABLE_METADATA_KEY,
75    // table model info
76    TABLE_DATA_MODEL,
77    OTLP_METRIC_COMPAT_KEY,
78];
79
80pub const DDL_TIMEOUT: &str = "timeout";
81pub const DDL_WAIT: &str = "wait";
82
83pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
84
85// Valid option keys when creating a db.
86static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
87    let mut set = HashSet::new();
88    set.insert(TTL_KEY);
89    set.insert(STORAGE_KEY);
90    set.insert(MEMTABLE_TYPE);
91    set.insert(APPEND_MODE_KEY);
92    set.insert(MERGE_MODE_KEY);
93    set.insert(SKIP_WAL_KEY);
94    set.insert(COMPACTION_TYPE);
95    set.insert(TWCS_FALLBACK_TO_LOCAL);
96    set.insert(TWCS_TIME_WINDOW);
97    set.insert(TWCS_TRIGGER_FILE_NUM);
98    set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
99    set.insert(SST_FORMAT_KEY);
100    set
101});
102
103/// Returns true if the `key` is a valid key for database.
104pub fn validate_database_option(key: &str) -> bool {
105    VALID_DB_OPT_KEYS.contains(&key)
106}
107
108/// Returns true if the `key` is a valid key for any engine or storage.
109pub fn validate_table_option(key: &str) -> bool {
110    if is_supported_in_s3(key) {
111        return true;
112    }
113
114    if is_supported_in_oss(key) {
115        return true;
116    }
117
118    if is_mito_engine_option_key(key) {
119        return true;
120    }
121
122    if is_metric_engine_option_key(key) {
123        return true;
124    }
125
126    VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
127}
128
129#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
130#[serde(default)]
131pub struct TableOptions {
132    /// Memtable size of memtable.
133    pub write_buffer_size: Option<ReadableSize>,
134    /// Time-to-live of table. Expired data will be automatically purged.
135    pub ttl: Option<TimeToLive>,
136    /// Skip wal write for this table.
137    pub skip_wal: bool,
138    /// Extra options that may not applicable to all table engines.
139    pub extra_options: HashMap<String, String>,
140}
141
142pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
143pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
144pub const STORAGE_KEY: &str = "storage";
145pub const COMMENT_KEY: &str = "comment";
146pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
147pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
148pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
149
150impl TableOptions {
151    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
152        iter: U,
153    ) -> Result<TableOptions> {
154        let mut options = TableOptions::default();
155
156        let kvs: HashMap<String, String> = iter
157            .into_iter()
158            .map(|(k, v)| (k.to_string(), v.to_string()))
159            .collect();
160
161        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
162            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
163                ParseTableOptionSnafu {
164                    key: WRITE_BUFFER_SIZE_KEY,
165                    value: write_buffer_size,
166                }
167                .build()
168            })?;
169            options.write_buffer_size = Some(size)
170        }
171
172        if let Some(ttl) = kvs.get(TTL_KEY) {
173            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
174                ParseTableOptionSnafu {
175                    key: TTL_KEY,
176                    value: ttl,
177                }
178                .build()
179            })?;
180            options.ttl = Some(ttl_value);
181        }
182
183        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
184            options.skip_wal = skip_wal.parse().map_err(|_| {
185                ParseTableOptionSnafu {
186                    key: SKIP_WAL_KEY,
187                    value: skip_wal,
188                }
189                .build()
190            })?;
191        }
192
193        options.extra_options = HashMap::from_iter(
194            kvs.into_iter()
195                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
196        );
197
198        Ok(options)
199    }
200}
201
202impl fmt::Display for TableOptions {
203    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
204        let mut key_vals = vec![];
205        if let Some(size) = self.write_buffer_size {
206            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
207        }
208
209        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
210            key_vals.push(format!("{}={}", TTL_KEY, ttl));
211        }
212
213        if self.skip_wal {
214            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
215        }
216
217        for (k, v) in &self.extra_options {
218            key_vals.push(format!("{}={}", k, v));
219        }
220
221        write!(f, "{}", key_vals.join(" "))
222    }
223}
224
225impl From<&TableOptions> for HashMap<String, String> {
226    fn from(opts: &TableOptions) -> Self {
227        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
228        if let Some(write_buffer_size) = opts.write_buffer_size {
229            let _ = res.insert(
230                WRITE_BUFFER_SIZE_KEY.to_string(),
231                write_buffer_size.to_string(),
232            );
233        }
234        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
235            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
236        }
237        res.extend(
238            opts.extra_options
239                .iter()
240                .map(|(k, v)| (k.clone(), v.clone())),
241        );
242        res
243    }
244}
245
246/// Alter table request
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct AlterTableRequest {
249    pub catalog_name: String,
250    pub schema_name: String,
251    pub table_name: String,
252    pub table_id: TableId,
253    pub alter_kind: AlterKind,
254    // None in standalone.
255    pub table_version: Option<TableVersion>,
256}
257
258/// Add column request
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct AddColumnRequest {
261    pub column_schema: ColumnSchema,
262    pub is_key: bool,
263    pub location: Option<AddColumnLocation>,
264    /// Add column if not exists.
265    pub add_if_not_exists: bool,
266}
267
268/// Change column datatype request
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct ModifyColumnTypeRequest {
271    pub column_name: String,
272    pub target_type: ConcreteDataType,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub enum AlterKind {
277    AddColumns {
278        columns: Vec<AddColumnRequest>,
279    },
280    DropColumns {
281        names: Vec<String>,
282    },
283    ModifyColumnTypes {
284        columns: Vec<ModifyColumnTypeRequest>,
285    },
286    RenameTable {
287        new_table_name: String,
288    },
289    SetTableOptions {
290        options: Vec<SetRegionOption>,
291    },
292    UnsetTableOptions {
293        keys: Vec<UnsetRegionOption>,
294    },
295    SetIndexes {
296        options: Vec<SetIndexOption>,
297    },
298    UnsetIndexes {
299        options: Vec<UnsetIndexOption>,
300    },
301    DropDefaults {
302        names: Vec<String>,
303    },
304    SetDefaults {
305        defaults: Vec<SetDefaultRequest>,
306    },
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SetDefaultRequest {
311    pub column_name: String,
312    pub default_constraint: Option<ColumnDefaultConstraint>,
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub enum SetIndexOption {
317    Fulltext {
318        column_name: String,
319        options: FulltextOptions,
320    },
321    Inverted {
322        column_name: String,
323    },
324    Skipping {
325        column_name: String,
326        options: SkippingIndexOptions,
327    },
328}
329
330impl SetIndexOption {
331    /// Returns the column name of the index option.
332    pub fn column_name(&self) -> &str {
333        match self {
334            SetIndexOption::Fulltext { column_name, .. } => column_name,
335            SetIndexOption::Inverted { column_name, .. } => column_name,
336            SetIndexOption::Skipping { column_name, .. } => column_name,
337        }
338    }
339}
340
341#[derive(Debug, Clone, Serialize, Deserialize)]
342pub enum UnsetIndexOption {
343    Fulltext { column_name: String },
344    Inverted { column_name: String },
345    Skipping { column_name: String },
346}
347
348impl UnsetIndexOption {
349    /// Returns the column name of the index option.
350    pub fn column_name(&self) -> &str {
351        match self {
352            UnsetIndexOption::Fulltext { column_name, .. } => column_name,
353            UnsetIndexOption::Inverted { column_name, .. } => column_name,
354            UnsetIndexOption::Skipping { column_name, .. } => column_name,
355        }
356    }
357}
358
359#[derive(Debug)]
360pub struct InsertRequest {
361    pub catalog_name: String,
362    pub schema_name: String,
363    pub table_name: String,
364    pub columns_values: HashMap<String, VectorRef>,
365}
366
367/// Delete (by primary key) request
368#[derive(Debug)]
369pub struct DeleteRequest {
370    pub catalog_name: String,
371    pub schema_name: String,
372    pub table_name: String,
373    /// Values of each column in this table's primary key and time index.
374    ///
375    /// The key is the column name, and the value is the column value.
376    pub key_column_values: HashMap<String, VectorRef>,
377}
378
379#[derive(Debug)]
380pub enum CopyDirection {
381    Export,
382    Import,
383}
384
385/// Copy table request
386#[derive(Debug)]
387pub struct CopyTableRequest {
388    pub catalog_name: String,
389    pub schema_name: String,
390    pub table_name: String,
391    pub location: String,
392    pub with: HashMap<String, String>,
393    pub connection: HashMap<String, String>,
394    pub pattern: Option<String>,
395    pub direction: CopyDirection,
396    pub timestamp_range: Option<TimestampRange>,
397    pub limit: Option<u64>,
398}
399
400#[derive(Debug, Clone, Default)]
401pub struct FlushTableRequest {
402    pub catalog_name: String,
403    pub schema_name: String,
404    pub table_name: String,
405}
406
407#[derive(Debug, Clone, Default)]
408pub struct BuildIndexTableRequest {
409    pub catalog_name: String,
410    pub schema_name: String,
411    pub table_name: String,
412}
413
414#[derive(Debug, Clone, PartialEq)]
415pub struct CompactTableRequest {
416    pub catalog_name: String,
417    pub schema_name: String,
418    pub table_name: String,
419    pub compact_options: compact_request::Options,
420    pub parallelism: u32,
421}
422
423impl Default for CompactTableRequest {
424    fn default() -> Self {
425        Self {
426            catalog_name: Default::default(),
427            schema_name: Default::default(),
428            table_name: Default::default(),
429            compact_options: compact_request::Options::Regular(Default::default()),
430            parallelism: 1,
431        }
432    }
433}
434
435/// Truncate table request
436#[derive(Debug, Clone, Serialize, Deserialize)]
437pub struct TruncateTableRequest {
438    pub catalog_name: String,
439    pub schema_name: String,
440    pub table_name: String,
441    pub table_id: TableId,
442}
443
444impl TruncateTableRequest {
445    pub fn table_ref(&self) -> TableReference<'_> {
446        TableReference {
447            catalog: &self.catalog_name,
448            schema: &self.schema_name,
449            table: &self.table_name,
450        }
451    }
452}
453
454#[derive(Debug, Clone, Default, Deserialize, Serialize)]
455pub struct CopyDatabaseRequest {
456    pub catalog_name: String,
457    pub schema_name: String,
458    pub location: String,
459    pub with: HashMap<String, String>,
460    pub connection: HashMap<String, String>,
461    pub time_range: Option<TimestampRange>,
462}
463
464#[derive(Debug, Clone, Default, Deserialize, Serialize)]
465pub struct CopyQueryToRequest {
466    pub location: String,
467    pub with: HashMap<String, String>,
468    pub connection: HashMap<String, String>,
469}
470
471#[cfg(test)]
472mod tests {
473    use std::time::Duration;
474
475    use super::*;
476
477    #[test]
478    fn test_validate_table_option() {
479        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
480        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
481        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
482        assert!(validate_table_option(TTL_KEY));
483        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
484        assert!(validate_table_option(STORAGE_KEY));
485        assert!(!validate_table_option("foo"));
486    }
487
488    #[test]
489    fn test_serialize_table_options() {
490        let options = TableOptions {
491            write_buffer_size: None,
492            ttl: Some(Duration::from_secs(1000).into()),
493            extra_options: HashMap::new(),
494            skip_wal: false,
495        };
496        let serialized = serde_json::to_string(&options).unwrap();
497        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
498        assert_eq!(options, deserialized);
499    }
500
501    #[test]
502    fn test_convert_hashmap_between_table_options() {
503        let options = TableOptions {
504            write_buffer_size: Some(ReadableSize::mb(128)),
505            ttl: Some(Duration::from_secs(1000).into()),
506            extra_options: HashMap::new(),
507            skip_wal: false,
508        };
509        let serialized_map = HashMap::from(&options);
510        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
511        assert_eq!(options, serialized);
512
513        let options = TableOptions {
514            write_buffer_size: None,
515            ttl: Default::default(),
516            extra_options: HashMap::new(),
517            skip_wal: false,
518        };
519        let serialized_map = HashMap::from(&options);
520        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
521        assert_eq!(options, serialized);
522
523        let options = TableOptions {
524            write_buffer_size: Some(ReadableSize::mb(128)),
525            ttl: Some(Duration::from_secs(1000).into()),
526            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
527            skip_wal: false,
528        };
529        let serialized_map = HashMap::from(&options);
530        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
531        assert_eq!(options, serialized);
532    }
533
534    #[test]
535    fn test_table_options_to_string() {
536        let options = TableOptions {
537            write_buffer_size: Some(ReadableSize::mb(128)),
538            ttl: Some(Duration::from_secs(1000).into()),
539            extra_options: HashMap::new(),
540            skip_wal: false,
541        };
542
543        assert_eq!(
544            "write_buffer_size=128.0MiB ttl=16m 40s",
545            options.to_string()
546        );
547
548        let options = TableOptions {
549            write_buffer_size: Some(ReadableSize::mb(128)),
550            ttl: Some(Duration::from_secs(1000).into()),
551            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
552            skip_wal: false,
553        };
554
555        assert_eq!(
556            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
557            options.to_string()
558        );
559
560        let options = TableOptions {
561            write_buffer_size: Some(ReadableSize::mb(128)),
562            ttl: Some(Duration::from_secs(1000).into()),
563            extra_options: HashMap::new(),
564            skip_wal: true,
565        };
566        assert_eq!(
567            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
568            options.to_string()
569        );
570    }
571}