query/sql/
show_create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of `SHOW CREATE TABLE` statement.
16
17use std::collections::HashMap;
18
19use arrow_schema::extension::ExtensionType;
20use common_meta::SchemaOptions;
21use datatypes::extension::json::JsonExtensionType;
22use datatypes::schema::{
23    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
24    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
25    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
26    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
27    ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
28};
29use snafu::ResultExt;
30use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
31use sql::dialect::GreptimeDbDialect;
32use sql::parser::ParserContext;
33use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
34use sql::statements::{self, OptionMap};
35use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
36use table::metadata::{TableInfoRef, TableMeta};
37use table::requests::{
38    COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY,
39};
40
41use crate::error::{
42    ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
43    GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
44};
45
46/// Generates CREATE TABLE options from given table metadata and schema-level options.
47fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
48    let table_opts = &table_meta.options;
49    let mut options = OptionMap::default();
50    if let Some(write_buffer_size) = table_opts.write_buffer_size {
51        options.insert(
52            WRITE_BUFFER_SIZE_KEY.to_string(),
53            write_buffer_size.to_string(),
54        );
55    }
56    if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
57        options.insert(TTL_KEY.to_string(), ttl);
58    } else if let Some(database_ttl) = schema_options
59        .and_then(|o| o.ttl)
60        .map(|ttl| ttl.to_string())
61    {
62        options.insert(TTL_KEY.to_string(), database_ttl);
63    };
64    for (k, v) in table_opts
65        .extra_options
66        .iter()
67        .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
68    {
69        options.insert(k.clone(), v.clone());
70    }
71    options
72}
73
74#[inline]
75fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
76    ColumnOptionDef { name: None, option }
77}
78
79fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
80    let name = &column_schema.name;
81    let mut options = Vec::with_capacity(2);
82    let mut extensions = ColumnExtensions::default();
83
84    if column_schema.is_nullable() {
85        options.push(column_option_def(ColumnOption::Null));
86    } else {
87        options.push(column_option_def(ColumnOption::NotNull));
88    }
89
90    if let Some(c) = column_schema.default_constraint() {
91        let expr = match c {
92            ColumnDefaultConstraint::Value(v) => Expr::Value(
93                statements::value_to_sql_value(v)
94                    .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?
95                    .into(),
96            ),
97            ColumnDefaultConstraint::Function(expr) => {
98                ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
99            }
100        };
101
102        options.push(column_option_def(ColumnOption::Default(expr)));
103    }
104
105    if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
106        options.push(column_option_def(ColumnOption::Comment(c.clone())));
107    }
108
109    if let Some(opt) = column_schema
110        .fulltext_options()
111        .context(GetFulltextOptionsSnafu)?
112        && opt.enable
113    {
114        let mut map = HashMap::from([
115            (
116                COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
117                opt.analyzer.to_string(),
118            ),
119            (
120                COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
121                opt.case_sensitive.to_string(),
122            ),
123            (
124                COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
125                opt.backend.to_string(),
126            ),
127        ]);
128        if opt.backend == FulltextBackend::Bloom {
129            map.insert(
130                COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
131                opt.granularity.to_string(),
132            );
133            map.insert(
134                COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
135                opt.false_positive_rate().to_string(),
136            );
137        }
138        extensions.fulltext_index_options = Some(map.into());
139    }
140
141    if let Some(opt) = column_schema
142        .skipping_index_options()
143        .context(GetSkippingIndexOptionsSnafu)?
144    {
145        let map = HashMap::from([
146            (
147                COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
148                opt.granularity.to_string(),
149            ),
150            (
151                COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
152                opt.false_positive_rate().to_string(),
153            ),
154            (
155                COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
156                opt.index_type.to_string(),
157            ),
158        ]);
159        extensions.skipping_index_options = Some(map.into());
160    }
161
162    if column_schema.is_inverted_indexed() {
163        extensions.inverted_index_options = Some(HashMap::new().into());
164    }
165
166    if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
167        let settings = json_extension
168            .metadata()
169            .json_structure_settings
170            .clone()
171            .unwrap_or_default();
172        extensions.set_json_structure_settings(settings);
173    }
174
175    Ok(Column {
176        column_def: ColumnDef {
177            name: Ident::with_quote(quote_style, name),
178            data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
179                .with_context(|_| ConvertSqlTypeSnafu {
180                    datatype: column_schema.data_type.clone(),
181                })?,
182            options,
183        },
184        extensions,
185    })
186}
187
188/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
189///
190/// For metric engine, it will only return the primary key columns that are not internal columns.
191fn primary_key_columns_for_show_create<'a>(
192    table_meta: &'a TableMeta,
193    engine: &str,
194) -> Vec<&'a String> {
195    let is_metric_engine = is_metric_engine(engine);
196    if is_metric_engine {
197        table_meta
198            .row_key_column_names()
199            .filter(|name| !is_metric_engine_internal_column(name))
200            .collect()
201    } else {
202        table_meta.row_key_column_names().collect()
203    }
204}
205
206fn create_table_constraints(
207    engine: &str,
208    schema: &SchemaRef,
209    table_meta: &TableMeta,
210    quote_style: char,
211) -> Vec<TableConstraint> {
212    let mut constraints = Vec::with_capacity(2);
213    if let Some(timestamp_column) = schema.timestamp_column() {
214        let column_name = &timestamp_column.name;
215        constraints.push(TableConstraint::TimeIndex {
216            column: Ident::with_quote(quote_style, column_name),
217        });
218    }
219    if !table_meta.primary_key_indices.is_empty() {
220        let columns = primary_key_columns_for_show_create(table_meta, engine)
221            .into_iter()
222            .map(|name| Ident::with_quote(quote_style, name))
223            .collect();
224        constraints.push(TableConstraint::PrimaryKey { columns });
225    }
226
227    constraints
228}
229
230/// Create a CreateTable statement from table info.
231pub fn create_table_stmt(
232    table_info: &TableInfoRef,
233    schema_options: Option<SchemaOptions>,
234    quote_style: char,
235) -> Result<CreateTable> {
236    let table_meta = &table_info.meta;
237    let table_name = &table_info.name;
238    let schema = &table_info.meta.schema;
239    let is_metric_engine = is_metric_engine(&table_meta.engine);
240    let columns = schema
241        .column_schemas()
242        .iter()
243        .filter_map(|c| {
244            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
245                None
246            } else {
247                Some(create_column(c, quote_style))
248            }
249        })
250        .collect::<Result<Vec<_>>>()?;
251
252    let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
253
254    let mut options = create_sql_options(table_meta, schema_options);
255    if let Some(comment) = &table_info.desc
256        && options.get(TABLE_COMMENT_KEY).is_none()
257    {
258        options.insert(format!("'{TABLE_COMMENT_KEY}'"), comment.clone());
259    }
260
261    Ok(CreateTable {
262        if_not_exists: true,
263        table_id: table_info.ident.table_id,
264        name: ObjectName::from(vec![Ident::with_quote(quote_style, table_name)]),
265        columns,
266        engine: table_meta.engine.clone(),
267        constraints,
268        options,
269        partitions: None,
270    })
271}
272
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
    use table::metadata::*;
    use table::requests::{
        FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
    };

    use super::*;

    /// Wraps `meta` into the `greptime.public.system_metrics` table info
    /// (id 1024, no description) used by every test in this module.
    fn system_metrics_info(meta: TableMeta) -> TableInfoRef {
        Arc::new(
            TableInfoBuilder::default()
                .table_id(1024)
                .table_version(0 as TableVersion)
                .name("system_metrics")
                .schema_name("public".to_string())
                .catalog_name("greptime".to_string())
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        )
    }

    /// A mito table with index extensions, a default function, a TTL and an
    /// extra option renders all of them in the generated statement.
    #[test]
    fn test_show_create_table_sql() {
        let id_column = ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
            .with_skipping_options(SkippingIndexOptions {
                granularity: 4096,
                ..Default::default()
            })
            .unwrap();
        let host_column = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
            .with_inverted_index(true);
        let cpu_column = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
        let disk_column = ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true);
        let msg_column = ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
            .with_fulltext_options(FulltextOptions {
                enable: true,
                ..Default::default()
            })
            .unwrap();
        let ts_column = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
            "current_timestamp()",
        ))))
        .unwrap()
        .with_time_index(true);

        let table_schema = SchemaRef::new(Schema::new(vec![
            id_column,
            host_column,
            cpu_column,
            disk_column,
            msg_column,
            ts_column,
        ]));

        let mut table_options = TableOptions {
            ttl: Some(Duration::from_secs(30).into()),
            ..Default::default()
        };
        let _ = table_options
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![0, 1])
            .value_indices(vec![2, 3])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(table_options)
            .created_on(Default::default())
            .region_numbers(vec![0, 1, 2])
            .build()
            .unwrap();

        let info = system_metrics_info(meta);
        let stmt = create_table_stmt(&info, None, '"').unwrap();

        let expected = r#"
CREATE TABLE IF NOT EXISTS "system_metrics" (
  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
  "host" STRING NULL INVERTED INDEX,
  "cpu" DOUBLE NULL,
  "disk" FLOAT NULL,
  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  TIME INDEX ("ts"),
  PRIMARY KEY ("id", "host")
)
ENGINE=mito
WITH(
  'compaction.type' = 'twcs',
  ttl = '30s'
)"#;
        assert_eq!(expected, format!("\n{}", stmt));
    }

    /// A file-engine table renders as `CREATE EXTERNAL TABLE`, and the file
    /// table meta option does not appear in the output.
    #[test]
    fn test_show_create_external_table_sql() {
        let table_schema = SchemaRef::new(Schema::new(vec![
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ]));

        let mut table_options: TableOptions = Default::default();
        for (key, value) in [
            (FILE_TABLE_LOCATION_KEY, "foo.csv"),
            (FILE_TABLE_META_KEY, "{{\"files\":[\"foo.csv\"]}}"),
            (FILE_TABLE_FORMAT_KEY, "csv"),
        ] {
            let _ = table_options
                .extra_options
                .insert(key.to_string(), value.to_string());
        }

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![])
            .engine("file".to_string())
            .next_column_id(0)
            .options(table_options)
            .created_on(Default::default())
            .build()
            .unwrap();

        let info = system_metrics_info(meta);
        let stmt = create_table_stmt(&info, None, '"').unwrap();

        let expected = r#"
CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
  "host" STRING NULL,
  "cpu" DOUBLE NULL,

)
ENGINE=file
WITH(
  format = 'csv',
  location = 'foo.csv'
)"#;
        assert_eq!(expected, format!("\n{}", stmt));
    }
}