1use std::collections::HashMap;
18
19use common_meta::SchemaOptions;
20use datatypes::schema::{
21 ColumnDefaultConstraint, ColumnSchema, SchemaRef, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
22 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
23 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
24};
25use snafu::ResultExt;
26use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
27use sql::dialect::GreptimeDbDialect;
28use sql::parser::ParserContext;
29use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
30use sql::statements::{self, OptionMap};
31use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
32use table::metadata::{TableInfoRef, TableMeta};
33use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
34
35use crate::error::{
36 ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
37 GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
38};
39
40fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
42 let table_opts = &table_meta.options;
43 let mut options = OptionMap::default();
44 if let Some(write_buffer_size) = table_opts.write_buffer_size {
45 options.insert(
46 WRITE_BUFFER_SIZE_KEY.to_string(),
47 write_buffer_size.to_string(),
48 );
49 }
50 if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
51 options.insert(TTL_KEY.to_string(), ttl);
52 } else if let Some(database_ttl) = schema_options
53 .and_then(|o| o.ttl)
54 .map(|ttl| ttl.to_string())
55 {
56 options.insert(TTL_KEY.to_string(), database_ttl);
57 };
58 for (k, v) in table_opts
59 .extra_options
60 .iter()
61 .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
62 {
63 options.insert(k.to_string(), v.to_string());
64 }
65 options
66}
67
68#[inline]
69fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
70 ColumnOptionDef { name: None, option }
71}
72
73fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
74 let name = &column_schema.name;
75 let mut options = Vec::with_capacity(2);
76 let mut extensions = ColumnExtensions::default();
77
78 if column_schema.is_nullable() {
79 options.push(column_option_def(ColumnOption::Null));
80 } else {
81 options.push(column_option_def(ColumnOption::NotNull));
82 }
83
84 if let Some(c) = column_schema.default_constraint() {
85 let expr = match c {
86 ColumnDefaultConstraint::Value(v) => Expr::Value(
87 statements::value_to_sql_value(v)
88 .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?,
89 ),
90 ColumnDefaultConstraint::Function(expr) => {
91 ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
92 }
93 };
94
95 options.push(column_option_def(ColumnOption::Default(expr)));
96 }
97
98 if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
99 options.push(column_option_def(ColumnOption::Comment(c.to_string())));
100 }
101
102 if let Some(opt) = column_schema
103 .fulltext_options()
104 .context(GetFulltextOptionsSnafu)?
105 && opt.enable
106 {
107 let map = HashMap::from([
108 (
109 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
110 opt.analyzer.to_string(),
111 ),
112 (
113 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
114 opt.case_sensitive.to_string(),
115 ),
116 (
117 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
118 opt.backend.to_string(),
119 ),
120 ]);
121 extensions.fulltext_index_options = Some(map.into());
122 }
123
124 if let Some(opt) = column_schema
125 .skipping_index_options()
126 .context(GetSkippingIndexOptionsSnafu)?
127 {
128 let map = HashMap::from([
129 (
130 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
131 opt.granularity.to_string(),
132 ),
133 (
134 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
135 opt.index_type.to_string(),
136 ),
137 ]);
138 extensions.skipping_index_options = Some(map.into());
139 }
140
141 if column_schema.is_inverted_indexed() {
142 extensions.inverted_index_options = Some(HashMap::new().into());
143 }
144
145 Ok(Column {
146 column_def: ColumnDef {
147 name: Ident::with_quote(quote_style, name),
148 data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
149 .with_context(|_| ConvertSqlTypeSnafu {
150 datatype: column_schema.data_type.clone(),
151 })?,
152 collation: None,
153 options,
154 },
155 extensions,
156 })
157}
158
159fn primary_key_columns_for_show_create<'a>(
163 table_meta: &'a TableMeta,
164 engine: &str,
165) -> Vec<&'a String> {
166 let is_metric_engine = is_metric_engine(engine);
167 if is_metric_engine {
168 table_meta
169 .row_key_column_names()
170 .filter(|name| !is_metric_engine_internal_column(name))
171 .collect()
172 } else {
173 table_meta.row_key_column_names().collect()
174 }
175}
176
177fn create_table_constraints(
178 engine: &str,
179 schema: &SchemaRef,
180 table_meta: &TableMeta,
181 quote_style: char,
182) -> Vec<TableConstraint> {
183 let mut constraints = Vec::with_capacity(2);
184 if let Some(timestamp_column) = schema.timestamp_column() {
185 let column_name = ×tamp_column.name;
186 constraints.push(TableConstraint::TimeIndex {
187 column: Ident::with_quote(quote_style, column_name),
188 });
189 }
190 if !table_meta.primary_key_indices.is_empty() {
191 let columns = primary_key_columns_for_show_create(table_meta, engine)
192 .into_iter()
193 .map(|name| Ident::with_quote(quote_style, name))
194 .collect();
195 constraints.push(TableConstraint::PrimaryKey { columns });
196 }
197
198 constraints
199}
200
201pub fn create_table_stmt(
203 table_info: &TableInfoRef,
204 schema_options: Option<SchemaOptions>,
205 quote_style: char,
206) -> Result<CreateTable> {
207 let table_meta = &table_info.meta;
208 let table_name = &table_info.name;
209 let schema = &table_info.meta.schema;
210 let is_metric_engine = is_metric_engine(&table_meta.engine);
211 let columns = schema
212 .column_schemas()
213 .iter()
214 .filter_map(|c| {
215 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
216 None
217 } else {
218 Some(create_column(c, quote_style))
219 }
220 })
221 .collect::<Result<Vec<_>>>()?;
222
223 let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
224
225 Ok(CreateTable {
226 if_not_exists: true,
227 table_id: table_info.ident.table_id,
228 name: ObjectName(vec![Ident::with_quote(quote_style, table_name)]),
229 columns,
230 engine: table_meta.engine.clone(),
231 constraints,
232 options: create_sql_options(table_meta, schema_options),
233 partitions: None,
234 })
235}
236
// Unit tests: each one builds a `TableInfo` fixture, renders it through
// `create_table_stmt`, and compares the formatted statement with an expected
// raw string literal.
237#[cfg(test)]
238mod tests {
239 use std::sync::Arc;
240 use std::time::Duration;
241
242 use common_time::timestamp::TimeUnit;
243 use datatypes::prelude::ConcreteDataType;
244 use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
245 use table::metadata::*;
246 use table::requests::{
247 TableOptions, FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY,
248 };
249
250 use super::*;
251
 // Renders a six-column "mito" table: a skipping index on `id`, an inverted
 // index on `host`, an enabled fulltext index on `msg`, and a
 // `current_timestamp()` default on the `ts` time index column. The table
 // also has a 30 second TTL and a `compaction.type` extra option.
252 #[test]
253 fn test_show_create_table_sql() {
254 let schema = vec![
255 ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
256 .with_skipping_options(SkippingIndexOptions {
257 granularity: 4096,
258 ..Default::default()
259 })
260 .unwrap(),
261 ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
262 .with_inverted_index(true),
263 ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
264 ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
265 ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
266 .with_fulltext_options(FulltextOptions {
267 enable: true,
268 ..Default::default()
269 })
270 .unwrap(),
271 ColumnSchema::new(
272 "ts",
273 ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
274 false,
275 )
276 .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
277 "current_timestamp()",
278 ))))
279 .unwrap()
280 .with_time_index(true),
281 ];
282
283 let table_schema = SchemaRef::new(Schema::new(schema));
284 let table_name = "system_metrics";
285 let schema_name = "public".to_string();
286 let catalog_name = "greptime".to_string();
287 let regions = vec![0, 1, 2];
288
289 let mut options = table::requests::TableOptions {
290 ttl: Some(Duration::from_secs(30).into()),
291 ..Default::default()
292 };
293
294 let _ = options
295 .extra_options
296 .insert("compaction.type".to_string(), "twcs".to_string());
297
298 let meta = TableMetaBuilder::empty()
299 .schema(table_schema)
 // Schema positions 0 and 1 (`id`, `host`) form the primary key.
300 .primary_key_indices(vec![0, 1])
301 .value_indices(vec![2, 3])
302 .engine("mito".to_string())
303 .next_column_id(0)
304 .options(options)
305 .created_on(Default::default())
306 .region_numbers(regions)
307 .build()
308 .unwrap();
309
310 let info = Arc::new(
311 TableInfoBuilder::default()
312 .table_id(1024)
313 .table_version(0 as TableVersion)
314 .name(table_name)
315 .schema_name(schema_name)
316 .catalog_name(catalog_name)
317 .desc(None)
318 .table_type(TableType::Base)
319 .meta(meta)
320 .build()
321 .unwrap(),
322 );
323
 // No schema-level options are passed; identifiers are quoted with `"`.
324 let stmt = create_table_stmt(&info, None, '"').unwrap();
325
 // The leading "\n" matches the line break that opens the raw string below.
326 let sql = format!("\n{}", stmt);
327 assert_eq!(
328 r#"
329CREATE TABLE IF NOT EXISTS "system_metrics" (
330 "id" INT UNSIGNED NULL SKIPPING INDEX WITH(granularity = '4096', type = 'BLOOM'),
331 "host" STRING NULL INVERTED INDEX,
332 "cpu" DOUBLE NULL,
333 "disk" FLOAT NULL,
334 "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
335 "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
336 TIME INDEX ("ts"),
337 PRIMARY KEY ("id", "host")
338)
339ENGINE=mito
340WITH(
341 'compaction.type' = 'twcs',
342 ttl = '30s'
343)"#,
344 sql
345 );
346 }
347
 // Renders a two-column "file" engine table whose extra options hold
 // location, meta and format entries. The expected `WITH(...)` clause lists
 // only the format and location entries, not the one inserted under
 // `FILE_TABLE_META_KEY`.
348 #[test]
349 fn test_show_create_external_table_sql() {
350 let schema = vec![
351 ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
352 ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
353 ];
354 let table_schema = SchemaRef::new(Schema::new(schema));
355 let table_name = "system_metrics";
356 let schema_name = "public".to_string();
357 let catalog_name = "greptime".to_string();
358 let mut options: TableOptions = Default::default();
359 let _ = options
360 .extra_options
361 .insert(FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string());
 // NOTE(review): this is a plain string literal, not a format string, so
 // the doubled braces are stored literally - presumably a leftover from a
 // `format!` call; confirm. The value does not appear in the expected output.
362 let _ = options.extra_options.insert(
363 FILE_TABLE_META_KEY.to_string(),
364 "{{\"files\":[\"foo.csv\"]}}".to_string(),
365 );
366 let _ = options
367 .extra_options
368 .insert(FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
369 let meta = TableMetaBuilder::empty()
370 .schema(table_schema)
 // No primary key columns for this table.
371 .primary_key_indices(vec![])
372 .engine("file".to_string())
373 .next_column_id(0)
374 .options(options)
375 .created_on(Default::default())
376 .build()
377 .unwrap();
378
379 let info = Arc::new(
380 TableInfoBuilder::default()
381 .table_id(1024)
382 .table_version(0 as TableVersion)
383 .name(table_name)
384 .schema_name(schema_name)
385 .catalog_name(catalog_name)
386 .desc(None)
387 .table_type(TableType::Base)
388 .meta(meta)
389 .build()
390 .unwrap(),
391 );
392
 // No schema-level options are passed; identifiers are quoted with `"`.
393 let stmt = create_table_stmt(&info, None, '"').unwrap();
394
 // The leading "\n" matches the line break that opens the raw string below.
395 let sql = format!("\n{}", stmt);
396 assert_eq!(
397 r#"
398CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
399 "host" STRING NULL,
400 "cpu" DOUBLE NULL,
401
402)
403ENGINE=file
404WITH(
405 format = 'csv',
406 location = 'foo.csv'
407)"#,
408 sql
409 );
410 }
411}