1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::range::TimestampRange;
26use common_time::TimeToLive;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{
30 ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions,
31};
32use greptime_proto::v1::region::compact_request;
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use store_api::metric_engine_consts::{
36 is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
37};
38use store_api::mito_engine_options::{
39 is_mito_engine_option_key, APPEND_MODE_KEY, COMPACTION_TYPE, MEMTABLE_TYPE, MERGE_MODE_KEY,
40 TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
41};
42use store_api::region_request::{SetRegionOption, UnsetRegionOption};
43
44use crate::error::{ParseTableOptionSnafu, Result};
45use crate::metadata::{TableId, TableVersion};
46use crate::table_reference::TableReference;
47
/// Key `__private.file_table_meta`.
///
/// NOTE(review): presumably the key under which file-table metadata is stored;
/// its consumers are not visible in this file — confirm.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// Table option key `location`; listed in `VALID_TABLE_OPTION_KEYS`.
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// Table option key `pattern`; listed in `VALID_TABLE_OPTION_KEYS`.
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// Table option key `format`; listed in `VALID_TABLE_OPTION_KEYS`.
pub const FILE_TABLE_FORMAT_KEY: &str = "format";

/// Table option key `table_data_model`; listed in `VALID_TABLE_OPTION_KEYS`.
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// The string `greptime_trace_v1`.
///
/// NOTE(review): presumably a value for the `table_data_model` option — confirm.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";

/// Table option key `otlp_metric_compat`; listed in `VALID_TABLE_OPTION_KEYS`.
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
/// The string `prom`.
///
/// NOTE(review): presumably a value for the `otlp_metric_compat` option — confirm.
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
58
59pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
60 WRITE_BUFFER_SIZE_KEY,
62 TTL_KEY,
63 STORAGE_KEY,
64 COMMENT_KEY,
65 SKIP_WAL_KEY,
66 FILE_TABLE_LOCATION_KEY,
68 FILE_TABLE_FORMAT_KEY,
69 FILE_TABLE_PATTERN_KEY,
70 PHYSICAL_TABLE_METADATA_KEY,
72 LOGICAL_TABLE_METADATA_KEY,
73 TABLE_DATA_MODEL,
75 OTLP_METRIC_COMPAT_KEY,
76];
77
78static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
80 let mut set = HashSet::new();
81 set.insert(TTL_KEY);
82 set.insert(STORAGE_KEY);
83 set.insert(MEMTABLE_TYPE);
84 set.insert(APPEND_MODE_KEY);
85 set.insert(MERGE_MODE_KEY);
86 set.insert(SKIP_WAL_KEY);
87 set.insert(COMPACTION_TYPE);
88 set.insert(TWCS_FALLBACK_TO_LOCAL);
89 set.insert(TWCS_TIME_WINDOW);
90 set.insert(TWCS_TRIGGER_FILE_NUM);
91 set.insert(TWCS_MAX_OUTPUT_FILE_SIZE);
92 set
93});
94
95pub fn validate_database_option(key: &str) -> bool {
97 VALID_DB_OPT_KEYS.contains(&key)
98}
99
100pub fn validate_table_option(key: &str) -> bool {
102 if is_supported_in_s3(key) {
103 return true;
104 }
105
106 if is_supported_in_oss(key) {
107 return true;
108 }
109
110 if is_mito_engine_option_key(key) {
111 return true;
112 }
113
114 if is_metric_engine_option_key(key) {
115 return true;
116 }
117
118 VALID_TABLE_OPTION_KEYS.contains(&key)
119}
120
121#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
122#[serde(default)]
123pub struct TableOptions {
124 pub write_buffer_size: Option<ReadableSize>,
126 pub ttl: Option<TimeToLive>,
128 pub skip_wal: bool,
130 pub extra_options: HashMap<String, String>,
132}
133
134pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
135pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
136pub const STORAGE_KEY: &str = "storage";
137pub const COMMENT_KEY: &str = "comment";
138pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
139pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
140
141impl TableOptions {
142 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
143 iter: U,
144 ) -> Result<TableOptions> {
145 let mut options = TableOptions::default();
146
147 let kvs: HashMap<String, String> = iter
148 .into_iter()
149 .map(|(k, v)| (k.to_string(), v.to_string()))
150 .collect();
151
152 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
153 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
154 ParseTableOptionSnafu {
155 key: WRITE_BUFFER_SIZE_KEY,
156 value: write_buffer_size,
157 }
158 .build()
159 })?;
160 options.write_buffer_size = Some(size)
161 }
162
163 if let Some(ttl) = kvs.get(TTL_KEY) {
164 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
165 ParseTableOptionSnafu {
166 key: TTL_KEY,
167 value: ttl,
168 }
169 .build()
170 })?;
171 options.ttl = Some(ttl_value);
172 }
173
174 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
175 options.skip_wal = skip_wal.parse().map_err(|_| {
176 ParseTableOptionSnafu {
177 key: SKIP_WAL_KEY,
178 value: skip_wal,
179 }
180 .build()
181 })?;
182 }
183
184 options.extra_options = HashMap::from_iter(
185 kvs.into_iter()
186 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
187 );
188
189 Ok(options)
190 }
191}
192
193impl fmt::Display for TableOptions {
194 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
195 let mut key_vals = vec![];
196 if let Some(size) = self.write_buffer_size {
197 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
198 }
199
200 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
201 key_vals.push(format!("{}={}", TTL_KEY, ttl));
202 }
203
204 if self.skip_wal {
205 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
206 }
207
208 for (k, v) in &self.extra_options {
209 key_vals.push(format!("{}={}", k, v));
210 }
211
212 write!(f, "{}", key_vals.join(" "))
213 }
214}
215
216impl From<&TableOptions> for HashMap<String, String> {
217 fn from(opts: &TableOptions) -> Self {
218 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
219 if let Some(write_buffer_size) = opts.write_buffer_size {
220 let _ = res.insert(
221 WRITE_BUFFER_SIZE_KEY.to_string(),
222 write_buffer_size.to_string(),
223 );
224 }
225 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
226 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
227 }
228 res.extend(
229 opts.extra_options
230 .iter()
231 .map(|(k, v)| (k.clone(), v.clone())),
232 );
233 res
234 }
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct AlterTableRequest {
240 pub catalog_name: String,
241 pub schema_name: String,
242 pub table_name: String,
243 pub table_id: TableId,
244 pub alter_kind: AlterKind,
245 pub table_version: Option<TableVersion>,
247}
248
249#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct AddColumnRequest {
252 pub column_schema: ColumnSchema,
253 pub is_key: bool,
254 pub location: Option<AddColumnLocation>,
255 pub add_if_not_exists: bool,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ModifyColumnTypeRequest {
262 pub column_name: String,
263 pub target_type: ConcreteDataType,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub enum AlterKind {
268 AddColumns {
269 columns: Vec<AddColumnRequest>,
270 },
271 DropColumns {
272 names: Vec<String>,
273 },
274 ModifyColumnTypes {
275 columns: Vec<ModifyColumnTypeRequest>,
276 },
277 RenameTable {
278 new_table_name: String,
279 },
280 SetTableOptions {
281 options: Vec<SetRegionOption>,
282 },
283 UnsetTableOptions {
284 keys: Vec<UnsetRegionOption>,
285 },
286 SetIndexes {
287 options: Vec<SetIndexOption>,
288 },
289 UnsetIndexes {
290 options: Vec<UnsetIndexOption>,
291 },
292 DropDefaults {
293 names: Vec<String>,
294 },
295 SetDefaults {
296 defaults: Vec<SetDefaultRequest>,
297 },
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct SetDefaultRequest {
302 pub column_name: String,
303 pub default_constraint: Option<ColumnDefaultConstraint>,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub enum SetIndexOption {
308 Fulltext {
309 column_name: String,
310 options: FulltextOptions,
311 },
312 Inverted {
313 column_name: String,
314 },
315 Skipping {
316 column_name: String,
317 options: SkippingIndexOptions,
318 },
319}
320
321impl SetIndexOption {
322 pub fn column_name(&self) -> &str {
324 match self {
325 SetIndexOption::Fulltext { column_name, .. } => column_name,
326 SetIndexOption::Inverted { column_name, .. } => column_name,
327 SetIndexOption::Skipping { column_name, .. } => column_name,
328 }
329 }
330}
331
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub enum UnsetIndexOption {
334 Fulltext { column_name: String },
335 Inverted { column_name: String },
336 Skipping { column_name: String },
337}
338
339impl UnsetIndexOption {
340 pub fn column_name(&self) -> &str {
342 match self {
343 UnsetIndexOption::Fulltext { column_name, .. } => column_name,
344 UnsetIndexOption::Inverted { column_name, .. } => column_name,
345 UnsetIndexOption::Skipping { column_name, .. } => column_name,
346 }
347 }
348}
349
350#[derive(Debug)]
351pub struct InsertRequest {
352 pub catalog_name: String,
353 pub schema_name: String,
354 pub table_name: String,
355 pub columns_values: HashMap<String, VectorRef>,
356}
357
358#[derive(Debug)]
360pub struct DeleteRequest {
361 pub catalog_name: String,
362 pub schema_name: String,
363 pub table_name: String,
364 pub key_column_values: HashMap<String, VectorRef>,
368}
369
/// Direction of a copy operation; see `CopyTableRequest::direction`.
#[derive(Debug)]
pub enum CopyDirection {
    /// Export direction.
    Export,
    /// Import direction.
    Import,
}
375
376#[derive(Debug)]
378pub struct CopyTableRequest {
379 pub catalog_name: String,
380 pub schema_name: String,
381 pub table_name: String,
382 pub location: String,
383 pub with: HashMap<String, String>,
384 pub connection: HashMap<String, String>,
385 pub pattern: Option<String>,
386 pub direction: CopyDirection,
387 pub timestamp_range: Option<TimestampRange>,
388 pub limit: Option<u64>,
389}
390
/// Request to flush a table, identified by catalog, schema and table name.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    /// Catalog name of the table.
    pub catalog_name: String,
    /// Schema name of the table.
    pub schema_name: String,
    /// Name of the table.
    pub table_name: String,
}
397
398#[derive(Debug, Clone, PartialEq)]
399pub struct CompactTableRequest {
400 pub catalog_name: String,
401 pub schema_name: String,
402 pub table_name: String,
403 pub compact_options: compact_request::Options,
404}
405
406impl Default for CompactTableRequest {
407 fn default() -> Self {
408 Self {
409 catalog_name: Default::default(),
410 schema_name: Default::default(),
411 table_name: Default::default(),
412 compact_options: compact_request::Options::Regular(Default::default()),
413 }
414 }
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
419pub struct TruncateTableRequest {
420 pub catalog_name: String,
421 pub schema_name: String,
422 pub table_name: String,
423 pub table_id: TableId,
424}
425
426impl TruncateTableRequest {
427 pub fn table_ref(&self) -> TableReference {
428 TableReference {
429 catalog: &self.catalog_name,
430 schema: &self.schema_name,
431 table: &self.table_name,
432 }
433 }
434}
435
436#[derive(Debug, Clone, Default, Deserialize, Serialize)]
437pub struct CopyDatabaseRequest {
438 pub catalog_name: String,
439 pub schema_name: String,
440 pub location: String,
441 pub with: HashMap<String, String>,
442 pub connection: HashMap<String, String>,
443 pub time_range: Option<TimestampRange>,
444}
445
446#[derive(Debug, Clone, Default, Deserialize, Serialize)]
447pub struct CopyQueryToRequest {
448 pub location: String,
449 pub with: HashMap<String, String>,
450 pub connection: HashMap<String, String>,
451}
452
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Known option keys validate; an unknown key does not.
    #[test]
    fn test_validate_table_option() {
        let accepted = [
            FILE_TABLE_LOCATION_KEY,
            FILE_TABLE_FORMAT_KEY,
            FILE_TABLE_PATTERN_KEY,
            TTL_KEY,
            WRITE_BUFFER_SIZE_KEY,
            STORAGE_KEY,
        ];
        for key in accepted {
            assert!(validate_table_option(key), "{} should be valid", key);
        }
        assert!(!validate_table_option("foo"));
    }

    /// A JSON round trip yields an equal value.
    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        };
        let json = serde_json::to_string(&options).unwrap();
        let decoded: TableOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(options, decoded);
    }

    /// Converting to a string map and parsing it back yields an equal value.
    #[test]
    fn test_convert_hashmap_between_table_options() {
        let cases = [
            TableOptions {
                write_buffer_size: Some(ReadableSize::mb(128)),
                ttl: Some(Duration::from_secs(1000).into()),
                ..Default::default()
            },
            TableOptions::default(),
            TableOptions {
                write_buffer_size: Some(ReadableSize::mb(128)),
                ttl: Some(Duration::from_secs(1000).into()),
                extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
                ..Default::default()
            },
        ];

        for options in cases {
            let serialized_map = HashMap::from(&options);
            let round_tripped = TableOptions::try_from_iter(&serialized_map).unwrap();
            assert_eq!(options, round_tripped);
        }
    }

    /// `Display` prints the typed fields first, followed by the extra options.
    #[test]
    fn test_table_options_to_string() {
        let base = TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        };
        assert_eq!("write_buffer_size=128.0MiB ttl=16m 40s", base.to_string());

        let with_extra = TableOptions {
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            ..base.clone()
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            with_extra.to_string()
        );

        let with_skip_wal = TableOptions {
            skip_wal: true,
            ..base
        };
        assert_eq!(
            "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            with_skip_wal.to_string()
        );
    }
}