query/sql/
show_create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of `SHOW CREATE TABLE` statement.
16
17use std::collections::HashMap;
18
19use common_meta::SchemaOptions;
20use datatypes::schema::{
21    ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
22    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
23    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
24    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
25    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
26};
27use snafu::ResultExt;
28use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
29use sql::dialect::GreptimeDbDialect;
30use sql::parser::ParserContext;
31use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
32use sql::statements::{self, OptionMap};
33use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
34use table::metadata::{TableInfoRef, TableMeta};
35use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
36
37use crate::error::{
38    ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
39    GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
40};
41
42/// Generates CREATE TABLE options from given table metadata and schema-level options.
43fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
44    let table_opts = &table_meta.options;
45    let mut options = OptionMap::default();
46    if let Some(write_buffer_size) = table_opts.write_buffer_size {
47        options.insert(
48            WRITE_BUFFER_SIZE_KEY.to_string(),
49            write_buffer_size.to_string(),
50        );
51    }
52    if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
53        options.insert(TTL_KEY.to_string(), ttl);
54    } else if let Some(database_ttl) = schema_options
55        .and_then(|o| o.ttl)
56        .map(|ttl| ttl.to_string())
57    {
58        options.insert(TTL_KEY.to_string(), database_ttl);
59    };
60    for (k, v) in table_opts
61        .extra_options
62        .iter()
63        .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
64    {
65        options.insert(k.to_string(), v.to_string());
66    }
67    options
68}
69
70#[inline]
71fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
72    ColumnOptionDef { name: None, option }
73}
74
75fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
76    let name = &column_schema.name;
77    let mut options = Vec::with_capacity(2);
78    let mut extensions = ColumnExtensions::default();
79
80    if column_schema.is_nullable() {
81        options.push(column_option_def(ColumnOption::Null));
82    } else {
83        options.push(column_option_def(ColumnOption::NotNull));
84    }
85
86    if let Some(c) = column_schema.default_constraint() {
87        let expr = match c {
88            ColumnDefaultConstraint::Value(v) => Expr::Value(
89                statements::value_to_sql_value(v)
90                    .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?,
91            ),
92            ColumnDefaultConstraint::Function(expr) => {
93                ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
94            }
95        };
96
97        options.push(column_option_def(ColumnOption::Default(expr)));
98    }
99
100    if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
101        options.push(column_option_def(ColumnOption::Comment(c.to_string())));
102    }
103
104    if let Some(opt) = column_schema
105        .fulltext_options()
106        .context(GetFulltextOptionsSnafu)?
107        && opt.enable
108    {
109        let mut map = HashMap::from([
110            (
111                COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
112                opt.analyzer.to_string(),
113            ),
114            (
115                COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
116                opt.case_sensitive.to_string(),
117            ),
118            (
119                COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
120                opt.backend.to_string(),
121            ),
122        ]);
123        if opt.backend == FulltextBackend::Bloom {
124            map.insert(
125                COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
126                opt.granularity.to_string(),
127            );
128            map.insert(
129                COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
130                opt.false_positive_rate().to_string(),
131            );
132        }
133        extensions.fulltext_index_options = Some(map.into());
134    }
135
136    if let Some(opt) = column_schema
137        .skipping_index_options()
138        .context(GetSkippingIndexOptionsSnafu)?
139    {
140        let map = HashMap::from([
141            (
142                COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
143                opt.granularity.to_string(),
144            ),
145            (
146                COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
147                opt.false_positive_rate().to_string(),
148            ),
149            (
150                COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
151                opt.index_type.to_string(),
152            ),
153        ]);
154        extensions.skipping_index_options = Some(map.into());
155    }
156
157    if column_schema.is_inverted_indexed() {
158        extensions.inverted_index_options = Some(HashMap::new().into());
159    }
160
161    Ok(Column {
162        column_def: ColumnDef {
163            name: Ident::with_quote(quote_style, name),
164            data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
165                .with_context(|_| ConvertSqlTypeSnafu {
166                    datatype: column_schema.data_type.clone(),
167                })?,
168            collation: None,
169            options,
170        },
171        extensions,
172    })
173}
174
175/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
176///
177/// For metric engine, it will only return the primary key columns that are not internal columns.
178fn primary_key_columns_for_show_create<'a>(
179    table_meta: &'a TableMeta,
180    engine: &str,
181) -> Vec<&'a String> {
182    let is_metric_engine = is_metric_engine(engine);
183    if is_metric_engine {
184        table_meta
185            .row_key_column_names()
186            .filter(|name| !is_metric_engine_internal_column(name))
187            .collect()
188    } else {
189        table_meta.row_key_column_names().collect()
190    }
191}
192
193fn create_table_constraints(
194    engine: &str,
195    schema: &SchemaRef,
196    table_meta: &TableMeta,
197    quote_style: char,
198) -> Vec<TableConstraint> {
199    let mut constraints = Vec::with_capacity(2);
200    if let Some(timestamp_column) = schema.timestamp_column() {
201        let column_name = &timestamp_column.name;
202        constraints.push(TableConstraint::TimeIndex {
203            column: Ident::with_quote(quote_style, column_name),
204        });
205    }
206    if !table_meta.primary_key_indices.is_empty() {
207        let columns = primary_key_columns_for_show_create(table_meta, engine)
208            .into_iter()
209            .map(|name| Ident::with_quote(quote_style, name))
210            .collect();
211        constraints.push(TableConstraint::PrimaryKey { columns });
212    }
213
214    constraints
215}
216
217/// Create a CreateTable statement from table info.
218pub fn create_table_stmt(
219    table_info: &TableInfoRef,
220    schema_options: Option<SchemaOptions>,
221    quote_style: char,
222) -> Result<CreateTable> {
223    let table_meta = &table_info.meta;
224    let table_name = &table_info.name;
225    let schema = &table_info.meta.schema;
226    let is_metric_engine = is_metric_engine(&table_meta.engine);
227    let columns = schema
228        .column_schemas()
229        .iter()
230        .filter_map(|c| {
231            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
232                None
233            } else {
234                Some(create_column(c, quote_style))
235            }
236        })
237        .collect::<Result<Vec<_>>>()?;
238
239    let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
240
241    Ok(CreateTable {
242        if_not_exists: true,
243        table_id: table_info.ident.table_id,
244        name: ObjectName(vec![Ident::with_quote(quote_style, table_name)]),
245        columns,
246        engine: table_meta.engine.clone(),
247        constraints,
248        options: create_sql_options(table_meta, schema_options),
249        partitions: None,
250    })
251}
252
253#[cfg(test)]
254mod tests {
255    use std::sync::Arc;
256    use std::time::Duration;
257
258    use common_time::timestamp::TimeUnit;
259    use datatypes::prelude::ConcreteDataType;
260    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
261    use table::metadata::*;
262    use table::requests::{
263        TableOptions, FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY,
264    };
265
266    use super::*;
267
268    #[test]
269    fn test_show_create_table_sql() {
270        let schema = vec![
271            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
272                .with_skipping_options(SkippingIndexOptions {
273                    granularity: 4096,
274                    ..Default::default()
275                })
276                .unwrap(),
277            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
278                .with_inverted_index(true),
279            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
280            ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
281            ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
282                .with_fulltext_options(FulltextOptions {
283                    enable: true,
284                    ..Default::default()
285                })
286                .unwrap(),
287            ColumnSchema::new(
288                "ts",
289                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
290                false,
291            )
292            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
293                "current_timestamp()",
294            ))))
295            .unwrap()
296            .with_time_index(true),
297        ];
298
299        let table_schema = SchemaRef::new(Schema::new(schema));
300        let table_name = "system_metrics";
301        let schema_name = "public".to_string();
302        let catalog_name = "greptime".to_string();
303        let regions = vec![0, 1, 2];
304
305        let mut options = table::requests::TableOptions {
306            ttl: Some(Duration::from_secs(30).into()),
307            ..Default::default()
308        };
309
310        let _ = options
311            .extra_options
312            .insert("compaction.type".to_string(), "twcs".to_string());
313
314        let meta = TableMetaBuilder::empty()
315            .schema(table_schema)
316            .primary_key_indices(vec![0, 1])
317            .value_indices(vec![2, 3])
318            .engine("mito".to_string())
319            .next_column_id(0)
320            .options(options)
321            .created_on(Default::default())
322            .region_numbers(regions)
323            .build()
324            .unwrap();
325
326        let info = Arc::new(
327            TableInfoBuilder::default()
328                .table_id(1024)
329                .table_version(0 as TableVersion)
330                .name(table_name)
331                .schema_name(schema_name)
332                .catalog_name(catalog_name)
333                .desc(None)
334                .table_type(TableType::Base)
335                .meta(meta)
336                .build()
337                .unwrap(),
338        );
339
340        let stmt = create_table_stmt(&info, None, '"').unwrap();
341
342        let sql = format!("\n{}", stmt);
343        assert_eq!(
344            r#"
345CREATE TABLE IF NOT EXISTS "system_metrics" (
346  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
347  "host" STRING NULL INVERTED INDEX,
348  "cpu" DOUBLE NULL,
349  "disk" FLOAT NULL,
350  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
351  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
352  TIME INDEX ("ts"),
353  PRIMARY KEY ("id", "host")
354)
355ENGINE=mito
356WITH(
357  'compaction.type' = 'twcs',
358  ttl = '30s'
359)"#,
360            sql
361        );
362    }
363
364    #[test]
365    fn test_show_create_external_table_sql() {
366        let schema = vec![
367            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
368            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
369        ];
370        let table_schema = SchemaRef::new(Schema::new(schema));
371        let table_name = "system_metrics";
372        let schema_name = "public".to_string();
373        let catalog_name = "greptime".to_string();
374        let mut options: TableOptions = Default::default();
375        let _ = options
376            .extra_options
377            .insert(FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string());
378        let _ = options.extra_options.insert(
379            FILE_TABLE_META_KEY.to_string(),
380            "{{\"files\":[\"foo.csv\"]}}".to_string(),
381        );
382        let _ = options
383            .extra_options
384            .insert(FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
385        let meta = TableMetaBuilder::empty()
386            .schema(table_schema)
387            .primary_key_indices(vec![])
388            .engine("file".to_string())
389            .next_column_id(0)
390            .options(options)
391            .created_on(Default::default())
392            .build()
393            .unwrap();
394
395        let info = Arc::new(
396            TableInfoBuilder::default()
397                .table_id(1024)
398                .table_version(0 as TableVersion)
399                .name(table_name)
400                .schema_name(schema_name)
401                .catalog_name(catalog_name)
402                .desc(None)
403                .table_type(TableType::Base)
404                .meta(meta)
405                .build()
406                .unwrap(),
407        );
408
409        let stmt = create_table_stmt(&info, None, '"').unwrap();
410
411        let sql = format!("\n{}", stmt);
412        assert_eq!(
413            r#"
414CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
415  "host" STRING NULL,
416  "cpu" DOUBLE NULL,
417
418)
419ENGINE=file
420WITH(
421  format = 'csv',
422  location = 'foo.csv'
423)"#,
424            sql
425        );
426    }
427}