1use std::collections::HashMap;
18use std::fmt;
19use std::str::FromStr;
20
21use common_base::readable_size::ReadableSize;
22use common_datasource::object_store::s3::is_supported_in_s3;
23use common_query::AddColumnLocation;
24use common_time::range::TimestampRange;
25use common_time::TimeToLive;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::prelude::VectorRef;
28use datatypes::schema::{ColumnSchema, FulltextOptions, SkippingIndexOptions};
29use greptime_proto::v1::region::compact_request;
30use serde::{Deserialize, Serialize};
31use store_api::metric_engine_consts::{
32 is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
33};
34use store_api::mito_engine_options::is_mito_engine_option_key;
35use store_api::region_request::{SetRegionOption, UnsetRegionOption};
36
37use crate::error::{ParseTableOptionSnafu, Result};
38use crate::metadata::{TableId, TableVersion};
39use crate::table_reference::TableReference;
40
/// Option key `"__private.file_table_meta"`.
///
/// NOTE(review): presumably holds internal metadata for file tables; it is not listed in
/// `VALID_TABLE_OPTION_KEYS` — confirm that it is never meant to be user-supplied.
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
/// Option key for a file table's location.
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
/// Option key for a file table's pattern.
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
/// Option key for a file table's format.
pub const FILE_TABLE_FORMAT_KEY: &str = "format";

/// Option key naming the table's data model.
pub const TABLE_DATA_MODEL: &str = "table_data_model";
/// The string `"greptime_trace_v1"`.
///
/// NOTE(review): presumably a value for the [`TABLE_DATA_MODEL`] option — confirm.
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
48
/// Table option keys that `validate_table_option` accepts in addition to the keys
/// recognized by the S3, mito engine and metric engine checks.
pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [
    // General table option keys declared in this file.
    WRITE_BUFFER_SIZE_KEY,
    TTL_KEY,
    STORAGE_KEY,
    COMMENT_KEY,
    SKIP_WAL_KEY,
    // File table keys.
    FILE_TABLE_LOCATION_KEY,
    FILE_TABLE_FORMAT_KEY,
    FILE_TABLE_PATTERN_KEY,
    // Keys imported from `store_api::metric_engine_consts`.
    PHYSICAL_TABLE_METADATA_KEY,
    LOGICAL_TABLE_METADATA_KEY,
    // Table data model key.
    TABLE_DATA_MODEL,
];
66
67pub fn validate_table_option(key: &str) -> bool {
69 if is_supported_in_s3(key) {
70 return true;
71 }
72
73 if is_mito_engine_option_key(key) {
74 return true;
75 }
76
77 if is_metric_engine_option_key(key) {
78 return true;
79 }
80
81 VALID_TABLE_OPTION_KEYS.contains(&key)
82}
83
/// Options attached to a table: a few typed, well-known options plus a free-form map.
///
/// Because of `#[serde(default)]`, fields missing from serialized input fall back to
/// their `Default` values when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TableOptions {
    /// Parsed value of the `write_buffer_size` option, if set.
    pub write_buffer_size: Option<ReadableSize>,
    /// Parsed value of the TTL option, if set.
    pub ttl: Option<TimeToLive>,
    /// Parsed value of the skip-WAL option; `false` when the option is absent.
    pub skip_wal: bool,
    /// Remaining options as raw strings.
    ///
    /// `TableOptions::try_from_iter` stores every key here except the write buffer size
    /// and TTL keys (so the skip-WAL key, when given, also appears in this map).
    pub extra_options: HashMap<String, String>,
}
96
/// Option key for the write buffer size.
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
/// Option key for the time-to-live; same value as the mito engine's `TTL_KEY`.
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
/// Option key for the storage setting.
pub const STORAGE_KEY: &str = "storage";
/// Option key for the table comment.
pub const COMMENT_KEY: &str = "comment";
/// Option key `"auto_create_table"`.
///
/// NOTE(review): this key is not part of `VALID_TABLE_OPTION_KEYS`; presumably it is
/// consumed elsewhere — confirm.
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
/// Option key for skipping the WAL; same value as the mito engine's `SKIP_WAL_KEY`.
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
103
104impl TableOptions {
105 pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
106 iter: U,
107 ) -> Result<TableOptions> {
108 let mut options = TableOptions::default();
109
110 let kvs: HashMap<String, String> = iter
111 .into_iter()
112 .map(|(k, v)| (k.to_string(), v.to_string()))
113 .collect();
114
115 if let Some(write_buffer_size) = kvs.get(WRITE_BUFFER_SIZE_KEY) {
116 let size = ReadableSize::from_str(write_buffer_size).map_err(|_| {
117 ParseTableOptionSnafu {
118 key: WRITE_BUFFER_SIZE_KEY,
119 value: write_buffer_size,
120 }
121 .build()
122 })?;
123 options.write_buffer_size = Some(size)
124 }
125
126 if let Some(ttl) = kvs.get(TTL_KEY) {
127 let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
128 ParseTableOptionSnafu {
129 key: TTL_KEY,
130 value: ttl,
131 }
132 .build()
133 })?;
134 options.ttl = Some(ttl_value);
135 }
136
137 if let Some(skip_wal) = kvs.get(SKIP_WAL_KEY) {
138 options.skip_wal = skip_wal.parse().map_err(|_| {
139 ParseTableOptionSnafu {
140 key: SKIP_WAL_KEY,
141 value: skip_wal,
142 }
143 .build()
144 })?;
145 }
146
147 options.extra_options = HashMap::from_iter(
148 kvs.into_iter()
149 .filter(|(k, _)| k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY),
150 );
151
152 Ok(options)
153 }
154}
155
156impl fmt::Display for TableOptions {
157 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
158 let mut key_vals = vec![];
159 if let Some(size) = self.write_buffer_size {
160 key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
161 }
162
163 if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
164 key_vals.push(format!("{}={}", TTL_KEY, ttl));
165 }
166
167 if self.skip_wal {
168 key_vals.push(format!("{}={}", SKIP_WAL_KEY, self.skip_wal));
169 }
170
171 for (k, v) in &self.extra_options {
172 key_vals.push(format!("{}={}", k, v));
173 }
174
175 write!(f, "{}", key_vals.join(" "))
176 }
177}
178
179impl From<&TableOptions> for HashMap<String, String> {
180 fn from(opts: &TableOptions) -> Self {
181 let mut res = HashMap::with_capacity(2 + opts.extra_options.len());
182 if let Some(write_buffer_size) = opts.write_buffer_size {
183 let _ = res.insert(
184 WRITE_BUFFER_SIZE_KEY.to_string(),
185 write_buffer_size.to_string(),
186 );
187 }
188 if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
189 let _ = res.insert(TTL_KEY.to_string(), ttl_str);
190 }
191 res.extend(
192 opts.extra_options
193 .iter()
194 .map(|(k, v)| (k.clone(), v.clone())),
195 );
196 res
197 }
198}
199
/// Request describing an alteration to apply to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlterTableRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the table to alter.
    pub table_name: String,
    /// Identifier of the table to alter.
    pub table_id: TableId,
    /// The alteration to perform.
    pub alter_kind: AlterKind,
    /// Optional table version.
    ///
    /// NOTE(review): when this is `None` and how the version is checked is not visible
    /// in this file — confirm with the callers.
    pub table_version: Option<TableVersion>,
}
211
/// Description of a single column to add to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddColumnRequest {
    /// Schema of the column to add.
    pub column_schema: ColumnSchema,
    /// Whether the column is a key column.
    /// NOTE(review): presumably means "part of the primary key" — confirm.
    pub is_key: bool,
    /// Optional position at which to place the new column.
    pub location: Option<AddColumnLocation>,
    /// Flag named after `ADD COLUMN IF NOT EXISTS` semantics.
    /// NOTE(review): presumably an existing column is tolerated when `true` — confirm.
    pub add_if_not_exists: bool,
}
221
/// Description of a single column type change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModifyColumnTypeRequest {
    /// Name of the column whose type is changed.
    pub column_name: String,
    /// Data type the column should have afterwards.
    pub target_type: ConcreteDataType,
}
228
/// The kinds of alteration an [`AlterTableRequest`] can carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlterKind {
    /// Add columns to the table.
    AddColumns {
        /// The columns to add.
        columns: Vec<AddColumnRequest>,
    },
    /// Drop columns from the table.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// The type changes to apply.
        columns: Vec<ModifyColumnTypeRequest>,
    },
    /// Rename the table.
    RenameTable {
        /// The new name of the table.
        new_table_name: String,
    },
    /// Set table options.
    SetTableOptions {
        /// The options to set, expressed as region options.
        options: Vec<SetRegionOption>,
    },
    /// Unset table options.
    UnsetTableOptions {
        /// The options to unset, expressed as region options.
        keys: Vec<UnsetRegionOption>,
    },
    /// Set an index on a column.
    SetIndex {
        /// Which index to set and on which column.
        options: SetIndexOptions,
    },
    /// Unset an index on a column.
    UnsetIndex {
        /// Which index to unset and on which column.
        options: UnsetIndexOptions,
    },
}
256
/// Index to set on a column, together with its index-specific options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SetIndexOptions {
    /// Fulltext index.
    Fulltext {
        /// Name of the target column.
        column_name: String,
        /// Options of the fulltext index.
        options: FulltextOptions,
    },
    /// Inverted index; carries no extra options.
    Inverted {
        /// Name of the target column.
        column_name: String,
    },
    /// Skipping index.
    Skipping {
        /// Name of the target column.
        column_name: String,
        /// Options of the skipping index.
        options: SkippingIndexOptions,
    },
}
271
/// Index to unset on a column; every variant only names the target column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UnsetIndexOptions {
    /// Fulltext index on `column_name`.
    Fulltext { column_name: String },
    /// Inverted index on `column_name`.
    Inverted { column_name: String },
    /// Skipping index on `column_name`.
    Skipping { column_name: String },
}
278
/// Request to insert data into a table.
#[derive(Debug)]
pub struct InsertRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Vectors of values keyed by a string.
    /// NOTE(review): presumably the key is the column name — confirm.
    pub columns_values: HashMap<String, VectorRef>,
}
286
/// Request to delete data from a table.
#[derive(Debug)]
pub struct DeleteRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// Vectors of key column values keyed by a string.
    /// NOTE(review): presumably the key is the column name, and which columns count as
    /// "key columns" is not visible here — confirm.
    pub key_column_values: HashMap<String, VectorRef>,
}
298
/// Direction of a table copy, used by [`CopyTableRequest`].
#[derive(Debug)]
pub enum CopyDirection {
    /// NOTE(review): presumably copies table data out to `location` — confirm.
    Export,
    /// NOTE(review): presumably copies data from `location` into the table — confirm.
    Import,
}
304
/// Request to copy a table to or from an external location.
#[derive(Debug)]
pub struct CopyTableRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the table being copied.
    pub table_name: String,
    /// External location involved in the copy.
    pub location: String,
    /// String options for the copy.
    /// NOTE(review): presumably from the statement's `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// String options for the connection.
    /// NOTE(review): presumably from the statement's `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
    /// Optional pattern.
    /// NOTE(review): presumably filters the files under `location` — confirm.
    pub pattern: Option<String>,
    /// Whether this is an export or an import.
    pub direction: CopyDirection,
    /// Optional timestamp range restricting the copy.
    pub timestamp_range: Option<TimestampRange>,
    /// Optional limit.
    /// NOTE(review): presumably a maximum row count — confirm.
    pub limit: Option<u64>,
}
319
/// Request to flush a table, identified by its catalog, schema and table names.
#[derive(Debug, Clone, Default)]
pub struct FlushTableRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the table to flush.
    pub table_name: String,
}
326
/// Request to compact a table.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the table to compact.
    pub table_name: String,
    /// Compaction options taken from the region compact request protocol type.
    pub compact_options: compact_request::Options,
}
334
335impl Default for CompactTableRequest {
336 fn default() -> Self {
337 Self {
338 catalog_name: Default::default(),
339 schema_name: Default::default(),
340 table_name: Default::default(),
341 compact_options: compact_request::Options::Regular(Default::default()),
342 }
343 }
344}
345
/// Request to truncate a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TruncateTableRequest {
    /// Name of the catalog containing the table.
    pub catalog_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the table to truncate.
    pub table_name: String,
    /// Identifier of the table to truncate.
    pub table_id: TableId,
}
354
355impl TruncateTableRequest {
356 pub fn table_ref(&self) -> TableReference {
357 TableReference {
358 catalog: &self.catalog_name,
359 schema: &self.schema_name,
360 table: &self.table_name,
361 }
362 }
363}
364
/// Request to copy a database (schema) to or from an external location.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
    /// Name of the catalog containing the schema.
    pub catalog_name: String,
    /// Name of the schema being copied.
    pub schema_name: String,
    /// External location involved in the copy.
    pub location: String,
    /// String options for the copy.
    /// NOTE(review): presumably from the statement's `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// String options for the connection.
    /// NOTE(review): presumably from the statement's `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
    /// Optional timestamp range restricting the copy.
    pub time_range: Option<TimestampRange>,
}
374
/// Request to copy the result of a query to an external location.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyQueryToRequest {
    /// External location to write to.
    pub location: String,
    /// String options for the copy.
    /// NOTE(review): presumably from the statement's `WITH` clause — confirm.
    pub with: HashMap<String, String>,
    /// String options for the connection.
    /// NOTE(review): presumably from the statement's `CONNECTION` clause — confirm.
    pub connection: HashMap<String, String>,
}
381
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Builds options with a 128 MiB write buffer and a 1000-second TTL.
    fn sample_options(extra_options: HashMap<String, String>, skip_wal: bool) -> TableOptions {
        TableOptions {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000).into()),
            extra_options,
            skip_wal,
        }
    }

    /// Returns an extra-options map holding the single entry `a=A`.
    fn single_extra() -> HashMap<String, String> {
        HashMap::from([("a".to_string(), "A".to_string())])
    }

    /// Converts `options` to a string map and back, asserting nothing is lost.
    fn assert_map_round_trip(options: TableOptions) {
        let as_map = HashMap::from(&options);
        let rebuilt = TableOptions::try_from_iter(&as_map).unwrap();
        assert_eq!(options, rebuilt);
    }

    #[test]
    fn test_validate_table_option() {
        let valid_keys = [
            FILE_TABLE_LOCATION_KEY,
            FILE_TABLE_FORMAT_KEY,
            FILE_TABLE_PATTERN_KEY,
            TTL_KEY,
            WRITE_BUFFER_SIZE_KEY,
            STORAGE_KEY,
        ];
        for key in valid_keys {
            assert!(validate_table_option(key));
        }
        assert!(!validate_table_option("foo"));
    }

    #[test]
    fn test_serialize_table_options() {
        let options = TableOptions {
            ttl: Some(Duration::from_secs(1000).into()),
            ..Default::default()
        };
        let json = serde_json::to_string(&options).unwrap();
        let decoded: TableOptions = serde_json::from_str(&json).unwrap();
        assert_eq!(options, decoded);
    }

    #[test]
    fn test_convert_hashmap_between_table_options() {
        // Typed options only.
        assert_map_round_trip(sample_options(HashMap::new(), false));
        // Nothing set at all.
        assert_map_round_trip(TableOptions::default());
        // Typed options plus one extra option.
        assert_map_round_trip(sample_options(single_extra(), false));
    }

    #[test]
    fn test_table_options_to_string() {
        let cases = [
            (
                sample_options(HashMap::new(), false),
                "write_buffer_size=128.0MiB ttl=16m 40s",
            ),
            (
                sample_options(single_extra(), false),
                "write_buffer_size=128.0MiB ttl=16m 40s a=A",
            ),
            (
                sample_options(HashMap::new(), true),
                "write_buffer_size=128.0MiB ttl=16m 40s skip_wal=true",
            ),
        ];
        for (options, expected) in cases {
            assert_eq!(expected, options.to_string());
        }
    }
}