table/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Table and TableEngine requests
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::range::TimestampRange;
26use common_time::TimeToLive;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30    ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36    is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
37};
38use store_api::mito_engine_options::{
39    is_mito_engine_option_key, APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY,
40    TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
41};
42use store_api::region_request::{SetRegionOption, UnsetRegionOption};
43
44use crate::error::{ParseTableOptionSnafu, Result};
45use crate::metadata::{TableId, TableVersion};
46use crate::table_reference::TableReference;
47
48pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
49pub const FILE_TABLE_LOCATION_KEY: &str = "location";
50pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
51pub const FILE_TABLE_FORMAT_KEY: &str = "format";
52
53pub const TABLE_DATA_MODEL: &str = "table_data_model";
54pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
55
56pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
57pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
58
59pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
60    // common keys:
61    WRITE_BUFFER_SIZE_KEY,
62    TTL_KEY,
63    STORAGE_KEY,
64    COMMENT_KEY,
65    SKIP_WAL_KEY,
66    // file engine keys:
67    FILE_TABLE_LOCATION_KEY,
68    FILE_TABLE_FORMAT_KEY,
69    FILE_TABLE_PATTERN_KEY,
70    // metric engine keys:
71    PHYSICAL_TABLE_METADATA_KEY,
72    LOGICAL_TABLE_METADATA_KEY,
73    // table model info
74    TABLE_DATA_MODEL,
75    OTLP_METRIC_COMPAT_KEY,
76];
77
78// Valid option keys when creating a db.
79static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
80    let mut set = HashSet::new();
81    set.insert(TTL_KEY);
82    set.insert(STORAGE_KEY);
83    set.insert(MEMTABLE_TYPE);
84    set.insert(APPEND_MODE_KEY);
85    set.insert(MERGE_MODE_KEY);
86    set.insert(SKIP_WAL_KEY);
87    set.insert(COMPACTION_TYPE);
88    set.insert(TWCS_FALLBACK_TO_LOCAL);
89    set.insert(TWCS_TIME_WINDOW);
90    set.insert(TWCS_TRIGGER_FILE_NUM);
91    set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
92    set
93});
94
95/// Returns true if the `key` is a valid key for database.
96pub fn validate_database_option(key: &str) -> bool {
97    VALID_DB_OPT_KEYS.contains(&key)
98}
99
100/// Returns true if the `key` is a valid key for any engine or storage.
101pub fn validate_table_option(key: &str) -> bool {
102    if is_supported_in_s3(key) {
103        return true;
104    }
105
106    if is_supported_in_oss(key) {
107        return true;
108    }
109
110    if is_mito_engine_option_key(key) {
111        return true;
112    }
113
114    if is_metric_engine_option_key(key) {
115        return true;
116    }
117
118    VALID_TABLE_OPTION_KEYS.contains(&key)
119}
120
121#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
122#[serde(default)]
123pub struct TableOptions {
124    /// Memtable size of memtable.
125    pub write_buffer_size: Option<ReadableSize>,
126    /// Time-to-live of table. Expired data will be automatically purged.
127    pub ttl: Option<TimeToLive>,
128    /// Skip wal write for this table.
129    pub skip_wal: bool,
130    /// Extra options that may not applicable to all table engines.
131    pub extra_options: HashMap<String, String>,
132}
133
134pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
135pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
136pub const STORAGE_KEY: &str = "storage";
137pub const COMMENT_KEY: &str = "comment";
138pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
139pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
140
141impl TableOptions {
142    pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
143        iter: U,
144    ) -> Result<TableOptions> {
145        let mut options = TableOptions::default();
146
147        let kvs: HashMap<String, String> = iter
148            .into_iter()
149            .map(|(k, v)| (k.to_string(), v.to_string()))
150            .collect();
151
152        if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
153            let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
154                ParseTableOptionSnafu {
155                    key: WRITE_BUFFER_SIZE_KEY,
156                    value: write_buffer_size,
157                }
158                .build()
159            })?;
160            options.write_buffer_size = Some(size)
161        }
162
163        if let Some(ttl) = kvs.get(TTL_KEY) {
164            let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
165                ParseTableOptionSnafu {
166                    key: TTL_KEY,
167                    value: ttl,
168                }
169                .build()
170            })?;
171            options.ttl = Some(ttl_value);
172        }
173
174        if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
175            options.skip_wal = skip_wal.parse().map_err(|_| {
176                ParseTableOptionSnafu {
177                    key: SKIP_WAL_KEY,
178                    value: skip_wal,
179                }
180                .build()
181            })?;
182        }
183
184        options.extra_options = HashMap::from_iter(
185            kvs.into_iter()
186                .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
187        );
188
189        Ok(options)
190    }
191}
192
193impl fmt::Display for TableOptions {
194    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
195        let mut key_vals = vec![];
196        if let Some(size) = self.write_buffer_size {
197            key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
198        }
199
200        if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
201            key_vals.push(format!("{}={}", TTL_KEY, ttl));
202        }
203
204        if self.skip_wal {
205            key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
206        }
207
208        for (k, v) in &self.extra_options {
209            key_vals.push(format!("{}={}", k, v));
210        }
211
212        write!(f, "{}", key_vals.join(" "))
213    }
214}
215
216impl From<&TableOptions> for HashMap<String, String> {
217    fn from(opts: &TableOptions) -> Self {
218        let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
219        if let Some(write_buffer_size) = opts.write_buffer_size {
220            let _ = res.insert(
221                WRITE_BUFFER_SIZE_KEY.to_string(),
222                write_buffer_size.to_string(),
223            );
224        }
225        if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
226            let _ = res.insert(TTL_KEY.to_string(), ttl_str);
227        }
228        res.extend(
229            opts.extra_options
230                .iter()
231                .map(|(k, v)| (k.clone(), v.clone())),
232        );
233        res
234    }
235}
236
237/// Alter table request
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct AlterTableRequest {
240    pub catalog_name: String,
241    pub schema_name: String,
242    pub table_name: String,
243    pub table_id: TableId,
244    pub alter_kind: AlterKind,
245    // None in standalone.
246    pub table_version: Option<TableVersion>,
247}
248
249/// Add column request
250#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct AddColumnRequest {
252    pub column_schema: ColumnSchema,
253    pub is_key: bool,
254    pub location: Option<AddColumnLocation>,
255    /// Add column if not exists.
256    pub add_if_not_exists: bool,
257}
258
259/// Change column datatype request
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ModifyColumnTypeRequest {
262    pub column_name: String,
263    pub target_type: ConcreteDataType,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub enum AlterKind {
268    AddColumns {
269        columns: Vec<AddColumnRequest>,
270    },
271    DropColumns {
272        names: Vec<String>,
273    },
274    ModifyColumnTypes {
275        columns: Vec<ModifyColumnTypeRequest>,
276    },
277    RenameTable {
278        new_table_name: String,
279    },
280    SetTableOptions {
281        options: Vec<SetRegionOption>,
282    },
283    UnsetTableOptions {
284        keys: Vec<UnsetRegionOption>,
285    },
286    SetIndexes {
287        options: Vec<SetIndexOption>,
288    },
289    UnsetIndexes {
290        options: Vec<UnsetIndexOption>,
291    },
292    DropDefaults {
293        names: Vec<String>,
294    },
295    SetDefaults {
296        defaults: Vec<SetDefaultRequest>,
297    },
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct SetDefaultRequest {
302    pub column_name: String,
303    pub default_constraint: Option<ColumnDefaultConstraint>,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub enum SetIndexOption {
308    Fulltext {
309        column_name: String,
310        options: FulltextOptions,
311    },
312    Inverted {
313        column_name: String,
314    },
315    Skipping {
316        column_name: String,
317        options: SkippingIndexOptions,
318    },
319}
320
321impl SetIndexOption {
322    /// Returns the column name of the index option.
323    pub fn column_name(&self) -> &str {
324        match self {
325            SetIndexOption::Fulltext { column_name, .. } => column_name,
326            SetIndexOption::Inverted { column_name, .. } => column_name,
327            SetIndexOption::Skipping { column_name, .. } => column_name,
328        }
329    }
330}
331
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub enum UnsetIndexOption {
334    Fulltext { column_name: String },
335    Inverted { column_name: String },
336    Skipping { column_name: String },
337}
338
339impl UnsetIndexOption {
340    /// Returns the column name of the index option.
341    pub fn column_name(&self) -> &str {
342        match self {
343            UnsetIndexOption::Fulltext { column_name, .. } => column_name,
344            UnsetIndexOption::Inverted { column_name, .. } => column_name,
345            UnsetIndexOption::Skipping { column_name, .. } => column_name,
346        }
347    }
348}
349
350#[derive(Debug)]
351pub struct InsertRequest {
352    pub catalog_name: String,
353    pub schema_name: String,
354    pub table_name: String,
355    pub columns_values: HashMap<String, VectorRef>,
356}
357
358/// Delete (by primary key) request
359#[derive(Debug)]
360pub struct DeleteRequest {
361    pub catalog_name: String,
362    pub schema_name: String,
363    pub table_name: String,
364    /// Values of each column in this table's primary key and time index.
365    ///
366    /// The key is the column name, and the value is the column value.
367    pub key_column_values: HashMap<String, VectorRef>,
368}
369
370#[derive(Debug)]
371pub enum CopyDirection {
372    Export,
373    Import,
374}
375
376/// Copy table request
377#[derive(Debug)]
378pub struct CopyTableRequest {
379    pub catalog_name: String,
380    pub schema_name: String,
381    pub table_name: String,
382    pub location: String,
383    pub with: HashMap<String, String>,
384    pub connection: HashMap<String, String>,
385    pub pattern: Option<String>,
386    pub direction: CopyDirection,
387    pub timestamp_range: Option<TimestampRange>,
388    pub limit: Option<u64>,
389}
390
391#[derive(Debug, Clone, Default)]
392pub struct FlushTableRequest {
393    pub catalog_name: String,
394    pub schema_name: String,
395    pub table_name: String,
396}
397
398#[derive(Debug, Clone, PartialEq)]
399pub struct CompactTableRequest {
400    pub catalog_name: String,
401    pub schema_name: String,
402    pub table_name: String,
403    pub compact_options: compact_request::Options,
404}
405
406impl Default for CompactTableRequest {
407    fn default() -> Self {
408        Self {
409            catalog_name: Default::default(),
410            schema_name: Default::default(),
411            table_name: Default::default(),
412            compact_options: compact_request::Options::Regular(Default::default()),
413        }
414    }
415}
416
417/// Truncate table request
418#[derive(Debug, Clone, Serialize, Deserialize)]
419pub struct TruncateTableRequest {
420    pub catalog_name: String,
421    pub schema_name: String,
422    pub table_name: String,
423    pub table_id: TableId,
424}
425
426impl TruncateTableRequest {
427    pub fn table_ref(&self) -> TableReference {
428        TableReference {
429            catalog: &self.catalog_name,
430            schema: &self.schema_name,
431            table: &self.table_name,
432        }
433    }
434}
435
436#[derive(Debug, Clone, Default, Deserialize, Serialize)]
437pub struct CopyDatabaseRequest {
438    pub catalog_name: String,
439    pub schema_name: String,
440    pub location: String,
441    pub with: HashMap<String, String>,
442    pub connection: HashMap<String, String>,
443    pub time_range: Option<TimestampRange>,
444}
445
446#[derive(Debug, Clone, Default, Deserialize, Serialize)]
447pub struct CopyQueryToRequest {
448    pub location: String,
449    pub with: HashMap<String, String>,
450    pub connection: HashMap<String, String>,
451}
452
453#[cfg(test)]
454mod tests {
455    use std::time::Duration;
456
457    use super::*;
458
459    #[test]
460    fn test_validate_table_option() {
461        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
462        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
463        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
464        assert!(validate_table_option(TTL_KEY));
465        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
466        assert!(validate_table_option(STORAGE_KEY));
467        assert!(!validate_table_option("foo"));
468    }
469
470    #[test]
471    fn test_serialize_table_options() {
472        let options = TableOptions {
473            write_buffer_size: None,
474            ttl: Some(Duration::from_secs(1000).into()),
475            extra_options: HashMap::new(),
476            skip_wal: false,
477        };
478        let serialized = serde_json::to_string(&options).unwrap();
479        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
480        assert_eq!(options, deserialized);
481    }
482
483    #[test]
484    fn test_convert_hashmap_between_table_options() {
485        let options = TableOptions {
486            write_buffer_size: Some(ReadableSize::mb(128)),
487            ttl: Some(Duration::from_secs(1000).into()),
488            extra_options: HashMap::new(),
489            skip_wal: false,
490        };
491        let serialized_map = HashMap::from(&options);
492        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
493        assert_eq!(options, serialized);
494
495        let options = TableOptions {
496            write_buffer_size: None,
497            ttl: Default::default(),
498            extra_options: HashMap::new(),
499            skip_wal: false,
500        };
501        let serialized_map = HashMap::from(&options);
502        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
503        assert_eq!(options, serialized);
504
505        let options = TableOptions {
506            write_buffer_size: Some(ReadableSize::mb(128)),
507            ttl: Some(Duration::from_secs(1000).into()),
508            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
509            skip_wal: false,
510        };
511        let serialized_map = HashMap::from(&options);
512        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
513        assert_eq!(options, serialized);
514    }
515
516    #[test]
517    fn test_table_options_to_string() {
518        let options = TableOptions {
519            write_buffer_size: Some(ReadableSize::mb(128)),
520            ttl: Some(Duration::from_secs(1000).into()),
521            extra_options: HashMap::new(),
522            skip_wal: false,
523        };
524
525        assert_eq!(
526            "write_buffer_size=128.0MiB ttl=16m 40s",
527            options.to_string()
528        );
529
530        let options = TableOptions {
531            write_buffer_size: Some(ReadableSize::mb(128)),
532            ttl: Some(Duration::from_secs(1000).into()),
533            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
534            skip_wal: false,
535        };
536
537        assert_eq!(
538            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
539            options.to_string()
540        );
541
542        let options = TableOptions {
543            write_buffer_size: Some(ReadableSize::mb(128)),
544            ttl: Some(Duration::from_secs(1000).into()),
545            extra_options: HashMap::new(),
546            skip_wal: true,
547        };
548        assert_eq!(
549            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
550            options.to_string()
551        );
552    }
553}