1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30 ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36 LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39 APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY, TWCS_FALLBACK_TO_LOCAL,
40 TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, is_mito_engine_option_key,
41};
42use store_api::region_request::{SetRegionOption, UnsetRegionOption};
43
44use crate::error::{ParseTableOptionSnafu, Result};
45use crate::metadata::{TableId, TableVersion};
46use crate::table_reference::TableReference;
47
// Option keys related to file tables.
/// Key string `__private.file_table_meta`.
/// NOTE(review): presumably the internal key under which file-table metadata is stored — confirm.
48pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// Option key `location` for a file table.
49pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// Option key `pattern` for a file table.
50pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// Option key `format` for a file table.
51pub const FILE_TABLE_FORMAT_KEY: &str = "format";
52
/// Option key selecting the table data model.
53pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// Value string `greptime_trace_v1`; by its name, a value for [`TABLE_DATA_MODEL`].
54pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
55
/// Option key `otlp_metric_compat`.
56pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// Value string `prom`; by its name, a value for [`OTLP_METRIC_COMPAT_KEY`].
57pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
58
/// Table option keys that `validate_table_option` accepts directly, in addition
/// to whatever the S3, OSS, mito-engine and metric-engine key checks accept.
59pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
    // Common keys.
60 WRITE_BUFFER_SIZE_KEY,
62 TTL_KEY,
63 STORAGE_KEY,
64 COMMENT_KEY,
65 SKIP_WAL_KEY,
    // File table keys.
66 FILE_TABLE_LOCATION_KEY,
68 FILE_TABLE_FORMAT_KEY,
69 FILE_TABLE_PATTERN_KEY,
    // Keys imported from `store_api::metric_engine_consts`.
70 PHYSICAL_TABLE_METADATA_KEY,
72 LOGICAL_TABLE_METADATA_KEY,
    // Table data model key.
73 TABLE_DATA_MODEL,
    // OTLP metric compatibility key.
75 OTLP_METRIC_COMPAT_KEY,
76];
77
78static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
80 let mut set = HashSet::new();
81 set.insert(TTL_KEY);
82 set.insert(STORAGE_KEY);
83 set.insert(MEMTABLE_TYPE);
84 set.insert(APPEND_MODE_KEY);
85 set.insert(MERGE_MODE_KEY);
86 set.insert(SKIP_WAL_KEY);
87 set.insert(COMPACTION_TYPE);
88 set.insert(TWCS_FALLBACK_TO_LOCAL);
89 set.insert(TWCS_TIME_WINDOW);
90 set.insert(TWCS_TRIGGER_FILE_NUM);
91 set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
92 set
93});
94
95pub fn validate_database_option(key: &str) -> bool {
97 VALID_DB_OPT_KEYS.contains(&key)
98}
99
100pub fn validate_table_option(key: &str) -> bool {
102 if is_supported_in_s3(key) {
103 return true;
104 }
105
106 if is_supported_in_oss(key) {
107 return true;
108 }
109
110 if is_mito_engine_option_key(key) {
111 return true;
112 }
113
114 if is_metric_engine_option_key(key) {
115 return true;
116 }
117
118 VALID_TABLE_OPTION_KEYS.contains(&key)
119}
120
/// Options attached to a table.
///
/// `#[serde(default)]` means fields missing from serialized input take their
/// `Default` value on deserialization.
121#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
122#[serde(default)]
123pub struct TableOptions {
    /// Write buffer size; `try_from_iter` parses it from the `write_buffer_size` key.
124 pub write_buffer_size: Option<ReadableSize>,
    /// Time-to-live; `try_from_iter` parses it from the TTL key.
126 pub ttl: Option<TimeToLive>,
    /// Skip-WAL flag; `try_from_iter` parses it from the skip-WAL key. Defaults to `false`.
128 pub skip_wal: bool,
    /// Remaining options as raw strings. As built by `try_from_iter`, this holds
    /// every input pair except the write-buffer-size and TTL ones (the skip-WAL
    /// pair, if given, is kept here too).
130 pub extra_options: HashMap<String, String>,
132}
133
/// Option key for the write buffer size.
134pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Option key for the time-to-live; re-exports the mito engine's `TTL_KEY`.
135pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Option key `storage`.
136pub const STORAGE_KEY: &str = "storage";
/// Option key `comment`.
137pub const COMMENT_KEY: &str = "comment";
/// Option key `auto_create_table`. Not listed in `VALID_TABLE_OPTION_KEYS`.
138pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Option key for skipping the WAL; re-exports the mito engine's `SKIP_WAL_KEY`.
139pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
140
141impl TableOptions {
142 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
143 iter: U,
144 ) -> Result<TableOptions> {
145 let mut options = TableOptions::default();
146
147 let kvs: HashMap<String, String> = iter
148 .into_iter()
149 .map(|(k, v)| (k.to_string(), v.to_string()))
150 .collect();
151
152 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
153 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
154 ParseTableOptionSnafu {
155 key: WRITE_BUFFER_SIZE_KEY,
156 value: write_buffer_size,
157 }
158 .build()
159 })?;
160 options.write_buffer_size = Some(size)
161 }
162
163 if let Some(ttl) = kvs.get(TTL_KEY) {
164 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
165 ParseTableOptionSnafu {
166 key: TTL_KEY,
167 value: ttl,
168 }
169 .build()
170 })?;
171 options.ttl = Some(ttl_value);
172 }
173
174 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
175 options.skip_wal = skip_wal.parse().map_err(|_| {
176 ParseTableOptionSnafu {
177 key: SKIP_WAL_KEY,
178 value: skip_wal,
179 }
180 .build()
181 })?;
182 }
183
184 options.extra_options = HashMap::from_iter(
185 kvs.into_iter()
186 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
187 );
188
189 Ok(options)
190 }
191}
192
193impl fmt::Display for TableOptions {
194 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
195 let mut key_vals = vec![];
196 if let Some(size) = self.write_buffer_size {
197 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
198 }
199
200 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
201 key_vals.push(format!("{}={}", TTL_KEY, ttl));
202 }
203
204 if self.skip_wal {
205 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
206 }
207
208 for (k, v) in &self.extra_options {
209 key_vals.push(format!("{}={}", k, v));
210 }
211
212 write!(f, "{}", key_vals.join(" "))
213 }
214}
215
216impl From<&TableOptions> for HashMap<String, String> {
217 fn from(opts: &TableOptions) -> Self {
218 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
219 if let Some(write_buffer_size) = opts.write_buffer_size {
220 let _ = res.insert(
221 WRITE_BUFFER_SIZE_KEY.to_string(),
222 write_buffer_size.to_string(),
223 );
224 }
225 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
226 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
227 }
228 res.extend(
229 opts.extra_options
230 .iter()
231 .map(|(k, v)| (k.clone(), v.clone())),
232 );
233 res
234 }
235}
236
/// Request to alter a table.
237#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct AlterTableRequest {
    /// Catalog the table belongs to.
240 pub catalog_name: String,
    /// Schema the table belongs to.
241 pub schema_name: String,
    /// Name of the table to alter.
242 pub table_name: String,
    /// Id of the table to alter.
243 pub table_id: TableId,
    /// The alteration to apply.
244 pub alter_kind: AlterKind,
    /// Optional table version.
    /// NOTE(review): presumably the version the caller expects the table to be at — confirm.
245 pub table_version: Option<TableVersion>,
247}
248
/// Describes one column to add to a table.
249#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct AddColumnRequest {
    /// Schema of the new column.
252 pub column_schema: ColumnSchema,
    /// Whether the new column is a key column.
    /// NOTE(review): presumably "part of the primary key" — confirm.
253 pub is_key: bool,
    /// Optional position at which to add the column.
254 pub location: Option<AddColumnLocation>,
    /// Flag named `add_if_not_exists`.
    /// NOTE(review): presumably makes adding an already-existing column a no-op — confirm.
255 pub add_if_not_exists: bool,
257}
258
/// Describes a change of one column's data type.
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ModifyColumnTypeRequest {
    /// Name of the column to modify.
262 pub column_name: String,
    /// Data type the column should be changed to.
263 pub target_type: ConcreteDataType,
264}
265
/// The kinds of alteration an `AlterTableRequest` can carry.
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub enum AlterKind {
    /// Add the listed columns.
268 AddColumns {
269 columns: Vec<AddColumnRequest>,
270 },
    /// Drop the columns with the listed names.
271 DropColumns {
272 names: Vec<String>,
273 },
    /// Change the data types of the listed columns.
274 ModifyColumnTypes {
275 columns: Vec<ModifyColumnTypeRequest>,
276 },
    /// Rename the table to `new_table_name`.
277 RenameTable {
278 new_table_name: String,
279 },
    /// Set the listed table options (expressed as `SetRegionOption`s).
280 SetTableOptions {
281 options: Vec<SetRegionOption>,
282 },
    /// Unset the listed table options (expressed as `UnsetRegionOption`s).
283 UnsetTableOptions {
284 keys: Vec<UnsetRegionOption>,
285 },
    /// Set the listed column indexes.
286 SetIndexes {
287 options: Vec<SetIndexOption>,
288 },
    /// Unset the listed column indexes.
289 UnsetIndexes {
290 options: Vec<UnsetIndexOption>,
291 },
    /// Drop the default values of the columns with the listed names.
292 DropDefaults {
293 names: Vec<String>,
294 },
    /// Set default values as described by the listed requests.
295 SetDefaults {
296 defaults: Vec<SetDefaultRequest>,
297 },
298}
299
/// Describes setting the default value of one column.
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct SetDefaultRequest {
    /// Name of the column whose default is set.
302 pub column_name: String,
    /// The default constraint to apply.
    /// NOTE(review): the meaning of `None` is not visible here — confirm.
303 pub default_constraint: Option<ColumnDefaultConstraint>,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub enum SetIndexOption {
308 Fulltext {
309 column_name: String,
310 options: FulltextOptions,
311 },
312 Inverted {
313 column_name: String,
314 },
315 Skipping {
316 column_name: String,
317 options: SkippingIndexOptions,
318 },
319}
320
321impl SetIndexOption {
322 pub fn column_name(&self) -> &str {
324 match self {
325 SetIndexOption::Fulltext { column_name, .. } => column_name,
326 SetIndexOption::Inverted { column_name, .. } => column_name,
327 SetIndexOption::Skipping { column_name, .. } => column_name,
328 }
329 }
330}
331
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub enum UnsetIndexOption {
334 Fulltext { column_name: String },
335 Inverted { column_name: String },
336 Skipping { column_name: String },
337}
338
339impl UnsetIndexOption {
340 pub fn column_name(&self) -> &str {
342 match self {
343 UnsetIndexOption::Fulltext { column_name, .. } => column_name,
344 UnsetIndexOption::Inverted { column_name, .. } => column_name,
345 UnsetIndexOption::Skipping { column_name, .. } => column_name,
346 }
347 }
348}
349
/// Request to insert data into a table.
350#[derive(Debug)]
351pub struct InsertRequest {
    /// Catalog the table belongs to.
352 pub catalog_name: String,
    /// Schema the table belongs to.
353 pub schema_name: String,
    /// Name of the target table.
354 pub table_name: String,
    /// Vectors to insert.
    /// NOTE(review): presumably keyed by column name — confirm.
355 pub columns_values: HashMap<String, VectorRef>,
356}
357
/// Request to delete data from a table.
358#[derive(Debug)]
360pub struct DeleteRequest {
    /// Catalog the table belongs to.
361 pub catalog_name: String,
    /// Schema the table belongs to.
362 pub schema_name: String,
    /// Name of the target table.
363 pub table_name: String,
    /// Key column vectors.
    /// NOTE(review): presumably keyed by column name and identifying the rows to delete — confirm.
364 pub key_column_values: HashMap<String, VectorRef>,
368}
369
/// Direction of a table copy; used by `CopyTableRequest::direction`.
370#[derive(Debug)]
371pub enum CopyDirection {
    /// Export direction.
372 Export,
    /// Import direction.
373 Import,
374}
375
/// Request to copy a table to or from a location.
376#[derive(Debug)]
378pub struct CopyTableRequest {
    /// Catalog the table belongs to.
379 pub catalog_name: String,
    /// Schema the table belongs to.
380 pub schema_name: String,
    /// Name of the table to copy.
381 pub table_name: String,
    /// Location to copy to or from, depending on `direction`.
382 pub location: String,
    /// String options named `with`.
    /// NOTE(review): presumably the `WITH (...)` options of the copy statement — confirm.
383 pub with: HashMap<String, String>,
    /// String options named `connection`.
    /// NOTE(review): presumably the `CONNECTION (...)` options of the copy statement — confirm.
384 pub connection: HashMap<String, String>,
    /// Optional pattern.
    /// NOTE(review): presumably a file-name pattern used when importing — confirm.
385 pub pattern: Option<String>,
    /// Whether this is an export or an import.
386 pub direction: CopyDirection,
    /// Optional timestamp range for the copy.
387 pub timestamp_range: Option<TimestampRange>,
    /// Optional limit.
    /// NOTE(review): presumably a maximum row count — confirm.
388 pub limit: Option<u64>,
389}
390
/// Request to flush a table, identified by catalog, schema and table name.
391#[derive(Debug, Clone, Default)]
392pub struct FlushTableRequest {
    /// Catalog the table belongs to.
393 pub catalog_name: String,
    /// Schema the table belongs to.
394 pub schema_name: String,
    /// Name of the table to flush.
395 pub table_name: String,
396}
397
/// Request to build indexes for a table, identified by catalog, schema and table name.
398#[derive(Debug, Clone, Default)]
399pub struct BuildIndexTableRequest {
    /// Catalog the table belongs to.
400 pub catalog_name: String,
    /// Schema the table belongs to.
401 pub schema_name: String,
    /// Name of the table to build indexes for.
402 pub table_name: String,
403}
404
405#[derive(Debug, Clone, PartialEq)]
406pub struct CompactTableRequest {
407 pub catalog_name: String,
408 pub schema_name: String,
409 pub table_name: String,
410 pub compact_options: compact_request::Options,
411 pub parallelism: u32,
412}
413
414impl Default for CompactTableRequest {
415 fn default() -> Self {
416 Self {
417 catalog_name: Default::default(),
418 schema_name: Default::default(),
419 table_name: Default::default(),
420 compact_options: compact_request::Options::Regular(Default::default()),
421 parallelism: 1,
422 }
423 }
424}
425
426#[derive(Debug, Clone, Serialize, Deserialize)]
428pub struct TruncateTableRequest {
429 pub catalog_name: String,
430 pub schema_name: String,
431 pub table_name: String,
432 pub table_id: TableId,
433}
434
435impl TruncateTableRequest {
436 pub fn table_ref(&self) -> TableReference<'_> {
437 TableReference {
438 catalog: &self.catalog_name,
439 schema: &self.schema_name,
440 table: &self.table_name,
441 }
442 }
443}
444
/// Request to copy a database (schema) to or from a location.
445#[derive(Debug, Clone, Default, Deserialize, Serialize)]
446pub struct CopyDatabaseRequest {
    /// Catalog the schema belongs to.
447 pub catalog_name: String,
    /// Name of the schema to copy.
448 pub schema_name: String,
    /// Location of the copy.
449 pub location: String,
    /// String options named `with`.
    /// NOTE(review): presumably the `WITH (...)` options of the copy statement — confirm.
450 pub with: HashMap<String, String>,
    /// String options named `connection`.
    /// NOTE(review): presumably the `CONNECTION (...)` options of the copy statement — confirm.
451 pub connection: HashMap<String, String>,
    /// Optional timestamp range for the copy.
452 pub time_range: Option<TimestampRange>,
453}
454
/// Request to copy a query's output to a location.
455#[derive(Debug, Clone, Default, Deserialize, Serialize)]
456pub struct CopyQueryToRequest {
    /// Location to write to.
457 pub location: String,
    /// String options named `with`.
    /// NOTE(review): presumably the `WITH (...)` options of the copy statement — confirm.
458 pub with: HashMap<String, String>,
    /// String options named `connection`.
    /// NOTE(review): presumably the `CONNECTION (...)` options of the copy statement — confirm.
459 pub connection: HashMap<String, String>,
460}
461
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Options with a 128 MiB write buffer and a 1000 s TTL; the remaining
    /// fields are supplied by the caller.
    fn sized_options(extra_options: HashMap<String, String>, skip_wal: bool) -> TableOptions {
        TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options,
            skip_wal,
        }
    }

    /// A single extra option `a=A`.
    fn extra_a() -> HashMap<String, String> {
        HashMap::from([("a".to_string(), "A".to_string())])
    }

    #[test]
    fn test_validate_table_option() {
        let accepted = [
            FILE_TABLE_LOCATION_KEY,
            FILE_TABLE_FORMAT_KEY,
            FILE_TABLE_PATTERN_KEY,
            TTL_KEY,
            WRITE_BUFFER_SIZE_KEY,
            STORAGE_KEY,
        ];
        for key in accepted {
            assert!(validate_table_option(key));
        }
        assert!(!validate_table_option("foo"));
    }

    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        };
        let json = serde_json::to_string(&options).unwrap();
        let decoded: TableOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(options, decoded);
    }

    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Each case must survive a round trip through the string map.
        let cases = [
            sized_options(HashMap::new(), false),
            TableOptions::default(),
            sized_options(extra_a(), false),
        ];
        for options in cases {
            let serialized_map = HashMap::from(&options);
            let rebuilt = TableOptions::try_from_iter(&serialized_map).unwrap();
            assert_eq!(options, rebuilt);
        }
    }

    #[test]
    fn test_table_options_to_string() {
        let cases = [
            (
                sized_options(HashMap::new(), false),
                "write_buffer_size=128.0MiB ttl=16m 40s",
            ),
            (
                sized_options(extra_a(), false),
                "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            ),
            (
                sized_options(HashMap::new(), true),
                "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            ),
        ];
        for (options, expected) in cases {
            assert_eq!(expected, options.to_string());
        }
    }
}