1use std::collections::HashMap;
18
19use arrow_schema::extension::ExtensionType;
20use common_meta::SchemaOptions;
21use datatypes::extension::json::JsonExtensionType;
22use datatypes::schema::{
23 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
24 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
25 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
26 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
27 ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
28};
29use snafu::ResultExt;
30use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
31use sql::dialect::GreptimeDbDialect;
32use sql::parser::ParserContext;
33use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
34use sql::statements::{self, OptionMap};
35use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
36use table::metadata::{TableInfoRef, TableMeta};
37use table::requests::{
38 COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY,
39};
40
41use crate::error::{
42 ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
43 GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
44};
45
46fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
48 let table_opts = &table_meta.options;
49 let mut options = OptionMap::default();
50 if let Some(write_buffer_size) = table_opts.write_buffer_size {
51 options.insert(
52 WRITE_BUFFER_SIZE_KEY.to_string(),
53 write_buffer_size.to_string(),
54 );
55 }
56 if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
57 options.insert(TTL_KEY.to_string(), ttl);
58 } else if let Some(database_ttl) = schema_options
59 .and_then(|o| o.ttl)
60 .map(|ttl| ttl.to_string())
61 {
62 options.insert(TTL_KEY.to_string(), database_ttl);
63 };
64 for (k, v) in table_opts
65 .extra_options
66 .iter()
67 .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
68 {
69 options.insert(k.clone(), v.clone());
70 }
71 options
72}
73
74#[inline]
75fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
76 ColumnOptionDef { name: None, option }
77}
78
79fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
80 let name = &column_schema.name;
81 let mut options = Vec::with_capacity(2);
82 let mut extensions = ColumnExtensions::default();
83
84 if column_schema.is_nullable() {
85 options.push(column_option_def(ColumnOption::Null));
86 } else {
87 options.push(column_option_def(ColumnOption::NotNull));
88 }
89
90 if let Some(c) = column_schema.default_constraint() {
91 let expr = match c {
92 ColumnDefaultConstraint::Value(v) => Expr::Value(
93 statements::value_to_sql_value(v)
94 .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?
95 .into(),
96 ),
97 ColumnDefaultConstraint::Function(expr) => {
98 ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
99 }
100 };
101
102 options.push(column_option_def(ColumnOption::Default(expr)));
103 }
104
105 if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
106 options.push(column_option_def(ColumnOption::Comment(c.clone())));
107 }
108
109 if let Some(opt) = column_schema
110 .fulltext_options()
111 .context(GetFulltextOptionsSnafu)?
112 && opt.enable
113 {
114 let mut map = HashMap::from([
115 (
116 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
117 opt.analyzer.to_string(),
118 ),
119 (
120 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
121 opt.case_sensitive.to_string(),
122 ),
123 (
124 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
125 opt.backend.to_string(),
126 ),
127 ]);
128 if opt.backend == FulltextBackend::Bloom {
129 map.insert(
130 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
131 opt.granularity.to_string(),
132 );
133 map.insert(
134 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
135 opt.false_positive_rate().to_string(),
136 );
137 }
138 extensions.fulltext_index_options = Some(map.into());
139 }
140
141 if let Some(opt) = column_schema
142 .skipping_index_options()
143 .context(GetSkippingIndexOptionsSnafu)?
144 {
145 let map = HashMap::from([
146 (
147 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
148 opt.granularity.to_string(),
149 ),
150 (
151 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
152 opt.false_positive_rate().to_string(),
153 ),
154 (
155 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
156 opt.index_type.to_string(),
157 ),
158 ]);
159 extensions.skipping_index_options = Some(map.into());
160 }
161
162 if column_schema.is_inverted_indexed() {
163 extensions.inverted_index_options = Some(HashMap::new().into());
164 }
165
166 if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
167 let settings = json_extension
168 .metadata()
169 .json_structure_settings
170 .clone()
171 .unwrap_or_default();
172 extensions.set_json_structure_settings(settings);
173 }
174
175 Ok(Column {
176 column_def: ColumnDef {
177 name: Ident::with_quote(quote_style, name),
178 data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
179 .with_context(|_| ConvertSqlTypeSnafu {
180 datatype: column_schema.data_type.clone(),
181 })?,
182 options,
183 },
184 extensions,
185 })
186}
187
188fn primary_key_columns_for_show_create<'a>(
192 table_meta: &'a TableMeta,
193 engine: &str,
194) -> Vec<&'a String> {
195 let is_metric_engine = is_metric_engine(engine);
196 if is_metric_engine {
197 table_meta
198 .row_key_column_names()
199 .filter(|name| !is_metric_engine_internal_column(name))
200 .collect()
201 } else {
202 table_meta.row_key_column_names().collect()
203 }
204}
205
206fn create_table_constraints(
207 engine: &str,
208 schema: &SchemaRef,
209 table_meta: &TableMeta,
210 quote_style: char,
211) -> Vec<TableConstraint> {
212 let mut constraints = Vec::with_capacity(2);
213 if let Some(timestamp_column) = schema.timestamp_column() {
214 let column_name = ×tamp_column.name;
215 constraints.push(TableConstraint::TimeIndex {
216 column: Ident::with_quote(quote_style, column_name),
217 });
218 }
219 if !table_meta.primary_key_indices.is_empty() {
220 let columns = primary_key_columns_for_show_create(table_meta, engine)
221 .into_iter()
222 .map(|name| Ident::with_quote(quote_style, name))
223 .collect();
224 constraints.push(TableConstraint::PrimaryKey { columns });
225 }
226
227 constraints
228}
229
230pub fn create_table_stmt(
232 table_info: &TableInfoRef,
233 schema_options: Option<SchemaOptions>,
234 quote_style: char,
235) -> Result<CreateTable> {
236 let table_meta = &table_info.meta;
237 let table_name = &table_info.name;
238 let schema = &table_info.meta.schema;
239 let is_metric_engine = is_metric_engine(&table_meta.engine);
240 let columns = schema
241 .column_schemas()
242 .iter()
243 .filter_map(|c| {
244 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
245 None
246 } else {
247 Some(create_column(c, quote_style))
248 }
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
253
254 let mut options = create_sql_options(table_meta, schema_options);
255 if let Some(comment) = &table_info.desc
256 && options.get(TABLE_COMMENT_KEY).is_none()
257 {
258 options.insert(format!("'{TABLE_COMMENT_KEY}'"), comment.clone());
259 }
260
261 Ok(CreateTable {
262 if_not_exists: true,
263 table_id: table_info.ident.table_id,
264 name: ObjectName::from(vec![Ident::with_quote(quote_style, table_name)]),
265 columns,
266 engine: table_meta.engine.clone(),
267 constraints,
268 options,
269 partitions: None,
270 })
271}
272
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
    use table::metadata::*;
    use table::requests::{
        FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
    };

    use super::*;

    /// Wraps `meta` into the `greptime.public.system_metrics` table info used by both tests.
    fn system_metrics_info(meta: TableMeta) -> Arc<TableInfo> {
        let info = TableInfoBuilder::default()
            .table_id(1024)
            .table_version(0 as TableVersion)
            .name("system_metrics")
            .schema_name("public".to_string())
            .catalog_name("greptime".to_string())
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();
        Arc::new(info)
    }

    /// A mito table with index options, a default function, a TTL and an extra option is
    /// rendered as the expected `CREATE TABLE` SQL.
    #[test]
    fn test_show_create_table_sql() {
        let id_column = ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
            .with_skipping_options(SkippingIndexOptions {
                granularity: 4096,
                ..Default::default()
            })
            .unwrap();
        let host_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
            .with_inverted_index(true);
        let cpu_column = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
        let disk_column = ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true);
        let msg_column = ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
            .with_fulltext_options(FulltextOptions {
                enable: true,
                ..Default::default()
            })
            .unwrap();
        let ts_column = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
            "current_timestamp()",
        ))))
        .unwrap()
        .with_time_index(true);

        let table_schema = SchemaRef::new(Schema::new(vec![
            id_column,
            host_column,
            cpu_column,
            disk_column,
            msg_column,
            ts_column,
        ]));

        let mut table_options = TableOptions {
            ttl: Some(Duration::from_secs(30).into()),
            ..Default::default()
        };
        table_options
            .extra_options
            .extend([("compaction.type".to_string(), "twcs".to_string())]);

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![0, 1])
            .value_indices(vec![2, 3])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(table_options)
            .created_on(Default::default())
            .region_numbers(vec![0, 1, 2])
            .build()
            .unwrap();

        let stmt = create_table_stmt(&system_metrics_info(meta), None, '"').unwrap();

        let sql = format!("\n{stmt}");
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS "system_metrics" (
  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
  "host" STRING NULL INVERTED INDEX,
  "cpu" DOUBLE NULL,
  "disk" FLOAT NULL,
  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  TIME INDEX ("ts"),
  PRIMARY KEY ("id", "host")
)
ENGINE=mito
WITH(
  'compaction.type' = 'twcs',
  ttl = '30s'
)"#,
            sql
        );
    }

    /// A file engine table is rendered as `CREATE EXTERNAL TABLE`, and the file table meta
    /// option does not appear among the rendered options.
    #[test]
    fn test_show_create_external_table_sql() {
        let table_schema = SchemaRef::new(Schema::new(vec![
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ]));

        let mut table_options = TableOptions::default();
        table_options.extra_options.extend([
            (FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string()),
            (
                FILE_TABLE_META_KEY.to_string(),
                "{{\"files\":[\"foo.csv\"]}}".to_string(),
            ),
            (FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string()),
        ]);

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![])
            .engine("file".to_string())
            .next_column_id(0)
            .options(table_options)
            .created_on(Default::default())
            .build()
            .unwrap();

        let stmt = create_table_stmt(&system_metrics_info(meta), None, '"').unwrap();

        let sql = format!("\n{stmt}");
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
  "host" STRING NULL,
  "cpu" DOUBLE NULL,

)
ENGINE=file
WITH(
  format = 'csv',
  location = 'foo.csv'
)"#,
            sql
        );
    }
}