1use std::collections::HashMap;
18
19use arrow_schema::extension::ExtensionType;
20use common_meta::SchemaOptions;
21use datatypes::extension::json::JsonExtensionType;
22use datatypes::schema::{
23 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
24 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
25 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
26 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
27 ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
28};
29use snafu::ResultExt;
30use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
31use sql::dialect::GreptimeDbDialect;
32use sql::parser::ParserContext;
33use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
34use sql::statements::{self, OptionMap};
35use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
36use table::metadata::{TableInfoRef, TableMeta};
37use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
38
39use crate::error::{
40 ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
41 GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
42};
43
44fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
46 let table_opts = &table_meta.options;
47 let mut options = OptionMap::default();
48 if let Some(write_buffer_size) = table_opts.write_buffer_size {
49 options.insert(
50 WRITE_BUFFER_SIZE_KEY.to_string(),
51 write_buffer_size.to_string(),
52 );
53 }
54 if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
55 options.insert(TTL_KEY.to_string(), ttl);
56 } else if let Some(database_ttl) = schema_options
57 .and_then(|o| o.ttl)
58 .map(|ttl| ttl.to_string())
59 {
60 options.insert(TTL_KEY.to_string(), database_ttl);
61 };
62 for (k, v) in table_opts
63 .extra_options
64 .iter()
65 .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
66 {
67 options.insert(k.clone(), v.clone());
68 }
69 options
70}
71
72#[inline]
73fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
74 ColumnOptionDef { name: None, option }
75}
76
77fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
78 let name = &column_schema.name;
79 let mut options = Vec::with_capacity(2);
80 let mut extensions = ColumnExtensions::default();
81
82 if column_schema.is_nullable() {
83 options.push(column_option_def(ColumnOption::Null));
84 } else {
85 options.push(column_option_def(ColumnOption::NotNull));
86 }
87
88 if let Some(c) = column_schema.default_constraint() {
89 let expr = match c {
90 ColumnDefaultConstraint::Value(v) => Expr::Value(
91 statements::value_to_sql_value(v)
92 .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?
93 .into(),
94 ),
95 ColumnDefaultConstraint::Function(expr) => {
96 ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
97 }
98 };
99
100 options.push(column_option_def(ColumnOption::Default(expr)));
101 }
102
103 if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
104 options.push(column_option_def(ColumnOption::Comment(c.clone())));
105 }
106
107 if let Some(opt) = column_schema
108 .fulltext_options()
109 .context(GetFulltextOptionsSnafu)?
110 && opt.enable
111 {
112 let mut map = HashMap::from([
113 (
114 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
115 opt.analyzer.to_string(),
116 ),
117 (
118 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
119 opt.case_sensitive.to_string(),
120 ),
121 (
122 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
123 opt.backend.to_string(),
124 ),
125 ]);
126 if opt.backend == FulltextBackend::Bloom {
127 map.insert(
128 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
129 opt.granularity.to_string(),
130 );
131 map.insert(
132 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
133 opt.false_positive_rate().to_string(),
134 );
135 }
136 extensions.fulltext_index_options = Some(map.into());
137 }
138
139 if let Some(opt) = column_schema
140 .skipping_index_options()
141 .context(GetSkippingIndexOptionsSnafu)?
142 {
143 let map = HashMap::from([
144 (
145 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
146 opt.granularity.to_string(),
147 ),
148 (
149 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
150 opt.false_positive_rate().to_string(),
151 ),
152 (
153 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
154 opt.index_type.to_string(),
155 ),
156 ]);
157 extensions.skipping_index_options = Some(map.into());
158 }
159
160 if column_schema.is_inverted_indexed() {
161 extensions.inverted_index_options = Some(HashMap::new().into());
162 }
163
164 if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
165 let settings = json_extension
166 .metadata()
167 .json_structure_settings
168 .clone()
169 .unwrap_or_default();
170 extensions.set_json_structure_settings(settings);
171 }
172
173 Ok(Column {
174 column_def: ColumnDef {
175 name: Ident::with_quote(quote_style, name),
176 data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
177 .with_context(|_| ConvertSqlTypeSnafu {
178 datatype: column_schema.data_type.clone(),
179 })?,
180 options,
181 },
182 extensions,
183 })
184}
185
186fn primary_key_columns_for_show_create<'a>(
190 table_meta: &'a TableMeta,
191 engine: &str,
192) -> Vec<&'a String> {
193 let is_metric_engine = is_metric_engine(engine);
194 if is_metric_engine {
195 table_meta
196 .row_key_column_names()
197 .filter(|name| !is_metric_engine_internal_column(name))
198 .collect()
199 } else {
200 table_meta.row_key_column_names().collect()
201 }
202}
203
204fn create_table_constraints(
205 engine: &str,
206 schema: &SchemaRef,
207 table_meta: &TableMeta,
208 quote_style: char,
209) -> Vec<TableConstraint> {
210 let mut constraints = Vec::with_capacity(2);
211 if let Some(timestamp_column) = schema.timestamp_column() {
212 let column_name = ×tamp_column.name;
213 constraints.push(TableConstraint::TimeIndex {
214 column: Ident::with_quote(quote_style, column_name),
215 });
216 }
217 if !table_meta.primary_key_indices.is_empty() {
218 let columns = primary_key_columns_for_show_create(table_meta, engine)
219 .into_iter()
220 .map(|name| Ident::with_quote(quote_style, name))
221 .collect();
222 constraints.push(TableConstraint::PrimaryKey { columns });
223 }
224
225 constraints
226}
227
228pub fn create_table_stmt(
230 table_info: &TableInfoRef,
231 schema_options: Option<SchemaOptions>,
232 quote_style: char,
233) -> Result<CreateTable> {
234 let table_meta = &table_info.meta;
235 let table_name = &table_info.name;
236 let schema = &table_info.meta.schema;
237 let is_metric_engine = is_metric_engine(&table_meta.engine);
238 let columns = schema
239 .column_schemas()
240 .iter()
241 .filter_map(|c| {
242 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
243 None
244 } else {
245 Some(create_column(c, quote_style))
246 }
247 })
248 .collect::<Result<Vec<_>>>()?;
249
250 let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
251
252 Ok(CreateTable {
253 if_not_exists: true,
254 table_id: table_info.ident.table_id,
255 name: ObjectName::from(vec![Ident::with_quote(quote_style, table_name)]),
256 columns,
257 engine: table_meta.engine.clone(),
258 constraints,
259 options: create_sql_options(table_meta, schema_options),
260 partitions: None,
261 })
262}
263
// Unit tests that render `create_table_stmt` output and compare it with the exact
// expected SQL text.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
    use table::metadata::*;
    use table::requests::{
        FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
    };

    use super::*;

    /// Renders a `mito` table that has a skipping index, an inverted index, a full-text
    /// index, a function default on the time index column, a TTL and one extra option,
    /// and checks the formatted statement.
    #[test]
    fn test_show_create_table_sql() {
        let schema = vec![
            // Skipping index with an explicit granularity; other settings use defaults.
            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
                .with_skipping_options(SkippingIndexOptions {
                    granularity: 4096,
                    ..Default::default()
                })
                .unwrap(),
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
                .with_inverted_index(true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
            // Full-text index enabled with otherwise default settings.
            ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    ..Default::default()
                })
                .unwrap(),
            // Non-nullable time index column with a function default.
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];

        let table_schema = SchemaRef::new(Schema::new(schema));
        let table_name = "system_metrics";
        let schema_name = "public".to_string();
        let catalog_name = "greptime".to_string();
        let regions = vec![0, 1, 2];

        let mut options = table::requests::TableOptions {
            ttl: Some(Duration::from_secs(30).into()),
            ..Default::default()
        };

        let _ = options
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());

        // Columns 0 and 1 ("id", "host") form the primary key.
        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![0, 1])
            .value_indices(vec![2, 3])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .region_numbers(regions)
            .build()
            .unwrap();

        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1024)
                .table_version(0 as TableVersion)
                .name(table_name)
                .schema_name(schema_name)
                .catalog_name(catalog_name)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );

        // No schema-level options are supplied, so the TTL comes from the table options.
        let stmt = create_table_stmt(&info, None, '"').unwrap();

        // The leading newline lines the output up with the raw string literal below.
        let sql = format!("\n{}", stmt);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS "system_metrics" (
 "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
 "host" STRING NULL INVERTED INDEX,
 "cpu" DOUBLE NULL,
 "disk" FLOAT NULL,
 "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
 "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
 TIME INDEX ("ts"),
 PRIMARY KEY ("id", "host")
)
ENGINE=mito
WITH(
 'compaction.type' = 'twcs',
 ttl = '30s'
)"#,
            sql
        );
    }

    /// Renders a `file` engine table and checks that the location and format options
    /// are shown while the `FILE_TABLE_META_KEY` entry is absent from the output.
    #[test]
    fn test_show_create_external_table_sql() {
        let schema = vec![
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ];
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table_name = "system_metrics";
        let schema_name = "public".to_string();
        let catalog_name = "greptime".to_string();
        let mut options: TableOptions = Default::default();
        let _ = options
            .extra_options
            .insert(FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string());
        // This entry is filtered out by `create_sql_options` and must not be rendered.
        let _ = options.extra_options.insert(
            FILE_TABLE_META_KEY.to_string(),
            "{{\"files\":[\"foo.csv\"]}}".to_string(),
        );
        let _ = options
            .extra_options
            .insert(FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
        // No primary key is configured, so no PRIMARY KEY constraint is generated.
        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![])
            .engine("file".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .build()
            .unwrap();

        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1024)
                .table_version(0 as TableVersion)
                .name(table_name)
                .schema_name(schema_name)
                .catalog_name(catalog_name)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );

        let stmt = create_table_stmt(&info, None, '"').unwrap();

        // The leading newline lines the output up with the raw string literal below.
        let sql = format!("\n{}", stmt);
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
 "host" STRING NULL,
 "cpu" DOUBLE NULL,

)
ENGINE=file
WITH(
 format = 'csv',
 location = 'foo.csv'
)"#,
            sql
        );
    }
}