1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::TimeToLive;
26use common_time::range::TimestampRange;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30 ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36 LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, is_metric_engine_option_key,
37};
38use store_api::mito_engine_options::{
39 APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY, SST_FORMAT_KEY,
40 TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
41 is_mito_engine_option_key,
42};
43use store_api::region_request::{SetRegionOption, UnsetRegionOption};
44
45use crate::error::{ParseTableOptionSnafu, Result};
46use crate::metadata::{TableId, TableVersion};
47use crate::table_reference::TableReference;
48
/// Key string `"__private.file_table_meta"`.
/// NOTE(review): presumably the options-map key holding file-table metadata;
/// it is not referenced elsewhere in this file — confirm against callers.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// `location` table option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// `pattern` table option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// `format` table option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const FILE_TABLE_FORMAT_KEY: &str = "format";

/// `table_data_model` table option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// NOTE(review): presumably a value for the [`TABLE_DATA_MODEL`] option — confirm.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";

/// `otlp_metric_compat` table option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// NOTE(review): presumably a value for the [`OTLP_METRIC_COMPAT_KEY`] option — confirm.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
59
/// Table option keys that [`validate_table_option`] accepts directly, in
/// addition to the keys recognized by the object-store and engine helper
/// predicates it consults first.
pub const VALID_TABLE_OPTION_KEYS: [&str; 13] = [
    // Keys declared in this file or re-exported from the mito engine options.
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    SST_FORMAT_KEY,
    // File-table keys.
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // Keys imported from `store_api::metric_engine_consts`.
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // Data model key.
    TABLE_DATA_MODEL,
    // OTLP metric compatibility key.
    OTLP_METRIC_COMPAT_KEY,
];
79
/// `timeout` DDL option key.
/// NOTE(review): presumably bounds how long a DDL statement may run — confirm.
pub const DDL_TIMEOUT: &str = "timeout";
/// `wait` DDL option key.
/// NOTE(review): presumably controls whether the caller waits for the DDL to finish — confirm.
pub const DDL_WAIT: &str = "wait";

/// DDL option keys that [`validate_table_option`] also accepts.
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
84
85static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
87 let mut set = HashSet::new();
88 set.insert(TTL_KEY);
89 set.insert(STORAGE_KEY);
90 set.insert(MEMTABLE_TYPE);
91 set.insert(APPEND_MODE_KEY);
92 set.insert(MERGE_MODE_KEY);
93 set.insert(SKIP_WAL_KEY);
94 set.insert(COMPACTION_TYPE);
95 set.insert(TWCS_FALLBACK_TO_LOCAL);
96 set.insert(TWCS_TIME_WINDOW);
97 set.insert(TWCS_TRIGGER_FILE_NUM);
98 set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
99 set.insert(SST_FORMAT_KEY);
100 set
101});
102
103pub fn validate_database_option(key: &str) -> bool {
105 VALID_DB_OPT_KEYS.contains(&key)
106}
107
108pub fn validate_table_option(key: &str) -> bool {
110 if is_supported_in_s3(key) {
111 return true;
112 }
113
114 if is_supported_in_oss(key) {
115 return true;
116 }
117
118 if is_mito_engine_option_key(key) {
119 return true;
120 }
121
122 if is_metric_engine_option_key(key) {
123 return true;
124 }
125
126 VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
127}
128
/// Options attached to a table.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form
/// take their `Default` value when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Parsed value of the [`WRITE_BUFFER_SIZE_KEY`] option, if given.
    pub write_buffer_size: Option<ReadableSize>,
    /// Parsed value of the [`TTL_KEY`] option, if given.
    pub ttl: Option<TimeToLive>,
    /// Parsed value of the [`SKIP_WAL_KEY`] option; `false` when absent.
    pub skip_wal: bool,
    /// Remaining options kept as raw strings. `try_from_iter` stores every
    /// pair here except the `write_buffer_size` and `ttl` keys.
    pub extra_options: HashMap<String, String>,
}
141
/// Option key parsed into [`TableOptions::write_buffer_size`].
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Option key parsed into [`TableOptions::ttl`]; re-exported from the mito engine options.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// `storage` option key; accepted for both tables and databases.
pub const STORAGE_KEY: &str = "storage";
/// `comment` option key; listed in [`VALID_TABLE_OPTION_KEYS`].
pub const COMMENT_KEY: &str = "comment";
/// Key string `"auto_create_table"`.
/// NOTE(review): not referenced elsewhere in this file — confirm where it is consumed.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Option key parsed into [`TableOptions::skip_wal`]; re-exported from the mito engine options.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
/// Key string `"trace_table_partitions"`.
/// NOTE(review): presumably a hint for partitioning trace tables — confirm against callers.
pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
149
150impl TableOptions {
151 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
152 iter: U,
153 ) -> Result<TableOptions> {
154 let mut options = TableOptions::default();
155
156 let kvs: HashMap<String, String> = iter
157 .into_iter()
158 .map(|(k, v)| (k.to_string(), v.to_string()))
159 .collect();
160
161 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
162 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
163 ParseTableOptionSnafu {
164 key: WRITE_BUFFER_SIZE_KEY,
165 value: write_buffer_size,
166 }
167 .build()
168 })?;
169 options.write_buffer_size = Some(size)
170 }
171
172 if let Some(ttl) = kvs.get(TTL_KEY) {
173 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
174 ParseTableOptionSnafu {
175 key: TTL_KEY,
176 value: ttl,
177 }
178 .build()
179 })?;
180 options.ttl = Some(ttl_value);
181 }
182
183 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
184 options.skip_wal = skip_wal.parse().map_err(|_| {
185 ParseTableOptionSnafu {
186 key: SKIP_WAL_KEY,
187 value: skip_wal,
188 }
189 .build()
190 })?;
191 }
192
193 options.extra_options = HashMap::from_iter(
194 kvs.into_iter()
195 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
196 );
197
198 Ok(options)
199 }
200}
201
202impl fmt::Display for TableOptions {
203 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
204 let mut key_vals = vec![];
205 if let Some(size) = self.write_buffer_size {
206 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
207 }
208
209 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
210 key_vals.push(format!("{}={}", TTL_KEY, ttl));
211 }
212
213 if self.skip_wal {
214 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
215 }
216
217 for (k, v) in &self.extra_options {
218 key_vals.push(format!("{}={}", k, v));
219 }
220
221 write!(f, "{}", key_vals.join(" "))
222 }
223}
224
225impl From<&TableOptions> for HashMap<String, String> {
226 fn from(opts: &TableOptions) -> Self {
227 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
228 if let Some(write_buffer_size) = opts.write_buffer_size {
229 let _ = res.insert(
230 WRITE_BUFFER_SIZE_KEY.to_string(),
231 write_buffer_size.to_string(),
232 );
233 }
234 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
235 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
236 }
237 res.extend(
238 opts.extra_options
239 .iter()
240 .map(|(k, v)| (k.clone(), v.clone())),
241 );
242 res
243 }
244}
245
/// Parameters of an alter-table request: the target table (by qualified name
/// and id) and the change to apply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// The change to apply; see [`AlterKind`].
    pub alter_kind: AlterKind,
    /// NOTE(review): presumably the table version the request was built
    /// against; its handling is not visible in this file — confirm.
    pub table_version: Option<TableVersion>,
}
257
/// Describes one column to add; used by [`AlterKind::AddColumns`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the column to add.
    pub column_schema: ColumnSchema,
    /// NOTE(review): presumably marks the column as a key column — confirm.
    pub is_key: bool,
    /// Optional placement of the new column.
    pub location: Option<AddColumnLocation>,
    /// NOTE(review): presumably makes the addition a no-op when the column
    /// already exists — confirm against the handler.
    pub add_if_not_exists: bool,
}
267
/// Describes one column type change; used by [`AlterKind::ModifyColumnTypes`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column whose type changes.
    pub column_name: String,
    /// Data type the column should have afterwards.
    pub target_type: ConcreteDataType,
}
274
/// The change carried by an [`AlterTableRequest`].
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant and field names; the handling code is not part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add the listed columns.
    AddColumns {
        columns: Vec<AddColumnRequest>,
    },
    /// Drop the columns with the listed names.
    DropColumns {
        names: Vec<String>,
    },
    /// Change the data type of the listed columns.
    ModifyColumnTypes {
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table.
    RenameTable {
        new_table_name: String,
    },
    /// Set the listed table options.
    SetTableOptions {
        options: Vec<SetRegionOption>,
    },
    /// Unset the listed table options.
    UnsetTableOptions {
        keys: Vec<UnsetRegionOption>,
    },
    /// Set the listed column indexes.
    SetIndexes {
        options: Vec<SetIndexOption>,
    },
    /// Unset the listed column indexes.
    UnsetIndexes {
        options: Vec<UnsetIndexOption>,
    },
    /// Drop the default values of the named columns.
    DropDefaults {
        names: Vec<String>,
    },
    /// Set default values for the listed columns.
    SetDefaults {
        defaults: Vec<SetDefaultRequest>,
    },
}
308
/// Describes one default-value change; used by [`AlterKind::SetDefaults`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetDefaultRequest {
    /// Name of the column whose default changes.
    pub column_name: String,
    /// The default constraint for the column.
    /// NOTE(review): the meaning of `None` is not visible here — confirm.
    pub default_constraint: Option<ColumnDefaultConstraint>,
}
314
/// An index to set on a column; used by [`AlterKind::SetIndexes`].
///
/// Every variant names the target column; the fulltext and skipping variants
/// also carry index-specific options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOption {
    /// Fulltext index with its options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index (no extra options).
    Inverted {
        column_name: String,
    },
    /// Skipping index with its options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
329
330impl SetIndexOption {
331 pub fn column_name(&self) -> &str {
333 match self {
334 SetIndexOption::Fulltext { column_name, .. } => column_name,
335 SetIndexOption::Inverted { column_name, .. } => column_name,
336 SetIndexOption::Skipping { column_name, .. } => column_name,
337 }
338 }
339}
340
/// An index to remove from a column; used by [`AlterKind::UnsetIndexes`].
///
/// Each variant names the index kind and carries the target column name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOption {
    Fulltext { column_name: String },
    Inverted { column_name: String },
    Skipping { column_name: String },
}
347
348impl UnsetIndexOption {
349 pub fn column_name(&self) -> &str {
351 match self {
352 UnsetIndexOption::Fulltext { column_name, .. } => column_name,
353 UnsetIndexOption::Inverted { column_name, .. } => column_name,
354 UnsetIndexOption::Skipping { column_name, .. } => column_name,
355 }
356 }
357}
358
/// Parameters of an insert request: the target table's qualified name and
/// the data to insert.
#[derive(Debug)]
pub struct InsertRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Values keyed by column name, one vector per column.
    pub columns_values: HashMap<String, VectorRef>,
}
366
/// Parameters of a delete request: the target table's qualified name and
/// the key values selecting what to delete.
#[derive(Debug)]
pub struct DeleteRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Values keyed by column name, one vector per column.
    /// NOTE(review): presumably the key columns of the rows to delete, with
    /// row `i` formed by the `i`-th element of every vector — confirm.
    pub key_column_values: HashMap<String, VectorRef>,
}
378
/// Direction of a table copy; see [`CopyTableRequest::direction`].
/// NOTE(review): presumably `Export` is `COPY ... TO` and `Import` is
/// `COPY ... FROM` — confirm.
#[derive(Debug)]
pub enum CopyDirection {
    Export,
    Import,
}
384
/// Parameters of a copy-table request.
///
/// NOTE(review): field descriptions are inferred from names and types; the
/// code that consumes this request is not part of this file.
#[derive(Debug)]
pub struct CopyTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Location string of the external data.
    pub location: String,
    /// String options for the copy (presumably the `WITH` clause).
    pub with: HashMap<String, String>,
    /// String options for the connection (presumably the `CONNECTION` clause).
    pub connection: HashMap<String, String>,
    /// Optional pattern (presumably filters files under `location`).
    pub pattern: Option<String>,
    /// Whether data is exported or imported.
    pub direction: CopyDirection,
    /// Optional timestamp range restricting the copied data.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit (presumably a maximum row count).
    pub limit: Option<u64>,
}
399
/// Parameters of a flush-table request: the target table's qualified name.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
406
/// Parameters of a build-index request: the target table's qualified name.
#[derive(Debug, Clone, Default)]
pub struct BuildIndexTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
413
/// Parameters of a compact-table request.
///
/// The hand-written `Default` impl uses regular compaction options and a
/// parallelism of 1.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    /// Compaction options from the region compact request protocol type.
    pub compact_options: compact_request::Options,
    /// NOTE(review): presumably the number of compaction tasks allowed to
    /// run in parallel — confirm against the consumer.
    pub parallelism: u32,
}
422
423impl Default for CompactTableRequest {
424 fn default() -> Self {
425 Self {
426 catalog_name: Default::default(),
427 schema_name: Default::default(),
428 table_name: Default::default(),
429 compact_options: compact_request::Options::Regular(Default::default()),
430 parallelism: 1,
431 }
432 }
433}
434
/// Parameters of a truncate-table request: the target table by qualified
/// name and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
}
443
444impl TruncateTableRequest {
445 pub fn table_ref(&self) -> TableReference<'_> {
446 TableReference {
447 catalog: &self.catalog_name,
448 schema: &self.schema_name,
449 table: &self.table_name,
450 }
451 }
452}
453
/// Parameters of a copy-database request.
///
/// NOTE(review): field descriptions are inferred from names and types; the
/// code that consumes this request is not part of this file.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    pub catalog_name: String,
    pub schema_name: String,
    /// Location string of the external data.
    pub location: String,
    /// String options for the copy (presumably the `WITH` clause).
    pub with: HashMap<String, String>,
    /// String options for the connection (presumably the `CONNECTION` clause).
    pub connection: HashMap<String, String>,
    /// Optional timestamp range restricting the copied data.
    pub time_range: Option<TimestampRange>,
}
463
/// Parameters for copying a query result to an external location.
///
/// NOTE(review): field descriptions are inferred from names and types; the
/// code that consumes this request is not part of this file.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// Location string of the destination.
    pub location: String,
    /// String options for the copy (presumably the `WITH` clause).
    pub with: HashMap<String, String>,
    /// String options for the connection (presumably the `CONNECTION` clause).
    pub connection: HashMap<String, String>,
}
470
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// The 1000-second TTL shared by most fixtures.
    fn ttl_1000s() -> Option<TimeToLive> {
        Some(Duration::from_secs(1000).into())
    }

    /// Assembles a `TableOptions` fixture from its four fields.
    fn fixture(
        write_buffer_size: Option<ReadableSize>,
        ttl: Option<TimeToLive>,
        extra: &[(&str, &str)],
        skip_wal: bool,
    ) -> TableOptions {
        TableOptions {
            write_buffer_size,
            ttl,
            extra_options: extra
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            skip_wal,
        }
    }

    #[test]
    fn test_validate_table_option() {
        let accepted = [
            FILE_TABLE_LOCATION_KEY,
            FILE_TABLE_FORMAT_KEY,
            FILE_TABLE_PATTERN_KEY,
            TTL_KEY,
            WRITE_BUFFER_SIZE_KEY,
            STORAGE_KEY,
        ];
        for key in accepted {
            assert!(validate_table_option(key));
        }
        assert!(!validate_table_option("foo"));
    }

    #[test]
    fn test_serialize_table_options() {
        // JSON round trip must reproduce the original value.
        let options = fixture(None, ttl_1000s(), &[], false);
        let json = serde_json::to_string(&options).unwrap();
        let restored: TableOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(options, restored);
    }

    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Options -> map -> options must be lossless for each fixture.
        let cases = [
            fixture(Some(ReadableSize::mb(128)), ttl_1000s(), &[], false),
            fixture(None, Default::default(), &[], false),
            fixture(
                Some(ReadableSize::mb(128)),
                ttl_1000s(),
                &[("a", "A")],
                false,
            ),
        ];

        for expected in &cases {
            let as_map = HashMap::from(expected);
            let rebuilt = TableOptions::try_from_iter(&as_map).unwrap();
            assert_eq!(*expected, rebuilt);
        }
    }

    #[test]
    fn test_table_options_to_string() {
        let cases = [
            (
                fixture(Some(ReadableSize::mb(128)), ttl_1000s(), &[], false),
                "write_buffer_size=128.0MiB ttl=16m 40s",
            ),
            (
                fixture(
                    Some(ReadableSize::mb(128)),
                    ttl_1000s(),
                    &[("a", "A")],
                    false,
                ),
                "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            ),
            (
                fixture(Some(ReadableSize::mb(128)), ttl_1000s(), &[], true),
                "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            ),
        ];

        for (options, expected) in &cases {
            assert_eq!(*expected, options.to_string());
        }
    }
}