Skip to main content

table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30    ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36    LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39    APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD,
40    MEMTABLE_BULK_ENCODE_ROW_THRESHOLD, MEMTABLE_BULK_MAX_MERGE_GROUPS,
41    MEMTABLE_BULK_MERGE_THRESHOLD, MEMTABLE_TYPE, MERGE_MODE_KEY, SST_FORMAT_KEY,
42    TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
43    is_mito_engine_option_key,
44};
45use store_api::region_request::{SetRegionOption, UnsetRegionOption};
46
47use crate::error::{ParseTableOptionSnafu, Result};
48use crate::metadata::{TableId, TableVersion};
49use crate::table_reference::TableReference;
50
51mod semantic;
52pub use semantic::*;
53
// Keys related to file tables. `location`, `format` and `pattern` appear under
// "file engine keys" in `VALID_TABLE_OPTION_KEYS`; the meta key does not.

/// Key `__private.file_table_meta`.
///
/// NOTE(review): presumably stores internal file-table metadata — confirm against its users.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// File table option key `location`.
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// File table option key `pattern`.
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// File table option key `format`.
pub const FILE_TABLE_FORMAT_KEY: &str = "format";
58
/// Table option key `table_data_model` (listed under "table model info" in
/// `VALID_TABLE_OPTION_KEYS`).
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// NOTE(review): presumably a value for [`TABLE_DATA_MODEL`] — confirm against its users.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
61
/// Table option key `otlp_metric_compat` (listed in `VALID_TABLE_OPTION_KEYS`).
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// NOTE(review): presumably a value for [`OTLP_METRIC_COMPAT_KEY`] — confirm against its users.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
64
/// Fixed allowlist of table option keys.
///
/// `validate_table_option` consults this list (together with `VALID_DDL_OPTION_KEYS`)
/// after the storage-, engine- and semantic-specific checks. The array length in the
/// type must be kept in sync with the number of entries.
pub const VALID_TABLE_OPTION_KEYS: [&str; 14] = [
    // common keys:
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    SST_FORMAT_KEY,
    // file engine keys:
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // metric engine keys:
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // table model info
    TABLE_DATA_MODEL,
    OTLP_METRIC_COMPAT_KEY,
    REPARTITION_COLUMN_HINT_KEY,
];
85
/// DDL option key `timeout`.
pub const DDL_TIMEOUT: &str = "timeout";
/// DDL option key `wait`.
pub const DDL_WAIT: &str = "wait";

/// DDL option keys; `validate_table_option` accepts these in addition to
/// `VALID_TABLE_OPTION_KEYS`.
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
90
91// Valid option keys when creating a db.
92static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
93    let mut set = HashSet::new();
94    set.insert(TTL_KEY);
95    set.insert(STORAGE_KEY);
96    set.insert(MEMTABLE_TYPE);
97    set.insert(MEMTABLE_BULK_MERGE_THRESHOLD);
98    set.insert(MEMTABLE_BULK_ENCODE_ROW_THRESHOLD);
99    set.insert(MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD);
100    set.insert(MEMTABLE_BULK_MAX_MERGE_GROUPS);
101    set.insert(APPEND_MODE_KEY);
102    set.insert(MERGE_MODE_KEY);
103    set.insert(SKIP_WAL_KEY);
104    set.insert(COMPACTION_TYPE);
105    set.insert(TWCS_FALLBACK_TO_LOCAL);
106    set.insert(TWCS_TIME_WINDOW);
107    set.insert(TWCS_TRIGGER_FILE_NUM);
108    set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
109    set.insert(SST_FORMAT_KEY);
110    set
111});
112
113/// Returns true if the `key` is a valid key for database.
114pub fn validate_database_option(key: &str) -> bool {
115    VALID_DB_OPT_KEYS.contains(&key)
116}
117
118/// Returns true if the `key` is a valid key for any engine or storage.
119pub fn validate_table_option(key: &str) -> bool {
120    if is_supported_in_s3(key) {
121        return true;
122    }
123
124    if is_supported_in_oss(key) {
125        return true;
126    }
127
128    if is_mito_engine_option_key(key) {
129        return true;
130    }
131
132    if is_metric_engine_option_key(key) {
133        return true;
134    }
135
136    // Semantic-layer keys share a reserved prefix instead of a fixed allowlist so
137    // the vocabulary can grow without touching this gate. See `semantic` module.
138    if is_semantic_option_key(key) {
139        return true;
140    }
141
142    VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
143}
144
/// Options of a table.
///
/// The typed fields hold the options this crate parses itself; everything else is kept
/// as raw strings in `extra_options`. With `#[serde(default)]`, fields missing from
/// serialized input fall back to their `Default` values.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Write buffer size of the table, i.e. the size of its memtable.
    pub write_buffer_size: Option<ReadableSize>,
    /// Time-to-live of table. Expired data will be automatically purged.
    pub ttl: Option<TimeToLive>,
    /// Skip wal write for this table.
    pub skip_wal: bool,
    /// Extra options that may not be applicable to all table engines.
    pub extra_options: HashMap<String, String>,
}
157
/// Option key parsed into `TableOptions::write_buffer_size`.
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Option key parsed into `TableOptions::ttl`; alias of the mito engine's `TTL_KEY`.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Table option key `storage`.
pub const STORAGE_KEY: &str = "storage";
/// Table option key `comment`.
pub const COMMENT_KEY: &str = "comment";
/// Key `auto_create_table`; not listed in `VALID_TABLE_OPTION_KEYS`.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Option key parsed into `TableOptions::skip_wal`; alias of the mito engine's `SKIP_WAL_KEY`.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
/// Key `trace_table_partitions`; not listed in `VALID_TABLE_OPTION_KEYS`.
pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
/// Table option key `repartition.column.hint`.
pub const REPARTITION_COLUMN_HINT_KEY: &str = "repartition.column.hint";
166
167impl TableOptions {
168    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
169        iter: U,
170    ) -> Result<TableOptions> {
171        let mut options = TableOptions::default();
172
173        let kvs: HashMap<String, String> = iter
174            .into_iter()
175            .map(|(k, v)| (k.to_string(), v.to_string()))
176            .collect();
177
178        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
179            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
180                ParseTableOptionSnafu {
181                    key: WRITE_BUFFER_SIZE_KEY,
182                    value: write_buffer_size,
183                }
184                .build()
185            })?;
186            options.write_buffer_size = Some(size)
187        }
188
189        if let Some(ttl) = kvs.get(TTL_KEY) {
190            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
191                ParseTableOptionSnafu {
192                    key: TTL_KEY,
193                    value: ttl,
194                }
195                .build()
196            })?;
197            options.ttl = Some(ttl_value);
198        }
199
200        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
201            options.skip_wal = skip_wal.parse().map_err(|_| {
202                ParseTableOptionSnafu {
203                    key: SKIP_WAL_KEY,
204                    value: skip_wal,
205                }
206                .build()
207            })?;
208        }
209
210        options.extra_options = HashMap::from_iter(
211            kvs.into_iter()
212                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
213        );
214
215        Ok(options)
216    }
217}
218
219impl fmt::Display for TableOptions {
220    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221        let mut key_vals = vec![];
222        if let Some(size) = self.write_buffer_size {
223            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
224        }
225
226        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
227            key_vals.push(format!("{}={}", TTL_KEY, ttl));
228        }
229
230        if self.skip_wal {
231            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
232        }
233
234        for (k, v) in &self.extra_options {
235            key_vals.push(format!("{}={}", k, v));
236        }
237
238        write!(f, "{}", key_vals.join(" "))
239    }
240}
241
242impl From<&TableOptions> for HashMap<String, String> {
243    fn from(opts: &TableOptions) -> Self {
244        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
245        if let Some(write_buffer_size) = opts.write_buffer_size {
246            let _ = res.insert(
247                WRITE_BUFFER_SIZE_KEY.to_string(),
248                write_buffer_size.to_string(),
249            );
250        }
251        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
252            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
253        }
254        res.extend(
255            opts.extra_options
256                .iter()
257                .map(|(k, v)| (k.clone(), v.clone())),
258        );
259        res
260    }
261}
262
/// Alter table request
///
/// Identifies the table by catalog, schema, table name and id, and carries the
/// alteration to apply in `alter_kind`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// The alteration to apply.
    pub alter_kind: AlterKind,
    // None in standalone.
    pub table_version: Option<TableVersion>,
}
274
/// Add column request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the column to add.
    pub column_schema: ColumnSchema,
    /// NOTE(review): presumably marks the column as part of the primary key — confirm.
    pub is_key: bool,
    /// Optional position of the new column; meaning of `None` is not visible here.
    pub location: Option<AddColumnLocation>,
    /// Add column if not exists.
    pub add_if_not_exists: bool,
}
284
/// Change column datatype request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column whose type is changed.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
291
/// The kind of alteration carried by an [`AlterTableRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add the given columns.
    AddColumns {
        columns: Vec<AddColumnRequest>,
    },
    /// Drop the columns with the given names.
    DropColumns {
        names: Vec<String>,
    },
    /// Change the data types of the given columns.
    ModifyColumnTypes {
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table to `new_table_name`.
    RenameTable {
        new_table_name: String,
    },
    /// Set table options, expressed as region options.
    SetTableOptions {
        options: Vec<SetRegionOption>,
    },
    /// Unset table options, expressed as region option keys.
    UnsetTableOptions {
        keys: Vec<UnsetRegionOption>,
    },
    /// Set the repartition column hint to `column_name`.
    SetRepartitionColumnHint {
        column_name: String,
    },
    /// Remove the repartition column hint.
    UnsetRepartitionColumnHint,
    /// Set indexes as described by each [`SetIndexOption`].
    SetIndexes {
        options: Vec<SetIndexOption>,
    },
    /// Unset indexes as described by each [`UnsetIndexOption`].
    UnsetIndexes {
        options: Vec<UnsetIndexOption>,
    },
    /// Drop the defaults of the columns with the given names.
    DropDefaults {
        names: Vec<String>,
    },
    /// Set column defaults as described by each [`SetDefaultRequest`].
    SetDefaults {
        defaults: Vec<SetDefaultRequest>,
    },
}
329
/// Set column default request, used by `AlterKind::SetDefaults`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetDefaultRequest {
    /// Name of the column whose default is set.
    pub column_name: String,
    /// The default constraint to set.
    /// NOTE(review): the meaning of `None` is not visible in this file — confirm at the consumer.
    pub default_constraint: Option<ColumnDefaultConstraint>,
}
335
/// Describes an index to set on a column, used by `AlterKind::SetIndexes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOption {
    /// Fulltext index on `column_name` with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index on `column_name`.
    Inverted {
        column_name: String,
    },
    /// Skipping index on `column_name` with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
350
351impl SetIndexOption {
352    /// Returns the column name of the index option.
353    pub fn column_name(&self) -> &str {
354        match self {
355            SetIndexOption::Fulltext { column_name, .. } => column_name,
356            SetIndexOption::Inverted { column_name, .. } => column_name,
357            SetIndexOption::Skipping { column_name, .. } => column_name,
358        }
359    }
360}
361
/// Describes an index to unset on a column, used by `AlterKind::UnsetIndexes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOption {
    /// Fulltext index on `column_name`.
    Fulltext { column_name: String },
    /// Inverted index on `column_name`.
    Inverted { column_name: String },
    /// Skipping index on `column_name`.
    Skipping { column_name: String },
}
368
369impl UnsetIndexOption {
370    /// Returns the column name of the index option.
371    pub fn column_name(&self) -> &str {
372        match self {
373            UnsetIndexOption::Fulltext { column_name, .. } => column_name,
374            UnsetIndexOption::Inverted { column_name, .. } => column_name,
375            UnsetIndexOption::Skipping { column_name, .. } => column_name,
376        }
377    }
378}
379
/// Insert request
#[derive(Debug)]
pub struct InsertRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Vectors to insert.
    /// NOTE(review): presumably keyed by column name, like `DeleteRequest::key_column_values` — confirm.
    pub columns_values: HashMap<String, VectorRef>,
}
387
/// Delete (by primary key) request
#[derive(Debug)]
pub struct DeleteRequest {
    /// Catalog of the target table.
    pub catalog_name: String,
    /// Schema of the target table.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Values of each column in this table's primary key and time index.
    ///
    /// The key is the column name, and the value is the column value.
    pub key_column_values: HashMap<String, VectorRef>,
}
399
/// Direction of a [`CopyTableRequest`].
#[derive(Debug)]
pub enum CopyDirection {
    /// NOTE(review): presumably copies table data out to `location` — confirm at the consumer.
    Export,
    /// NOTE(review): presumably copies data from `location` into the table — confirm at the consumer.
    Import,
}
405
/// Copy table request
#[derive(Debug)]
pub struct CopyTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Location string of the copy source or destination, depending on `direction`.
    pub location: String,
    /// String key/value options; the accepted keys are not defined in this file.
    pub with: HashMap<String, String>,
    /// String key/value connection settings; the accepted keys are not defined in this file.
    pub connection: HashMap<String, String>,
    /// Optional pattern. NOTE(review): presumably filters files under `location` — confirm.
    pub pattern: Option<String>,
    /// Whether this is an export or an import.
    pub direction: CopyDirection,
    /// Optional timestamp range. NOTE(review): presumably restricts the copied rows — confirm.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit. NOTE(review): presumably caps the number of rows — confirm.
    pub limit: Option<u64>,
}
420
/// Flush table request; identifies the table by catalog, schema and table name.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
427
/// Build index request; identifies the table by catalog, schema and table name.
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
434
/// Compact table request.
///
/// `Default` is implemented by hand rather than derived: it uses the regular
/// compaction options and a parallelism of 1.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Compaction options from the region compact request protocol.
    pub compact_options: compact_request::Options,
    /// Requested parallelism; exact semantics are defined by the consumer.
    pub parallelism: u32,
}
443
444impl Default for CompactTableRequest {
445    fn default() -> Self {
446        Self {
447            catalog_name: Default::default(),
448            schema_name: Default::default(),
449            table_name: Default::default(),
450            compact_options: compact_request::Options::Regular(Default::default()),
451            parallelism: 1,
452        }
453    }
454}
455
/// Truncate table request
///
/// Identifies the table by catalog, schema, table name and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
}
464
465impl TruncateTableRequest {
466    pub fn table_ref(&self) -> TableReference<'_> {
467        TableReference {
468            catalog: &self.catalog_name,
469            schema: &self.schema_name,
470            table: &self.table_name,
471        }
472    }
473}
474
/// Copy database request; identifies the database by catalog and schema name.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    pub catalog_name: String,
    pub schema_name: String,
    /// Location string of the copy source or destination.
    pub location: String,
    /// String key/value options; the accepted keys are not defined in this file.
    pub with: HashMap<String, String>,
    /// String key/value connection settings; the accepted keys are not defined in this file.
    pub connection: HashMap<String, String>,
    /// Optional time range. NOTE(review): presumably restricts the copied rows — confirm.
    pub time_range: Option<TimestampRange>,
}
484
/// Copy-query-to request.
/// NOTE(review): presumably describes where a query result is written — confirm at the consumer.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// Location string of the destination.
    pub location: String,
    /// String key/value options; the accepted keys are not defined in this file.
    pub with: HashMap<String, String>,
    /// String key/value connection settings; the accepted keys are not defined in this file.
    pub connection: HashMap<String, String>,
}
491
// Unit tests for option validation and for `TableOptions` conversions and formatting.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    // Known table option keys are accepted; unknown and non-whitelisted semantic keys are not.
    #[test]
    fn test_validate_table_option() {
        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
        assert!(validate_table_option(TTL_KEY));
        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
        assert!(validate_table_option(STORAGE_KEY));
        assert!(validate_table_option(MEMTABLE_BULK_MERGE_THRESHOLD));
        assert!(validate_table_option(REPARTITION_COLUMN_HINT_KEY));
        assert!(!validate_table_option("foo"));

        // Only whitelisted semantic keys are accepted.
        assert!(validate_table_option(SEMANTIC_SIGNAL_TYPE));
        assert!(validate_table_option(SEMANTIC_METRIC_TYPE));
        // Unknown semantic key, near-miss, and the internal transport key are rejected.
        assert!(!validate_table_option("greptime.semantic.future.key"));
        assert!(!validate_table_option("greptime.semanticx"));
        assert!(!validate_table_option(SEMANTIC_PER_TABLE_INDEX_KEY));
    }

    // Memtable-related keys are valid database options; an unknown key is not.
    #[test]
    fn test_validate_database_option() {
        assert!(validate_database_option(MEMTABLE_TYPE));
        assert!(validate_database_option(MEMTABLE_BULK_MERGE_THRESHOLD));
        assert!(validate_database_option(MEMTABLE_BULK_ENCODE_ROW_THRESHOLD));
        assert!(validate_database_option(
            MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD
        ));
        assert!(validate_database_option(MEMTABLE_BULK_MAX_MERGE_GROUPS));
        assert!(!validate_database_option("foo"));
    }

    // `TableOptions` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            write_buffer_size: None,
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized = serde_json::to_string(&options).unwrap();
        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
        assert_eq!(options, deserialized);
    }

    // Converting to a `HashMap` and back through `try_from_iter` yields the same options.
    // All cases use `skip_wal: false`.
    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Both typed options set, no extras.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        // Nothing set at all.
        let options = TableOptions {
            write_buffer_size: None,
            ttl: Default::default(),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        // Typed options plus one extra option.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);
    }

    // `Display` output: typed options first, `skip_wal` only when true, then extras.
    #[test]
    fn test_table_options_to_string() {
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s",
            options.to_string()
        );

        // A single extra option keeps the expected string independent of map ordering.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            options.to_string()
        );

        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: true,
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            options.to_string()
        );
    }
}