table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30    ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36    LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39    APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY, TWCS_FALLBACK_TO_LOCAL,
40    TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, is_mito_engine_option_key,
41};
42use store_api::region_request::{SetRegionOption, UnsetRegionOption};
43
44use crate::error::{ParseTableOptionSnafu, Result};
45use crate::metadata::{TableId, TableVersion};
46use crate::table_reference::TableReference;
47
/// Option key `__private.file_table_meta`.
///
/// NOTE(review): presumably an internal key carrying file table metadata; it is
/// not listed in `VALID_TABLE_OPTION_KEYS` — confirm against its users.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// File engine table option key `location`.
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// File engine table option key `pattern`.
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// File engine table option key `format`.
pub const FILE_TABLE_FORMAT_KEY: &str = "format";
52
/// Table option key `table_data_model` (listed under "table model info" in
/// `VALID_TABLE_OPTION_KEYS`).
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// NOTE(review): presumably a value for the [`TABLE_DATA_MODEL`] option that
/// marks a trace table — confirm against callers.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
55
/// Table option key `otlp_metric_compat` (accepted via `VALID_TABLE_OPTION_KEYS`).
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// NOTE(review): presumably a value for the [`OTLP_METRIC_COMPAT_KEY`] option —
/// confirm against callers.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
58
/// Table option keys that [`validate_table_option`] accepts by direct lookup.
///
/// Keys recognized by the object store (S3/OSS), mito engine and metric engine
/// checks, as well as the DDL keys, are validated separately and are not
/// repeated here.
pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
    // common keys:
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    // file engine keys:
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // metric engine keys:
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // table model info
    TABLE_DATA_MODEL,
    OTLP_METRIC_COMPAT_KEY,
];
77
/// DDL option key `timeout`.
pub const DDL_TIMEOUT: &str = "timeout";
/// DDL option key `wait`.
pub const DDL_WAIT: &str = "wait";

/// DDL option keys; [`validate_table_option`] also accepts these.
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
82
83// Valid option keys when creating a db.
84static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
85    let mut set = HashSet::new();
86    set.insert(TTL_KEY);
87    set.insert(STORAGE_KEY);
88    set.insert(MEMTABLE_TYPE);
89    set.insert(APPEND_MODE_KEY);
90    set.insert(MERGE_MODE_KEY);
91    set.insert(SKIP_WAL_KEY);
92    set.insert(COMPACTION_TYPE);
93    set.insert(TWCS_FALLBACK_TO_LOCAL);
94    set.insert(TWCS_TIME_WINDOW);
95    set.insert(TWCS_TRIGGER_FILE_NUM);
96    set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
97    set
98});
99
100/// Returns true if the `key` is a valid key for database.
101pub fn validate_database_option(key: &str) -> bool {
102    VALID_DB_OPT_KEYS.contains(&key)
103}
104
105/// Returns true if the `key` is a valid key for any engine or storage.
106pub fn validate_table_option(key: &str) -> bool {
107    if is_supported_in_s3(key) {
108        return true;
109    }
110
111    if is_supported_in_oss(key) {
112        return true;
113    }
114
115    if is_mito_engine_option_key(key) {
116        return true;
117    }
118
119    if is_metric_engine_option_key(key) {
120        return true;
121    }
122
123    VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
124}
125
/// Options of a table.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form are
/// filled with their default values when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Write buffer (memtable) size of the table.
    pub write_buffer_size: Option<ReadableSize>,
    /// Time-to-live of table. Expired data will be automatically purged.
    pub ttl: Option<TimeToLive>,
    /// Skip wal write for this table.
    pub skip_wal: bool,
    /// Extra options that may not be applicable to all table engines.
    pub extra_options: HashMap<String, String>,
}
138
/// Option key for [`TableOptions::write_buffer_size`].
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Option key for [`TableOptions::ttl`]; re-exported from the mito engine options.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Option key `storage`.
pub const STORAGE_KEY: &str = "storage";
/// Option key `comment`.
pub const COMMENT_KEY: &str = "comment";
/// Option key `auto_create_table`.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Option key for [`TableOptions::skip_wal`]; re-exported from the mito engine options.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
145
146impl TableOptions {
147    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
148        iter: U,
149    ) -> Result<TableOptions> {
150        let mut options = TableOptions::default();
151
152        let kvs: HashMap<String, String> = iter
153            .into_iter()
154            .map(|(k, v)| (k.to_string(), v.to_string()))
155            .collect();
156
157        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
158            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
159                ParseTableOptionSnafu {
160                    key: WRITE_BUFFER_SIZE_KEY,
161                    value: write_buffer_size,
162                }
163                .build()
164            })?;
165            options.write_buffer_size = Some(size)
166        }
167
168        if let Some(ttl) = kvs.get(TTL_KEY) {
169            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
170                ParseTableOptionSnafu {
171                    key: TTL_KEY,
172                    value: ttl,
173                }
174                .build()
175            })?;
176            options.ttl = Some(ttl_value);
177        }
178
179        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
180            options.skip_wal = skip_wal.parse().map_err(|_| {
181                ParseTableOptionSnafu {
182                    key: SKIP_WAL_KEY,
183                    value: skip_wal,
184                }
185                .build()
186            })?;
187        }
188
189        options.extra_options = HashMap::from_iter(
190            kvs.into_iter()
191                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
192        );
193
194        Ok(options)
195    }
196}
197
198impl fmt::Display for TableOptions {
199    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
200        let mut key_vals = vec![];
201        if let Some(size) = self.write_buffer_size {
202            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
203        }
204
205        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
206            key_vals.push(format!("{}={}", TTL_KEY, ttl));
207        }
208
209        if self.skip_wal {
210            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
211        }
212
213        for (k, v) in &self.extra_options {
214            key_vals.push(format!("{}={}", k, v));
215        }
216
217        write!(f, "{}", key_vals.join(" "))
218    }
219}
220
221impl From<&TableOptions> for HashMap<String, String> {
222    fn from(opts: &TableOptions) -> Self {
223        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
224        if let Some(write_buffer_size) = opts.write_buffer_size {
225            let _ = res.insert(
226                WRITE_BUFFER_SIZE_KEY.to_string(),
227                write_buffer_size.to_string(),
228            );
229        }
230        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
231            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
232        }
233        res.extend(
234            opts.extra_options
235                .iter()
236                .map(|(k, v)| (k.clone(), v.clone())),
237        );
238        res
239    }
240}
241
/// Alter table request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    /// Catalog of the table to alter.
    pub catalog_name: String,
    /// Schema of the table to alter.
    pub schema_name: String,
    /// Name of the table to alter.
    pub table_name: String,
    /// Id of the table to alter.
    pub table_id: TableId,
    /// The alteration to apply.
    pub alter_kind: AlterKind,
    // None in standalone.
    pub table_version: Option<TableVersion>,
}
253
/// Add column request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the column to add.
    pub column_schema: ColumnSchema,
    /// NOTE(review): presumably marks the new column as a key (primary key)
    /// column — confirm against the code that applies the request.
    pub is_key: bool,
    /// Optional location of the new column.
    pub location: Option<AddColumnLocation>,
    /// Add column if not exists.
    pub add_if_not_exists: bool,
}
263
/// Change column datatype request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column whose type is changed.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
270
/// The kind of alteration carried by an [`AlterTableRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add the given columns.
    AddColumns {
        columns: Vec<AddColumnRequest>,
    },
    /// Drop the columns with the given names.
    DropColumns {
        names: Vec<String>,
    },
    /// Change the data types of the given columns.
    ModifyColumnTypes {
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table to `new_table_name`.
    RenameTable {
        new_table_name: String,
    },
    /// Set the given table options.
    SetTableOptions {
        options: Vec<SetRegionOption>,
    },
    /// Unset the given table options.
    UnsetTableOptions {
        keys: Vec<UnsetRegionOption>,
    },
    /// Set the given index options.
    SetIndexes {
        options: Vec<SetIndexOption>,
    },
    /// Unset the given index options.
    UnsetIndexes {
        options: Vec<UnsetIndexOption>,
    },
    /// Drop the defaults of the columns with the given names.
    DropDefaults {
        names: Vec<String>,
    },
    /// Set column defaults as described by each request.
    SetDefaults {
        defaults: Vec<SetDefaultRequest>,
    },
}
304
/// Request to set the default constraint of a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetDefaultRequest {
    /// Name of the column to update.
    pub column_name: String,
    /// The default constraint to set.
    /// NOTE(review): the meaning of `None` is not visible here — confirm.
    pub default_constraint: Option<ColumnDefaultConstraint>,
}
310
/// Index option to set on a column; one variant per index kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOption {
    /// Fulltext index on `column_name` with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index on `column_name`.
    Inverted {
        column_name: String,
    },
    /// Skipping index on `column_name` with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
325
326impl SetIndexOption {
327    /// Returns the column name of the index option.
328    pub fn column_name(&self) -> &str {
329        match self {
330            SetIndexOption::Fulltext { column_name, .. } => column_name,
331            SetIndexOption::Inverted { column_name, .. } => column_name,
332            SetIndexOption::Skipping { column_name, .. } => column_name,
333        }
334    }
335}
336
/// Index option to unset on a column; one variant per index kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOption {
    /// Fulltext index on `column_name`.
    Fulltext { column_name: String },
    /// Inverted index on `column_name`.
    Inverted { column_name: String },
    /// Skipping index on `column_name`.
    Skipping { column_name: String },
}
343
344impl UnsetIndexOption {
345    /// Returns the column name of the index option.
346    pub fn column_name(&self) -> &str {
347        match self {
348            UnsetIndexOption::Fulltext { column_name, .. } => column_name,
349            UnsetIndexOption::Inverted { column_name, .. } => column_name,
350            UnsetIndexOption::Skipping { column_name, .. } => column_name,
351        }
352    }
353}
354
/// Insert request
#[derive(Debug)]
pub struct InsertRequest {
    /// Catalog of the target table.
    pub catalog_name: String,
    /// Schema of the target table.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Values to insert, keyed by column name.
    pub columns_values: HashMap<String, VectorRef>,
}
362
/// Delete (by primary key) request
#[derive(Debug)]
pub struct DeleteRequest {
    /// Catalog of the target table.
    pub catalog_name: String,
    /// Schema of the target table.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Values of each column in this table's primary key and time index.
    ///
    /// The key is the column name, and the value is the column value.
    pub key_column_values: HashMap<String, VectorRef>,
}
374
/// Direction of a [`CopyTableRequest`].
#[derive(Debug)]
pub enum CopyDirection {
    /// Export direction.
    Export,
    /// Import direction.
    Import,
}
380
/// Copy table request
#[derive(Debug)]
pub struct CopyTableRequest {
    /// Catalog of the table.
    pub catalog_name: String,
    /// Schema of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
    /// Location to copy to or from.
    pub location: String,
    /// NOTE(review): presumably the options of the `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// NOTE(review): presumably the options of the `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
    /// Optional pattern.
    /// NOTE(review): presumably used to match files at `location` — confirm.
    pub pattern: Option<String>,
    /// Whether this request exports or imports.
    pub direction: CopyDirection,
    /// Optional timestamp range of the copy.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit.
    /// NOTE(review): presumably a maximum number of rows — confirm.
    pub limit: Option<u64>,
}
395
/// Flush table request
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    /// Catalog of the table.
    pub catalog_name: String,
    /// Schema of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
}
402
/// Build index table request
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
    /// Catalog of the table.
    pub catalog_name: String,
    /// Schema of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
}
409
/// Compact table request
///
/// `Default` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    /// Catalog of the table.
    pub catalog_name: String,
    /// Schema of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
    /// Compaction options.
    pub compact_options: compact_request::Options,
    /// Parallelism of the compaction.
    pub parallelism: u32,
}
418
419impl Default for CompactTableRequest {
420    fn default() -> Self {
421        Self {
422            catalog_name: Default::default(),
423            schema_name: Default::default(),
424            table_name: Default::default(),
425            compact_options: compact_request::Options::Regular(Default::default()),
426            parallelism: 1,
427        }
428    }
429}
430
/// Truncate table request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    /// Catalog of the table.
    pub catalog_name: String,
    /// Schema of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
    /// Id of the table.
    pub table_id: TableId,
}
439
440impl TruncateTableRequest {
441    pub fn table_ref(&self) -> TableReference<'_> {
442        TableReference {
443            catalog: &self.catalog_name,
444            schema: &self.schema_name,
445            table: &self.table_name,
446        }
447    }
448}
449
/// Copy database request
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    /// Catalog of the database.
    pub catalog_name: String,
    /// Schema (database) name.
    pub schema_name: String,
    /// Location to copy to or from.
    pub location: String,
    /// NOTE(review): presumably the options of the `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// NOTE(review): presumably the options of the `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
    /// Optional time range of the copy.
    pub time_range: Option<TimestampRange>,
}
459
/// Copy query result request
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// Location to copy to.
    pub location: String,
    /// NOTE(review): presumably the options of the `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// NOTE(review): presumably the options of the `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
}
466
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[test]
    fn test_validate_table_option() {
        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
        assert!(validate_table_option(TTL_KEY));
        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
        assert!(validate_table_option(STORAGE_KEY));
        assert!(!validate_table_option("foo"));
    }

    /// Keys that are only reachable through the key arrays (common and DDL keys).
    #[test]
    fn test_validate_table_option_common_and_ddl_keys() {
        assert!(validate_table_option(COMMENT_KEY));
        assert!(validate_table_option(SKIP_WAL_KEY));
        assert!(validate_table_option(DDL_TIMEOUT));
        assert!(validate_table_option(DDL_WAIT));
    }

    #[test]
    fn test_validate_database_option() {
        assert!(validate_database_option(TTL_KEY));
        assert!(validate_database_option(STORAGE_KEY));
        assert!(validate_database_option(SKIP_WAL_KEY));
        assert!(!validate_database_option(WRITE_BUFFER_SIZE_KEY));
        assert!(!validate_database_option("foo"));
    }

    /// `skip_wal` must be parsed as a boolean and default to `false`.
    #[test]
    fn test_parse_skip_wal() {
        let options = TableOptions::try_from_iter(vec![(SKIP_WAL_KEY, "true")]).unwrap();
        assert!(options.skip_wal);

        let options = TableOptions::try_from_iter(vec![(SKIP_WAL_KEY, "false")]).unwrap();
        assert!(!options.skip_wal);

        assert!(TableOptions::try_from_iter(vec![(SKIP_WAL_KEY, "not_a_bool")]).is_err());

        let options = TableOptions::try_from_iter(Vec::<(&str, &str)>::new()).unwrap();
        assert_eq!(TableOptions::default(), options);
    }

    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            write_buffer_size: None,
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized = serde_json::to_string(&options).unwrap();
        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
        assert_eq!(options, deserialized);
    }

    #[test]
    fn test_convert_hashmap_between_table_options() {
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        let options = TableOptions {
            write_buffer_size: None,
            ttl: Default::default(),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);
    }

    #[test]
    fn test_table_options_to_string() {
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s",
            options.to_string()
        );

        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            options.to_string()
        );

        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: true,
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            options.to_string()
        );
    }
}