1use std::collections::HashMap;
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::oss::is_supported_in_oss;
23use common_datasource::object_store::s3::is_supported_in_s3;
24use common_query::AddColumnLocation;
25use common_time::range::TimestampRange;
26use common_time::TimeToLive;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::VectorRef;
29use datatypes::schema::{ColumnSchema, FulltextOptions, SkippingIndexOptions};
30use greptime_proto::v1::region::compact_request;
31use serde::{Deserialize, Serialize};
32use store_api::metric_engine_consts::{
33 is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
34};
35use store_api::mito_engine_options::is_mito_engine_option_key;
36use store_api::region_request::{SetRegionOption, UnsetRegionOption};
37
38use crate::error::{ParseTableOptionSnafu, Result};
39use crate::metadata::{TableId, TableVersion};
40use crate::table_reference::TableReference;
41
42pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
43pub const FILE_TABLE_LOCATION_KEY: &str = "location";
44pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
45pub const FILE_TABLE_FORMAT_KEY: &str = "format";
46
47pub const TABLE_DATA_MODEL: &str = "table_data_model";
48pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
49
50pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [
51 WRITE_BUFFER_SIZE_KEY,
53 TTL_KEY,
54 STORAGE_KEY,
55 COMMENT_KEY,
56 SKIP_WAL_KEY,
57 FILE_TABLE_LOCATION_KEY,
59 FILE_TABLE_FORMAT_KEY,
60 FILE_TABLE_PATTERN_KEY,
61 PHYSICAL_TABLE_METADATA_KEY,
63 LOGICAL_TABLE_METADATA_KEY,
64 TABLE_DATA_MODEL,
66];
67
68pub fn validate_table_option(key: &str) -> bool {
70 if is_supported_in_s3(key) {
71 return true;
72 }
73
74 if is_supported_in_oss(key) {
75 return true;
76 }
77
78 if is_mito_engine_option_key(key) {
79 return true;
80 }
81
82 if is_metric_engine_option_key(key) {
83 return true;
84 }
85
86 VALID_TABLE_OPTION_KEYS.contains(&key)
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
90#[serde(default)]
91pub struct TableOptions {
92 pub write_buffer_size: Option<ReadableSize>,
94 pub ttl: Option<TimeToLive>,
96 pub skip_wal: bool,
98 pub extra_options: HashMap<String, String>,
100}
101
102pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
103pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
104pub const STORAGE_KEY: &str = "storage";
105pub const COMMENT_KEY: &str = "comment";
106pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
107pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
108
109impl TableOptions {
110 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
111 iter: U,
112 ) -> Result<TableOptions> {
113 let mut options = TableOptions::default();
114
115 let kvs: HashMap<String, String> = iter
116 .into_iter()
117 .map(|(k, v)| (k.to_string(), v.to_string()))
118 .collect();
119
120 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
121 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
122 ParseTableOptionSnafu {
123 key: WRITE_BUFFER_SIZE_KEY,
124 value: write_buffer_size,
125 }
126 .build()
127 })?;
128 options.write_buffer_size = Some(size)
129 }
130
131 if let Some(ttl) = kvs.get(TTL_KEY) {
132 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
133 ParseTableOptionSnafu {
134 key: TTL_KEY,
135 value: ttl,
136 }
137 .build()
138 })?;
139 options.ttl = Some(ttl_value);
140 }
141
142 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
143 options.skip_wal = skip_wal.parse().map_err(|_| {
144 ParseTableOptionSnafu {
145 key: SKIP_WAL_KEY,
146 value: skip_wal,
147 }
148 .build()
149 })?;
150 }
151
152 options.extra_options = HashMap::from_iter(
153 kvs.into_iter()
154 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
155 );
156
157 Ok(options)
158 }
159}
160
161impl fmt::Display for TableOptions {
162 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
163 let mut key_vals = vec![];
164 if let Some(size) = self.write_buffer_size {
165 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
166 }
167
168 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
169 key_vals.push(format!("{}={}", TTL_KEY, ttl));
170 }
171
172 if self.skip_wal {
173 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
174 }
175
176 for (k, v) in &self.extra_options {
177 key_vals.push(format!("{}={}", k, v));
178 }
179
180 write!(f, "{}", key_vals.join(" "))
181 }
182}
183
184impl From<&TableOptions> for HashMap<String, String> {
185 fn from(opts: &TableOptions) -> Self {
186 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
187 if let Some(write_buffer_size) = opts.write_buffer_size {
188 let _ = res.insert(
189 WRITE_BUFFER_SIZE_KEY.to_string(),
190 write_buffer_size.to_string(),
191 );
192 }
193 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
194 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
195 }
196 res.extend(
197 opts.extra_options
198 .iter()
199 .map(|(k, v)| (k.clone(), v.clone())),
200 );
201 res
202 }
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct AlterTableRequest {
208 pub catalog_name: String,
209 pub schema_name: String,
210 pub table_name: String,
211 pub table_id: TableId,
212 pub alter_kind: AlterKind,
213 pub table_version: Option<TableVersion>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct AddColumnRequest {
220 pub column_schema: ColumnSchema,
221 pub is_key: bool,
222 pub location: Option<AddColumnLocation>,
223 pub add_if_not_exists: bool,
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct ModifyColumnTypeRequest {
230 pub column_name: String,
231 pub target_type: ConcreteDataType,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub enum AlterKind {
236 AddColumns {
237 columns: Vec<AddColumnRequest>,
238 },
239 DropColumns {
240 names: Vec<String>,
241 },
242 ModifyColumnTypes {
243 columns: Vec<ModifyColumnTypeRequest>,
244 },
245 RenameTable {
246 new_table_name: String,
247 },
248 SetTableOptions {
249 options: Vec<SetRegionOption>,
250 },
251 UnsetTableOptions {
252 keys: Vec<UnsetRegionOption>,
253 },
254 SetIndex {
255 options: SetIndexOptions,
256 },
257 UnsetIndex {
258 options: UnsetIndexOptions,
259 },
260 DropDefaults {
261 names: Vec<String>,
262 },
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum SetIndexOptions {
267 Fulltext {
268 column_name: String,
269 options: FulltextOptions,
270 },
271 Inverted {
272 column_name: String,
273 },
274 Skipping {
275 column_name: String,
276 options: SkippingIndexOptions,
277 },
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub enum UnsetIndexOptions {
282 Fulltext { column_name: String },
283 Inverted { column_name: String },
284 Skipping { column_name: String },
285}
286
287#[derive(Debug)]
288pub struct InsertRequest {
289 pub catalog_name: String,
290 pub schema_name: String,
291 pub table_name: String,
292 pub columns_values: HashMap<String, VectorRef>,
293}
294
295#[derive(Debug)]
297pub struct DeleteRequest {
298 pub catalog_name: String,
299 pub schema_name: String,
300 pub table_name: String,
301 pub key_column_values: HashMap<String, VectorRef>,
305}
306
307#[derive(Debug)]
308pub enum CopyDirection {
309 Export,
310 Import,
311}
312
313#[derive(Debug)]
315pub struct CopyTableRequest {
316 pub catalog_name: String,
317 pub schema_name: String,
318 pub table_name: String,
319 pub location: String,
320 pub with: HashMap<String, String>,
321 pub connection: HashMap<String, String>,
322 pub pattern: Option<String>,
323 pub direction: CopyDirection,
324 pub timestamp_range: Option<TimestampRange>,
325 pub limit: Option<u64>,
326}
327
328#[derive(Debug, Clone, Default)]
329pub struct FlushTableRequest {
330 pub catalog_name: String,
331 pub schema_name: String,
332 pub table_name: String,
333}
334
335#[derive(Debug, Clone, PartialEq)]
336pub struct CompactTableRequest {
337 pub catalog_name: String,
338 pub schema_name: String,
339 pub table_name: String,
340 pub compact_options: compact_request::Options,
341}
342
343impl Default for CompactTableRequest {
344 fn default() -> Self {
345 Self {
346 catalog_name: Default::default(),
347 schema_name: Default::default(),
348 table_name: Default::default(),
349 compact_options: compact_request::Options::Regular(Default::default()),
350 }
351 }
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct TruncateTableRequest {
357 pub catalog_name: String,
358 pub schema_name: String,
359 pub table_name: String,
360 pub table_id: TableId,
361}
362
363impl TruncateTableRequest {
364 pub fn table_ref(&self) -> TableReference {
365 TableReference {
366 catalog: &self.catalog_name,
367 schema: &self.schema_name,
368 table: &self.table_name,
369 }
370 }
371}
372
373#[derive(Debug, Clone, Default, Deserialize, Serialize)]
374pub struct CopyDatabaseRequest {
375 pub catalog_name: String,
376 pub schema_name: String,
377 pub location: String,
378 pub with: HashMap<String, String>,
379 pub connection: HashMap<String, String>,
380 pub time_range: Option<TimestampRange>,
381}
382
383#[derive(Debug, Clone, Default, Deserialize, Serialize)]
384pub struct CopyQueryToRequest {
385 pub location: String,
386 pub with: HashMap<String, String>,
387 pub connection: HashMap<String, String>,
388}
389
390#[cfg(test)]
391mod tests {
392 use std::time::Duration;
393
394 use super::*;
395
396 #[test]
397 fn test_validate_table_option() {
398 assert!(validate_table_option(FILE_TABLE_LOCATION_KEY));
399 assert!(validate_table_option(FILE_TABLE_FORMAT_KEY));
400 assert!(validate_table_option(FILE_TABLE_PATTERN_KEY));
401 assert!(validate_table_option(TTL_KEY));
402 assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
403 assert!(validate_table_option(STORAGE_KEY));
404 assert!(!validate_table_option("foo"));
405 }
406
407 #[test]
408 fn test_serialize_table_options() {
409 let options = TableOptions {
410 write_buffer_size: None,
411 ttl: Some(Duration::from_secs(1000).into()),
412 extra_options: HashMap::new(),
413 skip_wal: false,
414 };
415 let serialized = serde_json::to_string(&options).unwrap();
416 let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
417 assert_eq!(options, deserialized);
418 }
419
420 #[test]
421 fn test_convert_hashmap_between_table_options() {
422 let options = TableOptions {
423 write_buffer_size: Some(ReadableSize::mb(128)),
424 ttl: Some(Duration::from_secs(1000).into()),
425 extra_options: HashMap::new(),
426 skip_wal: false,
427 };
428 let serialized_map = HashMap::from(&options);
429 let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
430 assert_eq!(options, serialized);
431
432 let options = TableOptions {
433 write_buffer_size: None,
434 ttl: Default::default(),
435 extra_options: HashMap::new(),
436 skip_wal: false,
437 };
438 let serialized_map = HashMap::from(&options);
439 let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
440 assert_eq!(options, serialized);
441
442 let options = TableOptions {
443 write_buffer_size: Some(ReadableSize::mb(128)),
444 ttl: Some(Duration::from_secs(1000).into()),
445 extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
446 skip_wal: false,
447 };
448 let serialized_map = HashMap::from(&options);
449 let serialized = TableOptions::try_from_iter(&serialized_map).unwrap();
450 assert_eq!(options, serialized);
451 }
452
453 #[test]
454 fn test_table_options_to_string() {
455 let options = TableOptions {
456 write_buffer_size: Some(ReadableSize::mb(128)),
457 ttl: Some(Duration::from_secs(1000).into()),
458 extra_options: HashMap::new(),
459 skip_wal: false,
460 };
461
462 assert_eq!(
463 "write_buffer_size=128.0MiB ttl=16m 40s",
464 options.to_string()
465 );
466
467 let options = TableOptions {
468 write_buffer_size: Some(ReadableSize::mb(128)),
469 ttl: Some(Duration::from_secs(1000).into()),
470 extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
471 skip_wal: false,
472 };
473
474 assert_eq!(
475 "write_buffer_size=128.0MiB ttl=16m 40s a=A",
476 options.to_string()
477 );
478
479 let options = TableOptions {
480 write_buffer_size: Some(ReadableSize::mb(128)),
481 ttl: Some(Duration::from_secs(1000).into()),
482 extra_options: HashMap::new(),
483 skip_wal: true,
484 };
485 assert_eq!(
486 "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
487 options.to_string()
488 );
489 }
490}