1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30 ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36 LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39 APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD,
40 MEMTABLE_BULK_ENCODE_ROW_THRESHOLD, MEMTABLE_BULK_MAX_MERGE_GROUPS,
41 MEMTABLE_BULK_MERGE_THRESHOLD, MEMTABLE_TYPE, MERGE_MODE_KEY, SST_FORMAT_KEY,
42 TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
43 is_mito_engine_option_key,
44};
45use store_api::region_request::{SetRegionOption, UnsetRegionOption};
46
47use crate::error::{ParseTableOptionSnafu, Result};
48use crate::metadata::{TableId, TableVersion};
49use crate::table_reference::TableReference;
50
51mod semantic;
52pub use semantic::*;
53
/// Option key `__private.file_table_meta`. It is not listed in
/// [`VALID_TABLE_OPTION_KEYS`]; the `__private.` prefix suggests it is
/// internal rather than user-settable — TODO confirm against callers.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// Table option key `location`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// Table option key `pattern`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// Table option key `format`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_FORMAT_KEY: &str = "format";

/// Table option key `table_data_model`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// The string `greptime_trace_v1`; presumably a value for
/// [`TABLE_DATA_MODEL`] — verify against callers.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";

/// Table option key `otlp_metric_compat`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// The string `prom`; presumably a value for [`OTLP_METRIC_COMPAT_KEY`] —
/// verify against callers.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
64
/// Table option keys that [`validate_table_option`] accepts by exact match,
/// in addition to whatever the object-store, engine and semantic predicates
/// it consults accept.
pub const VALID_TABLE_OPTION_KEYS: [&str; 14] = [
    // Keys declared in this module or taken from the mito engine options.
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    SST_FORMAT_KEY,
    // `FILE_TABLE_*` keys declared in this module.
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // Keys imported from `store_api::metric_engine_consts`.
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // Remaining keys declared in this module.
    TABLE_DATA_MODEL,
    OTLP_METRIC_COMPAT_KEY,
    REPARTITION_COLUMN_HINT_KEY,
];
85
/// DDL option key `timeout`.
pub const DDL_TIMEOUT: &str = "timeout";
/// DDL option key `wait`.
pub const DDL_WAIT: &str = "wait";

/// DDL option keys; [`validate_table_option`] also accepts any key in this list.
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
90
91static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
93 let mut set = HashSet::new();
94 set.insert(TTL_KEY);
95 set.insert(STORAGE_KEY);
96 set.insert(MEMTABLE_TYPE);
97 set.insert(MEMTABLE_BULK_MERGE_THRESHOLD);
98 set.insert(MEMTABLE_BULK_ENCODE_ROW_THRESHOLD);
99 set.insert(MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD);
100 set.insert(MEMTABLE_BULK_MAX_MERGE_GROUPS);
101 set.insert(APPEND_MODE_KEY);
102 set.insert(MERGE_MODE_KEY);
103 set.insert(SKIP_WAL_KEY);
104 set.insert(COMPACTION_TYPE);
105 set.insert(TWCS_FALLBACK_TO_LOCAL);
106 set.insert(TWCS_TIME_WINDOW);
107 set.insert(TWCS_TRIGGER_FILE_NUM);
108 set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
109 set.insert(SST_FORMAT_KEY);
110 set
111});
112
113pub fn validate_database_option(key: &str) -> bool {
115 VALID_DB_OPT_KEYS.contains(&key)
116}
117
118pub fn validate_table_option(key: &str) -> bool {
120 if is_supported_in_s3(key) {
121 return true;
122 }
123
124 if is_supported_in_oss(key) {
125 return true;
126 }
127
128 if is_mito_engine_option_key(key) {
129 return true;
130 }
131
132 if is_metric_engine_option_key(key) {
133 return true;
134 }
135
136 if is_semantic_option_key(key) {
139 return true;
140 }
141
142 VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
143}
144
/// Options attached to a table.
///
/// `#[serde(default)]` makes deserialization fill any missing field from the
/// type's [`Default`] value.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Parsed value of the `write_buffer_size` option, if one was supplied.
    pub write_buffer_size: Option<ReadableSize>,
    /// Parsed value of the TTL option, if one was supplied.
    pub ttl: Option<TimeToLive>,
    /// Parsed value of the skip-WAL option; defaults to `false`.
    pub skip_wal: bool,
    /// Options kept as raw strings. `TableOptions::try_from_iter` stores here
    /// every input pair except the write-buffer-size and TTL keys.
    pub extra_options: HashMap<String, String>,
}
157
/// Key of the write-buffer-size table option.
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Key of the TTL option; same value as the mito engine's `TTL_KEY`.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Table option key `storage`.
pub const STORAGE_KEY: &str = "storage";
/// Table option key `comment`.
pub const COMMENT_KEY: &str = "comment";
/// Key `auto_create_table`. Not listed in [`VALID_TABLE_OPTION_KEYS`];
/// presumably a hint consumed elsewhere — verify against callers.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Key of the skip-WAL option; same value as the mito engine's `SKIP_WAL_KEY`.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
/// Key `trace_table_partitions`. Not listed in [`VALID_TABLE_OPTION_KEYS`];
/// presumably a hint consumed elsewhere — verify against callers.
pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
/// Table option key `repartition.column.hint`; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const REPARTITION_COLUMN_HINT_KEY: &str = "repartition.column.hint";
166
167impl TableOptions {
168 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
169 iter: U,
170 ) -> Result<TableOptions> {
171 let mut options = TableOptions::default();
172
173 let kvs: HashMap<String, String> = iter
174 .into_iter()
175 .map(|(k, v)| (k.to_string(), v.to_string()))
176 .collect();
177
178 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
179 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
180 ParseTableOptionSnafu {
181 key: WRITE_BUFFER_SIZE_KEY,
182 value: write_buffer_size,
183 }
184 .build()
185 })?;
186 options.write_buffer_size = Some(size)
187 }
188
189 if let Some(ttl) = kvs.get(TTL_KEY) {
190 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
191 ParseTableOptionSnafu {
192 key: TTL_KEY,
193 value: ttl,
194 }
195 .build()
196 })?;
197 options.ttl = Some(ttl_value);
198 }
199
200 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
201 options.skip_wal = skip_wal.parse().map_err(|_| {
202 ParseTableOptionSnafu {
203 key: SKIP_WAL_KEY,
204 value: skip_wal,
205 }
206 .build()
207 })?;
208 }
209
210 options.extra_options = HashMap::from_iter(
211 kvs.into_iter()
212 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
213 );
214
215 Ok(options)
216 }
217}
218
219impl fmt::Display for TableOptions {
220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221 let mut key_vals = vec![];
222 if let Some(size) = self.write_buffer_size {
223 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
224 }
225
226 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
227 key_vals.push(format!("{}={}", TTL_KEY, ttl));
228 }
229
230 if self.skip_wal {
231 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
232 }
233
234 for (k, v) in &self.extra_options {
235 key_vals.push(format!("{}={}", k, v));
236 }
237
238 write!(f, "{}", key_vals.join(" "))
239 }
240}
241
242impl From<&TableOptions> for HashMap<String, String> {
243 fn from(opts: &TableOptions) -> Self {
244 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
245 if let Some(write_buffer_size) = opts.write_buffer_size {
246 let _ = res.insert(
247 WRITE_BUFFER_SIZE_KEY.to_string(),
248 write_buffer_size.to_string(),
249 );
250 }
251 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
252 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
253 }
254 res.extend(
255 opts.extra_options
256 .iter()
257 .map(|(k, v)| (k.clone(), v.clone())),
258 );
259 res
260 }
261}
262
/// Request to alter a table, identified by catalog, schema, table name and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// The alteration to perform.
    pub alter_kind: AlterKind,
    /// Optional table version; presumably the version the caller expects the
    /// table to be at — TODO confirm.
    pub table_version: Option<TableVersion>,
}
274
/// Description of one column to add in [`AlterKind::AddColumns`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the new column.
    pub column_schema: ColumnSchema,
    /// Presumably whether the new column is a key column — TODO confirm.
    pub is_key: bool,
    /// Optional placement of the new column.
    pub location: Option<AddColumnLocation>,
    /// Presumably: add the column only if it does not already exist — verify
    /// against the code that consumes this flag.
    pub add_if_not_exists: bool,
}
284
/// Description of one column type change in [`AlterKind::ModifyColumnTypes`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column to change.
    pub column_name: String,
    /// Data type the column should have afterwards.
    pub target_type: ConcreteDataType,
}
291
/// The kinds of alteration an [`AlterTableRequest`] can carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add the given columns.
    AddColumns {
        columns: Vec<AddColumnRequest>,
    },
    /// Drop the columns with the given names.
    DropColumns {
        names: Vec<String>,
    },
    /// Change the data type of the given columns.
    ModifyColumnTypes {
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table to `new_table_name`.
    RenameTable {
        new_table_name: String,
    },
    /// Set the given options.
    SetTableOptions {
        options: Vec<SetRegionOption>,
    },
    /// Unset the given options.
    UnsetTableOptions {
        keys: Vec<UnsetRegionOption>,
    },
    /// Set the repartition column hint to `column_name`.
    SetRepartitionColumnHint {
        column_name: String,
    },
    /// Remove the repartition column hint.
    UnsetRepartitionColumnHint,
    /// Set the given index options.
    SetIndexes {
        options: Vec<SetIndexOption>,
    },
    /// Unset the given index options.
    UnsetIndexes {
        options: Vec<UnsetIndexOption>,
    },
    /// Drop the default of the columns with the given names.
    DropDefaults {
        names: Vec<String>,
    },
    /// Set column defaults as described by `defaults`.
    SetDefaults {
        defaults: Vec<SetDefaultRequest>,
    },
}
329
/// Description of one column default in [`AlterKind::SetDefaults`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetDefaultRequest {
    /// Name of the column whose default is being set.
    pub column_name: String,
    /// The default constraint to apply; the meaning of `None` is not visible
    /// here — verify against the consumer.
    pub default_constraint: Option<ColumnDefaultConstraint>,
}
335
/// An index to set on a column; every variant names the target column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOption {
    /// Fulltext index with its options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index; carries no extra options.
    Inverted {
        column_name: String,
    },
    /// Skipping index with its options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
350
351impl SetIndexOption {
352 pub fn column_name(&self) -> &str {
354 match self {
355 SetIndexOption::Fulltext { column_name, .. } => column_name,
356 SetIndexOption::Inverted { column_name, .. } => column_name,
357 SetIndexOption::Skipping { column_name, .. } => column_name,
358 }
359 }
360}
361
/// An index to unset on a column; every variant names the target column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOption {
    /// Fulltext index.
    Fulltext { column_name: String },
    /// Inverted index.
    Inverted { column_name: String },
    /// Skipping index.
    Skipping { column_name: String },
}
368
369impl UnsetIndexOption {
370 pub fn column_name(&self) -> &str {
372 match self {
373 UnsetIndexOption::Fulltext { column_name, .. } => column_name,
374 UnsetIndexOption::Inverted { column_name, .. } => column_name,
375 UnsetIndexOption::Skipping { column_name, .. } => column_name,
376 }
377 }
378}
379
/// Request to insert data into the table identified by catalog, schema and
/// table name.
#[derive(Debug)]
pub struct InsertRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Vectors to insert, keyed by a string — presumably the column name;
    /// verify against callers.
    pub columns_values: HashMap<String, VectorRef>,
}
387
/// Request to delete data from the table identified by catalog, schema and
/// table name.
#[derive(Debug)]
pub struct DeleteRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Vectors keyed by a string — presumably key column name to the values
    /// identifying the rows to delete; verify against callers.
    pub key_column_values: HashMap<String, VectorRef>,
}
399
/// Direction of a [`CopyTableRequest`].
#[derive(Debug)]
pub enum CopyDirection {
    /// Presumably copies data out of the table — TODO confirm.
    Export,
    /// Presumably copies data into the table — TODO confirm.
    Import,
}
405
/// Request to copy a table to or from `location`, as selected by `direction`.
#[derive(Debug)]
pub struct CopyTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// The external location of the copy.
    pub location: String,
    /// String key/value options; presumably from the statement's `WITH`
    /// clause — verify against callers.
    pub with: HashMap<String, String>,
    /// String key/value options; presumably connection settings for
    /// `location` — verify against callers.
    pub connection: HashMap<String, String>,
    /// Optional pattern; presumably filters the files considered — TODO confirm.
    pub pattern: Option<String>,
    pub direction: CopyDirection,
    /// Optional timestamp range restricting the copy.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit; presumably on the number of rows — TODO confirm.
    pub limit: Option<u64>,
}
420
/// Request to flush the table identified by catalog, schema and table name.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
427
/// Request to build indexes for the table identified by catalog, schema and
/// table name.
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
434
/// Request to compact the table identified by catalog, schema and table name.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Compaction options from the region `compact_request` protobuf.
    pub compact_options: compact_request::Options,
    /// Requested parallelism; the `Default` impl uses `1`.
    pub parallelism: u32,
}
443
444impl Default for CompactTableRequest {
445 fn default() -> Self {
446 Self {
447 catalog_name: Default::default(),
448 schema_name: Default::default(),
449 table_name: Default::default(),
450 compact_options: compact_request::Options::Regular(Default::default()),
451 parallelism: 1,
452 }
453 }
454}
455
/// Request to truncate the table identified by catalog, schema, table name
/// and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
}
464
465impl TruncateTableRequest {
466 pub fn table_ref(&self) -> TableReference<'_> {
467 TableReference {
468 catalog: &self.catalog_name,
469 schema: &self.schema_name,
470 table: &self.table_name,
471 }
472 }
473}
474
/// Request to copy the database identified by catalog and schema name to or
/// from `location`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    pub catalog_name: String,
    pub schema_name: String,
    /// The external location of the copy.
    pub location: String,
    /// String key/value options; presumably from the statement's `WITH`
    /// clause — verify against callers.
    pub with: HashMap<String, String>,
    /// String key/value options; presumably connection settings for
    /// `location` — verify against callers.
    pub connection: HashMap<String, String>,
    /// Optional timestamp range restricting the copy.
    pub time_range: Option<TimestampRange>,
}
484
/// Request to copy a query's result to `location`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// The external location to write to.
    pub location: String,
    /// String key/value options; presumably from the statement's `WITH`
    /// clause — verify against callers.
    pub with: HashMap<String, String>,
    /// String key/value options; presumably connection settings for
    /// `location` — verify against callers.
    pub connection: HashMap<String, String>,
}
491
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Checks which keys `validate_table_option` accepts and rejects.
    #[test]
    fn test_validate_table_option() {
        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
        assert!(validate_table_option(TTL_KEY));
        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
        assert!(validate_table_option(STORAGE_KEY));
        assert!(validate_table_option(MEMTABLE_BULK_MERGE_THRESHOLD));
        assert!(validate_table_option(REPARTITION_COLUMN_HINT_KEY));
        assert!(!validate_table_option("foo"));

        // Semantic keys: the two known keys are accepted, while an unknown
        // key under the same prefix, a look-alike prefix and the per-table
        // index key are all rejected.
        assert!(validate_table_option(SEMANTIC_SIGNAL_TYPE));
        assert!(validate_table_option(SEMANTIC_METRIC_TYPE));
        assert!(!validate_table_option("greptime.semantic.future.key"));
        assert!(!validate_table_option("greptime.semanticx"));
        assert!(!validate_table_option(SEMANTIC_PER_TABLE_INDEX_KEY));
    }

    /// Checks that the memtable keys are valid database options and an
    /// unknown key is not.
    #[test]
    fn test_validate_database_option() {
        assert!(validate_database_option(MEMTABLE_TYPE));
        assert!(validate_database_option(MEMTABLE_BULK_MERGE_THRESHOLD));
        assert!(validate_database_option(MEMTABLE_BULK_ENCODE_ROW_THRESHOLD));
        assert!(validate_database_option(
            MEMTABLE_BULK_ENCODE_BYTES_THRESHOLD
        ));
        assert!(validate_database_option(MEMTABLE_BULK_MAX_MERGE_GROUPS));
        assert!(!validate_database_option("foo"));
    }

    /// `TableOptions` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            write_buffer_size: None,
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized = serde_json::to_string(&options).unwrap();
        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
        assert_eq!(options, deserialized);
    }

    /// `TableOptions` -> `HashMap` -> `TableOptions` round trips, for cases
    /// where `skip_wal` is `false`.
    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Both typed fields set, no extras.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        // No typed fields set (`ttl` is the `Option` default, i.e. `None`).
        let options = TableOptions {
            write_buffer_size: None,
            ttl: Default::default(),
            extra_options: HashMap::new(),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);

        // Typed fields plus one extra option.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
        assert_eq!(options, serialized);
    }

    /// Checks the `Display` output of `TableOptions`.
    #[test]
    fn test_table_options_to_string() {
        // Typed fields only; 1000 seconds renders as `16m 40s`.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s",
            options.to_string()
        );

        // Extra options follow the typed fields.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            skip_wal: false,
        };

        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            options.to_string()
        );

        // `skip_wal` is printed only when it is `true`.
        let options = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options: HashMap::new(),
            skip_wal: true,
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            options.to_string()
        );
    }
}