1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30 ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36 LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39 APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY, TWCS_FALLBACK_TO_LOCAL,
40 TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, is_mito_engine_option_key,
41};
42use store_api::region_request::{SetRegionOption, UnsetRegionOption};
43
44use crate::error::{ParseTableOptionSnafu, Result};
45use crate::metadata::{TableId, TableVersion};
46use crate::table_reference::TableReference;
47
/// Key `__private.file_table_meta`.
///
/// NOTE(review): presumably the option key under which a file table keeps its private
/// metadata — confirm against the file engine. It is not part of `VALID_TABLE_OPTION_KEYS`.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// Table option key `location`; accepted by `validate_table_option`.
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// Table option key `pattern`; accepted by `validate_table_option`.
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// Table option key `format`; accepted by `validate_table_option`.
pub const FILE_TABLE_FORMAT_KEY: &str = "format";

/// Table option key `table_data_model`; accepted by `validate_table_option`.
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// The string `greptime_trace_v1`.
///
/// NOTE(review): presumably a value for the [`TABLE_DATA_MODEL`] option — confirm with callers.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";

/// Table option key `otlp_metric_compat`; accepted by `validate_table_option`.
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// The string `prom`.
///
/// NOTE(review): presumably a value for the [`OTLP_METRIC_COMPAT_KEY`] option — confirm with
/// callers.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
58
/// Table option keys that `validate_table_option` accepts directly, in addition to the keys
/// recognized by the object-store, mito-engine and metric-engine checks and the DDL option keys.
///
/// The array length in the type must be kept in sync with the number of entries.
pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
    // Common table options.
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    // File table options.
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // Metric engine table metadata keys.
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // Data model option.
    TABLE_DATA_MODEL,
    // OTLP metric compatibility option.
    OTLP_METRIC_COMPAT_KEY,
];
77
/// DDL option key `timeout`.
///
/// NOTE(review): presumably how long a DDL statement may wait — confirm the unit and semantics
/// with the DDL executor.
pub const DDL_TIMEOUT: &str = "timeout";
/// DDL option key `wait`.
///
/// NOTE(review): presumably whether the DDL statement blocks until completion — confirm with
/// the DDL executor.
pub const DDL_WAIT: &str = "wait";

/// DDL option keys; `validate_table_option` accepts these as well as the table option keys.
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
82
83static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
85 let mut set = HashSet::new();
86 set.insert(TTL_KEY);
87 set.insert(STORAGE_KEY);
88 set.insert(MEMTABLE_TYPE);
89 set.insert(APPEND_MODE_KEY);
90 set.insert(MERGE_MODE_KEY);
91 set.insert(SKIP_WAL_KEY);
92 set.insert(COMPACTION_TYPE);
93 set.insert(TWCS_FALLBACK_TO_LOCAL);
94 set.insert(TWCS_TIME_WINDOW);
95 set.insert(TWCS_TRIGGER_FILE_NUM);
96 set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
97 set
98});
99
100pub fn validate_database_option(key: &str) -> bool {
102 VALID_DB_OPT_KEYS.contains(&key)
103}
104
105pub fn validate_table_option(key: &str) -> bool {
107 if is_supported_in_s3(key) {
108 return true;
109 }
110
111 if is_supported_in_oss(key) {
112 return true;
113 }
114
115 if is_mito_engine_option_key(key) {
116 return true;
117 }
118
119 if is_metric_engine_option_key(key) {
120 return true;
121 }
122
123 VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
124}
125
/// Options of a table: a few typed, well-known options plus a free-form key-value map.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form fall back to their
/// `Default` values when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Parsed value of the `write_buffer_size` option, if it was given.
    pub write_buffer_size: Option<ReadableSize>,
    /// Parsed value of the `ttl` option, if it was given.
    pub ttl: Option<TimeToLive>,
    /// Parsed value of the `skip_wal` option; `false` when the option is absent.
    pub skip_wal: bool,
    /// All remaining options as raw strings.
    ///
    /// `TableOptions::try_from_iter` only strips the `write_buffer_size` and `ttl` keys, so a
    /// `skip_wal` entry given as input is kept here as well.
    pub extra_options: HashMap<String, String>,
}
138
/// Table option key `write_buffer_size`; parsed into `TableOptions::write_buffer_size`.
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Table option key for the time-to-live, re-exported from the mito engine options;
/// parsed into `TableOptions::ttl`.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Table option key `storage`.
pub const STORAGE_KEY: &str = "storage";
/// Table option key `comment`.
pub const COMMENT_KEY: &str = "comment";
/// Key `auto_create_table`.
///
/// NOTE(review): not listed in `VALID_TABLE_OPTION_KEYS`; presumably consumed elsewhere
/// (e.g. as an ingestion hint) — confirm with callers.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Table option key for skipping the WAL, re-exported from the mito engine options;
/// parsed into `TableOptions::skip_wal`.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
145
146impl TableOptions {
147 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
148 iter: U,
149 ) -> Result<TableOptions> {
150 let mut options = TableOptions::default();
151
152 let kvs: HashMap<String, String> = iter
153 .into_iter()
154 .map(|(k, v)| (k.to_string(), v.to_string()))
155 .collect();
156
157 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
158 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
159 ParseTableOptionSnafu {
160 key: WRITE_BUFFER_SIZE_KEY,
161 value: write_buffer_size,
162 }
163 .build()
164 })?;
165 options.write_buffer_size = Some(size)
166 }
167
168 if let Some(ttl) = kvs.get(TTL_KEY) {
169 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
170 ParseTableOptionSnafu {
171 key: TTL_KEY,
172 value: ttl,
173 }
174 .build()
175 })?;
176 options.ttl = Some(ttl_value);
177 }
178
179 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
180 options.skip_wal = skip_wal.parse().map_err(|_| {
181 ParseTableOptionSnafu {
182 key: SKIP_WAL_KEY,
183 value: skip_wal,
184 }
185 .build()
186 })?;
187 }
188
189 options.extra_options = HashMap::from_iter(
190 kvs.into_iter()
191 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
192 );
193
194 Ok(options)
195 }
196}
197
198impl fmt::Display for TableOptions {
199 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
200 let mut key_vals = vec![];
201 if let Some(size) = self.write_buffer_size {
202 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
203 }
204
205 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
206 key_vals.push(format!("{}={}", TTL_KEY, ttl));
207 }
208
209 if self.skip_wal {
210 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
211 }
212
213 for (k, v) in &self.extra_options {
214 key_vals.push(format!("{}={}", k, v));
215 }
216
217 write!(f, "{}", key_vals.join(" "))
218 }
219}
220
221impl From<&TableOptions> for HashMap<String, String> {
222 fn from(opts: &TableOptions) -> Self {
223 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
224 if let Some(write_buffer_size) = opts.write_buffer_size {
225 let _ = res.insert(
226 WRITE_BUFFER_SIZE_KEY.to_string(),
227 write_buffer_size.to_string(),
228 );
229 }
230 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
231 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
232 }
233 res.extend(
234 opts.extra_options
235 .iter()
236 .map(|(k, v)| (k.clone(), v.clone())),
237 );
238 res
239 }
240}
241
/// Request to alter a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to alter.
    pub table_name: String,
    /// Id of the table to alter.
    pub table_id: TableId,
    /// The alteration to apply.
    pub alter_kind: AlterKind,
    /// Optional table version.
    ///
    /// NOTE(review): presumably the version the caller expects the table to be at — confirm
    /// how the executor interprets `None`.
    pub table_version: Option<TableVersion>,
}
253
/// Description of a single column to add to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the new column.
    pub column_schema: ColumnSchema,
    /// Whether the new column is a key column.
    pub is_key: bool,
    /// Where to place the new column, if a position was requested.
    pub location: Option<AddColumnLocation>,
    /// Flag for `ADD COLUMN IF NOT EXISTS`-style requests.
    ///
    /// NOTE(review): presumably an already existing column is then not an error — confirm with
    /// the alter executor.
    pub add_if_not_exists: bool,
}
263
/// Description of a column whose data type should be changed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column should be changed to.
    pub target_type: ConcreteDataType,
}
270
/// The kinds of alteration an [`AlterTableRequest`] can carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add the given columns.
    AddColumns {
        columns: Vec<AddColumnRequest>,
    },
    /// Drop the columns with the given names.
    DropColumns {
        names: Vec<String>,
    },
    /// Change the data types of the given columns.
    ModifyColumnTypes {
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table to `new_table_name`.
    RenameTable {
        new_table_name: String,
    },
    /// Set the given table options.
    SetTableOptions {
        options: Vec<SetRegionOption>,
    },
    /// Unset the given table options.
    UnsetTableOptions {
        keys: Vec<UnsetRegionOption>,
    },
    /// Set the given column indexes.
    SetIndexes {
        options: Vec<SetIndexOption>,
    },
    /// Unset the given column indexes.
    UnsetIndexes {
        options: Vec<UnsetIndexOption>,
    },
    /// Drop the default values of the columns with the given names.
    DropDefaults {
        names: Vec<String>,
    },
    /// Set the default values of the given columns.
    SetDefaults {
        defaults: Vec<SetDefaultRequest>,
    },
}
304
/// Description of a column whose default constraint should be set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetDefaultRequest {
    /// Name of the column to modify.
    pub column_name: String,
    /// The default constraint to apply.
    ///
    /// NOTE(review): the meaning of `None` is not visible here — confirm with the alter executor.
    pub default_constraint: Option<ColumnDefaultConstraint>,
}
310
/// An index to set on a column, together with its index-specific options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOption {
    /// Fulltext index on `column_name`, configured by `options`.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index on `column_name`.
    Inverted {
        column_name: String,
    },
    /// Skipping index on `column_name`, configured by `options`.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
325
326impl SetIndexOption {
327 pub fn column_name(&self) -> &str {
329 match self {
330 SetIndexOption::Fulltext { column_name, .. } => column_name,
331 SetIndexOption::Inverted { column_name, .. } => column_name,
332 SetIndexOption::Skipping { column_name, .. } => column_name,
333 }
334 }
335}
336
/// An index to remove from a column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOption {
    /// Remove the fulltext index from `column_name`.
    Fulltext { column_name: String },
    /// Remove the inverted index from `column_name`.
    Inverted { column_name: String },
    /// Remove the skipping index from `column_name`.
    Skipping { column_name: String },
}
343
344impl UnsetIndexOption {
345 pub fn column_name(&self) -> &str {
347 match self {
348 UnsetIndexOption::Fulltext { column_name, .. } => column_name,
349 UnsetIndexOption::Inverted { column_name, .. } => column_name,
350 UnsetIndexOption::Skipping { column_name, .. } => column_name,
351 }
352 }
353}
354
/// Request to insert data into a table.
#[derive(Debug)]
pub struct InsertRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Values to insert, keyed by column name, one vector per column.
    pub columns_values: HashMap<String, VectorRef>,
}
362
/// Request to delete data from a table.
#[derive(Debug)]
pub struct DeleteRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Vectors keyed by column name.
    ///
    /// NOTE(review): presumably the key-column values identifying the rows to delete — confirm
    /// which columns callers are required to supply.
    pub key_column_values: HashMap<String, VectorRef>,
}
374
/// Direction of a table copy; see [`CopyTableRequest::direction`].
#[derive(Debug)]
pub enum CopyDirection {
    /// NOTE(review): presumably copies table data out to the external location — confirm.
    Export,
    /// NOTE(review): presumably copies data from the external location into the table — confirm.
    Import,
}
380
/// Request to copy a table to or from an external location.
#[derive(Debug)]
pub struct CopyTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to copy.
    pub table_name: String,
    /// The external location of the copy.
    pub location: String,
    /// Raw key-value options of the copy.
    ///
    /// NOTE(review): presumably the `WITH (...)` options of the statement — confirm.
    pub with: HashMap<String, String>,
    /// Raw key-value connection settings.
    ///
    /// NOTE(review): presumably the `CONNECTION (...)` options of the statement — confirm.
    pub connection: HashMap<String, String>,
    /// Optional pattern.
    ///
    /// NOTE(review): presumably filters which files under `location` are used — confirm.
    pub pattern: Option<String>,
    /// Whether the copy is an export or an import.
    pub direction: CopyDirection,
    /// Optional timestamp range restricting the copy.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit.
    ///
    /// NOTE(review): presumably a maximum number of rows — confirm the unit with the executor.
    pub limit: Option<u64>,
}
395
/// Request to flush a table.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to flush.
    pub table_name: String,
}
402
/// Request to build indexes for a table.
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to build indexes for.
    pub table_name: String,
}
409
/// Request to compact a table.
///
/// `Default` is implemented by hand: it uses regular compaction options and a parallelism of 1.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to compact.
    pub table_name: String,
    /// Compaction options as defined by the region compact request protocol.
    pub compact_options: compact_request::Options,
    /// Requested parallelism.
    ///
    /// NOTE(review): how the engine applies this value is not visible here — confirm.
    pub parallelism: u32,
}
418
419impl Default for CompactTableRequest {
420 fn default() -> Self {
421 Self {
422 catalog_name: Default::default(),
423 schema_name: Default::default(),
424 table_name: Default::default(),
425 compact_options: compact_request::Options::Regular(Default::default()),
426 parallelism: 1,
427 }
428 }
429}
430
/// Request to truncate a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema the table belongs to.
    pub schema_name: String,
    /// Name of the table to truncate.
    pub table_name: String,
    /// Id of the table to truncate.
    pub table_id: TableId,
}
439
440impl TruncateTableRequest {
441 pub fn table_ref(&self) -> TableReference<'_> {
442 TableReference {
443 catalog: &self.catalog_name,
444 schema: &self.schema_name,
445 table: &self.table_name,
446 }
447 }
448}
449
/// Request to copy a whole database (schema) to or from an external location.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    /// Catalog the database belongs to.
    pub catalog_name: String,
    /// Name of the database (schema) to copy.
    pub schema_name: String,
    /// The external location of the copy.
    pub location: String,
    /// Raw key-value options of the copy.
    ///
    /// NOTE(review): presumably the `WITH (...)` options of the statement — confirm.
    pub with: HashMap<String, String>,
    /// Raw key-value connection settings.
    ///
    /// NOTE(review): presumably the `CONNECTION (...)` options of the statement — confirm.
    pub connection: HashMap<String, String>,
    /// Optional timestamp range restricting the copy.
    pub time_range: Option<TimestampRange>,
}
459
/// Request to copy the result of a query to an external location.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// The external location to write to.
    pub location: String,
    /// Raw key-value options of the copy.
    ///
    /// NOTE(review): presumably the `WITH (...)` options of the statement — confirm.
    pub with: HashMap<String, String>,
    /// Raw key-value connection settings.
    ///
    /// NOTE(review): presumably the `CONNECTION (...)` options of the statement — confirm.
    pub connection: HashMap<String, String>,
}
466
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Options with a 128 MiB write buffer and a 1000 s TTL; everything else is default.
    fn sized_options_with_ttl() -> TableOptions {
        TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        }
    }

    /// Converts `options` to a map and back and checks that nothing changed.
    fn assert_map_round_trip(options: TableOptions) {
        let as_map = HashMap::from(&options);
        let restored = TableOptions::try_from_iter(&as_map).unwrap();
        assert_eq!(options, restored);
    }

    #[test]
    fn test_validate_table_option() {
        assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
        assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
        assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
        assert!(validate_table_option(TTL_KEY));
        assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
        assert!(validate_table_option(STORAGE_KEY));
        assert!(!validate_table_option("foo"));
    }

    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        };
        let json = serde_json::to_string(&options).unwrap();
        let decoded: TableOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(options, decoded);
    }

    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Typed options only.
        assert_map_round_trip(sized_options_with_ttl());

        // Nothing set at all.
        assert_map_round_trip(TableOptions::default());

        // Typed options plus one extra option.
        assert_map_round_trip(TableOptions {
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            ..sized_options_with_ttl()
        });
    }

    #[test]
    fn test_table_options_to_string() {
        let plain = sized_options_with_ttl();
        assert_eq!("write_buffer_size=128.0MiB ttl=16m 40s", plain.to_string());

        let with_extra = TableOptions {
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            ..sized_options_with_ttl()
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            with_extra.to_string()
        );

        let with_skip_wal = TableOptions {
            skip_wal: true,
            ..sized_options_with_ttl()
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            with_skip_wal.to_string()
        );
    }
}