1use std::collections::HashMap;
18
19use common_meta::SchemaOptions;
20use datatypes::schema::{
21 ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
22 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
23 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
24 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
25 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
26};
27use snafu::ResultExt;
28use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
29use sql::dialect::GreptimeDbDialect;
30use sql::parser::ParserContext;
31use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
32use sql::statements::{self, OptionMap};
33use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
34use table::metadata::{TableInfoRef, TableMeta};
35use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
36
37use crate::error::{
38 ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
39 GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
40};
41
42fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
44 let table_opts = &table_meta.options;
45 let mut options = OptionMap::default();
46 if let Some(write_buffer_size) = table_opts.write_buffer_size {
47 options.insert(
48 WRITE_BUFFER_SIZE_KEY.to_string(),
49 write_buffer_size.to_string(),
50 );
51 }
52 if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
53 options.insert(TTL_KEY.to_string(), ttl);
54 } else if let Some(database_ttl) = schema_options
55 .and_then(|o| o.ttl)
56 .map(|ttl| ttl.to_string())
57 {
58 options.insert(TTL_KEY.to_string(), database_ttl);
59 };
60 for (k, v) in table_opts
61 .extra_options
62 .iter()
63 .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
64 {
65 options.insert(k.to_string(), v.to_string());
66 }
67 options
68}
69
70#[inline]
71fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
72 ColumnOptionDef { name: None, option }
73}
74
75fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
76 let name = &column_schema.name;
77 let mut options = Vec::with_capacity(2);
78 let mut extensions = ColumnExtensions::default();
79
80 if column_schema.is_nullable() {
81 options.push(column_option_def(ColumnOption::Null));
82 } else {
83 options.push(column_option_def(ColumnOption::NotNull));
84 }
85
86 if let Some(c) = column_schema.default_constraint() {
87 let expr = match c {
88 ColumnDefaultConstraint::Value(v) => Expr::Value(
89 statements::value_to_sql_value(v)
90 .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?,
91 ),
92 ColumnDefaultConstraint::Function(expr) => {
93 ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
94 }
95 };
96
97 options.push(column_option_def(ColumnOption::Default(expr)));
98 }
99
100 if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
101 options.push(column_option_def(ColumnOption::Comment(c.to_string())));
102 }
103
104 if let Some(opt) = column_schema
105 .fulltext_options()
106 .context(GetFulltextOptionsSnafu)?
107 && opt.enable
108 {
109 let mut map = HashMap::from([
110 (
111 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
112 opt.analyzer.to_string(),
113 ),
114 (
115 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
116 opt.case_sensitive.to_string(),
117 ),
118 (
119 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
120 opt.backend.to_string(),
121 ),
122 ]);
123 if opt.backend == FulltextBackend::Bloom {
124 map.insert(
125 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
126 opt.granularity.to_string(),
127 );
128 map.insert(
129 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
130 opt.false_positive_rate().to_string(),
131 );
132 }
133 extensions.fulltext_index_options = Some(map.into());
134 }
135
136 if let Some(opt) = column_schema
137 .skipping_index_options()
138 .context(GetSkippingIndexOptionsSnafu)?
139 {
140 let map = HashMap::from([
141 (
142 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
143 opt.granularity.to_string(),
144 ),
145 (
146 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
147 opt.false_positive_rate().to_string(),
148 ),
149 (
150 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
151 opt.index_type.to_string(),
152 ),
153 ]);
154 extensions.skipping_index_options = Some(map.into());
155 }
156
157 if column_schema.is_inverted_indexed() {
158 extensions.inverted_index_options = Some(HashMap::new().into());
159 }
160
161 Ok(Column {
162 column_def: ColumnDef {
163 name: Ident::with_quote(quote_style, name),
164 data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
165 .with_context(|_| ConvertSqlTypeSnafu {
166 datatype: column_schema.data_type.clone(),
167 })?,
168 collation: None,
169 options,
170 },
171 extensions,
172 })
173}
174
175fn primary_key_columns_for_show_create<'a>(
179 table_meta: &'a TableMeta,
180 engine: &str,
181) -> Vec<&'a String> {
182 let is_metric_engine = is_metric_engine(engine);
183 if is_metric_engine {
184 table_meta
185 .row_key_column_names()
186 .filter(|name| !is_metric_engine_internal_column(name))
187 .collect()
188 } else {
189 table_meta.row_key_column_names().collect()
190 }
191}
192
193fn create_table_constraints(
194 engine: &str,
195 schema: &SchemaRef,
196 table_meta: &TableMeta,
197 quote_style: char,
198) -> Vec<TableConstraint> {
199 let mut constraints = Vec::with_capacity(2);
200 if let Some(timestamp_column) = schema.timestamp_column() {
201 let column_name = ×tamp_column.name;
202 constraints.push(TableConstraint::TimeIndex {
203 column: Ident::with_quote(quote_style, column_name),
204 });
205 }
206 if !table_meta.primary_key_indices.is_empty() {
207 let columns = primary_key_columns_for_show_create(table_meta, engine)
208 .into_iter()
209 .map(|name| Ident::with_quote(quote_style, name))
210 .collect();
211 constraints.push(TableConstraint::PrimaryKey { columns });
212 }
213
214 constraints
215}
216
217pub fn create_table_stmt(
219 table_info: &TableInfoRef,
220 schema_options: Option<SchemaOptions>,
221 quote_style: char,
222) -> Result<CreateTable> {
223 let table_meta = &table_info.meta;
224 let table_name = &table_info.name;
225 let schema = &table_info.meta.schema;
226 let is_metric_engine = is_metric_engine(&table_meta.engine);
227 let columns = schema
228 .column_schemas()
229 .iter()
230 .filter_map(|c| {
231 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
232 None
233 } else {
234 Some(create_column(c, quote_style))
235 }
236 })
237 .collect::<Result<Vec<_>>>()?;
238
239 let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
240
241 Ok(CreateTable {
242 if_not_exists: true,
243 table_id: table_info.ident.table_id,
244 name: ObjectName(vec![Ident::with_quote(quote_style, table_name)]),
245 columns,
246 engine: table_meta.engine.clone(),
247 constraints,
248 options: create_sql_options(table_meta, schema_options),
249 partitions: None,
250 })
251}
252
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
    use table::metadata::*;
    use table::requests::{
        TableOptions, FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY,
    };

    use super::*;

    /// Wraps `meta` into the `greptime.public.system_metrics` table info that
    /// both tests render.
    fn system_metrics_table_info(meta: TableMeta) -> TableInfoRef {
        Arc::new(
            TableInfoBuilder::default()
                .table_id(1024)
                .table_version(0 as TableVersion)
                .name("system_metrics")
                .schema_name("public".to_string())
                .catalog_name("greptime".to_string())
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        )
    }

    /// A mito table with skipping, inverted and fulltext indexes, a default
    /// function on the time index, a TTL and one extra option.
    #[test]
    fn test_show_create_table_sql() {
        let id = ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
            .with_skipping_options(SkippingIndexOptions {
                granularity: 4096,
                ..Default::default()
            })
            .unwrap();
        let host = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
            .with_inverted_index(true);
        let cpu = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
        let disk = ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true);
        let msg = ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
            .with_fulltext_options(FulltextOptions {
                enable: true,
                ..Default::default()
            })
            .unwrap();
        let ts = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
            "current_timestamp()",
        ))))
        .unwrap()
        .with_time_index(true);

        let table_schema = SchemaRef::new(Schema::new(vec![id, host, cpu, disk, msg, ts]));

        let mut options = TableOptions {
            ttl: Some(Duration::from_secs(30).into()),
            ..Default::default()
        };
        options
            .extra_options
            .extend([("compaction.type".to_string(), "twcs".to_string())]);

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![0, 1])
            .value_indices(vec![2, 3])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .region_numbers(vec![0, 1, 2])
            .build()
            .unwrap();

        let info = system_metrics_table_info(meta);
        let stmt = create_table_stmt(&info, None, '"').unwrap();

        let expected = r#"
CREATE TABLE IF NOT EXISTS "system_metrics" (
  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
  "host" STRING NULL INVERTED INDEX,
  "cpu" DOUBLE NULL,
  "disk" FLOAT NULL,
  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  TIME INDEX ("ts"),
  PRIMARY KEY ("id", "host")
)
ENGINE=mito
WITH(
  'compaction.type' = 'twcs',
  ttl = '30s'
)"#;
        assert_eq!(expected, format!("\n{}", stmt));
    }

    /// A file engine table: the file table meta option must not be rendered,
    /// while the format and location options are.
    #[test]
    fn test_show_create_external_table_sql() {
        let columns = vec![
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ];
        let table_schema = SchemaRef::new(Schema::new(columns));

        let mut options: TableOptions = Default::default();
        options.extra_options.extend([
            (FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string()),
            (
                FILE_TABLE_META_KEY.to_string(),
                "{{\"files\":[\"foo.csv\"]}}".to_string(),
            ),
            (FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string()),
        ]);

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![])
            .engine("file".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .build()
            .unwrap();

        let info = system_metrics_table_info(meta);
        let stmt = create_table_stmt(&info, None, '"').unwrap();

        let expected = r#"
CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
  "host" STRING NULL,
  "cpu" DOUBLE NULL,

)
ENGINE=file
WITH(
  format = 'csv',
  location = 'foo.csv'
)"#;
        assert_eq!(expected, format!("\n{}", stmt));
    }
}