query/sql/
show_create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of `SHOW CREATE TABLE` statement.
16
17use std::collections::HashMap;
18
19use arrow_schema::extension::ExtensionType;
20use common_meta::SchemaOptions;
21use datatypes::extension::json::JsonExtensionType;
22use datatypes::schema::{
23    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
24    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
25    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
26    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
27    ColumnDefaultConstraint, ColumnSchema, FulltextBackend, SchemaRef,
28};
29use snafu::ResultExt;
30use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
31use sql::dialect::GreptimeDbDialect;
32use sql::parser::ParserContext;
33use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
34use sql::statements::{self, OptionMap};
35use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
36use table::metadata::{TableInfoRef, TableMeta};
37use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
38
39use crate::error::{
40    ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
41    GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
42};
43
44/// Generates CREATE TABLE options from given table metadata and schema-level options.
45fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
46    let table_opts = &table_meta.options;
47    let mut options = OptionMap::default();
48    if let Some(write_buffer_size) = table_opts.write_buffer_size {
49        options.insert(
50            WRITE_BUFFER_SIZE_KEY.to_string(),
51            write_buffer_size.to_string(),
52        );
53    }
54    if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
55        options.insert(TTL_KEY.to_string(), ttl);
56    } else if let Some(database_ttl) = schema_options
57        .and_then(|o| o.ttl)
58        .map(|ttl| ttl.to_string())
59    {
60        options.insert(TTL_KEY.to_string(), database_ttl);
61    };
62    for (k, v) in table_opts
63        .extra_options
64        .iter()
65        .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
66    {
67        options.insert(k.clone(), v.clone());
68    }
69    options
70}
71
72#[inline]
73fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
74    ColumnOptionDef { name: None, option }
75}
76
77fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
78    let name = &column_schema.name;
79    let mut options = Vec::with_capacity(2);
80    let mut extensions = ColumnExtensions::default();
81
82    if column_schema.is_nullable() {
83        options.push(column_option_def(ColumnOption::Null));
84    } else {
85        options.push(column_option_def(ColumnOption::NotNull));
86    }
87
88    if let Some(c) = column_schema.default_constraint() {
89        let expr = match c {
90            ColumnDefaultConstraint::Value(v) => Expr::Value(
91                statements::value_to_sql_value(v)
92                    .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?
93                    .into(),
94            ),
95            ColumnDefaultConstraint::Function(expr) => {
96                ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
97            }
98        };
99
100        options.push(column_option_def(ColumnOption::Default(expr)));
101    }
102
103    if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
104        options.push(column_option_def(ColumnOption::Comment(c.clone())));
105    }
106
107    if let Some(opt) = column_schema
108        .fulltext_options()
109        .context(GetFulltextOptionsSnafu)?
110        && opt.enable
111    {
112        let mut map = HashMap::from([
113            (
114                COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
115                opt.analyzer.to_string(),
116            ),
117            (
118                COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
119                opt.case_sensitive.to_string(),
120            ),
121            (
122                COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
123                opt.backend.to_string(),
124            ),
125        ]);
126        if opt.backend == FulltextBackend::Bloom {
127            map.insert(
128                COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
129                opt.granularity.to_string(),
130            );
131            map.insert(
132                COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
133                opt.false_positive_rate().to_string(),
134            );
135        }
136        extensions.fulltext_index_options = Some(map.into());
137    }
138
139    if let Some(opt) = column_schema
140        .skipping_index_options()
141        .context(GetSkippingIndexOptionsSnafu)?
142    {
143        let map = HashMap::from([
144            (
145                COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
146                opt.granularity.to_string(),
147            ),
148            (
149                COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
150                opt.false_positive_rate().to_string(),
151            ),
152            (
153                COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
154                opt.index_type.to_string(),
155            ),
156        ]);
157        extensions.skipping_index_options = Some(map.into());
158    }
159
160    if column_schema.is_inverted_indexed() {
161        extensions.inverted_index_options = Some(HashMap::new().into());
162    }
163
164    if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
165        let settings = json_extension
166            .metadata()
167            .json_structure_settings
168            .clone()
169            .unwrap_or_default();
170        extensions.set_json_structure_settings(settings);
171    }
172
173    Ok(Column {
174        column_def: ColumnDef {
175            name: Ident::with_quote(quote_style, name),
176            data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
177                .with_context(|_| ConvertSqlTypeSnafu {
178                    datatype: column_schema.data_type.clone(),
179                })?,
180            options,
181        },
182        extensions,
183    })
184}
185
186/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
187///
188/// For metric engine, it will only return the primary key columns that are not internal columns.
189fn primary_key_columns_for_show_create<'a>(
190    table_meta: &'a TableMeta,
191    engine: &str,
192) -> Vec<&'a String> {
193    let is_metric_engine = is_metric_engine(engine);
194    if is_metric_engine {
195        table_meta
196            .row_key_column_names()
197            .filter(|name| !is_metric_engine_internal_column(name))
198            .collect()
199    } else {
200        table_meta.row_key_column_names().collect()
201    }
202}
203
204fn create_table_constraints(
205    engine: &str,
206    schema: &SchemaRef,
207    table_meta: &TableMeta,
208    quote_style: char,
209) -> Vec<TableConstraint> {
210    let mut constraints = Vec::with_capacity(2);
211    if let Some(timestamp_column) = schema.timestamp_column() {
212        let column_name = &timestamp_column.name;
213        constraints.push(TableConstraint::TimeIndex {
214            column: Ident::with_quote(quote_style, column_name),
215        });
216    }
217    if !table_meta.primary_key_indices.is_empty() {
218        let columns = primary_key_columns_for_show_create(table_meta, engine)
219            .into_iter()
220            .map(|name| Ident::with_quote(quote_style, name))
221            .collect();
222        constraints.push(TableConstraint::PrimaryKey { columns });
223    }
224
225    constraints
226}
227
228/// Create a CreateTable statement from table info.
229pub fn create_table_stmt(
230    table_info: &TableInfoRef,
231    schema_options: Option<SchemaOptions>,
232    quote_style: char,
233) -> Result<CreateTable> {
234    let table_meta = &table_info.meta;
235    let table_name = &table_info.name;
236    let schema = &table_info.meta.schema;
237    let is_metric_engine = is_metric_engine(&table_meta.engine);
238    let columns = schema
239        .column_schemas()
240        .iter()
241        .filter_map(|c| {
242            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
243                None
244            } else {
245                Some(create_column(c, quote_style))
246            }
247        })
248        .collect::<Result<Vec<_>>>()?;
249
250    let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
251
252    Ok(CreateTable {
253        if_not_exists: true,
254        table_id: table_info.ident.table_id,
255        name: ObjectName::from(vec![Ident::with_quote(quote_style, table_name)]),
256        columns,
257        engine: table_meta.engine.clone(),
258        constraints,
259        options: create_sql_options(table_meta, schema_options),
260        partitions: None,
261    })
262}
263
264#[cfg(test)]
265mod tests {
266    use std::sync::Arc;
267    use std::time::Duration;
268
269    use common_time::timestamp::TimeUnit;
270    use datatypes::prelude::ConcreteDataType;
271    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
272    use table::metadata::*;
273    use table::requests::{
274        FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
275    };
276
277    use super::*;
278
279    #[test]
280    fn test_show_create_table_sql() {
281        let schema = vec![
282            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
283                .with_skipping_options(SkippingIndexOptions {
284                    granularity: 4096,
285                    ..Default::default()
286                })
287                .unwrap(),
288            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
289                .with_inverted_index(true),
290            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
291            ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
292            ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
293                .with_fulltext_options(FulltextOptions {
294                    enable: true,
295                    ..Default::default()
296                })
297                .unwrap(),
298            ColumnSchema::new(
299                "ts",
300                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
301                false,
302            )
303            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
304                "current_timestamp()",
305            ))))
306            .unwrap()
307            .with_time_index(true),
308        ];
309
310        let table_schema = SchemaRef::new(Schema::new(schema));
311        let table_name = "system_metrics";
312        let schema_name = "public".to_string();
313        let catalog_name = "greptime".to_string();
314        let regions = vec![0, 1, 2];
315
316        let mut options = table::requests::TableOptions {
317            ttl: Some(Duration::from_secs(30).into()),
318            ..Default::default()
319        };
320
321        let _ = options
322            .extra_options
323            .insert("compaction.type".to_string(), "twcs".to_string());
324
325        let meta = TableMetaBuilder::empty()
326            .schema(table_schema)
327            .primary_key_indices(vec![0, 1])
328            .value_indices(vec![2, 3])
329            .engine("mito".to_string())
330            .next_column_id(0)
331            .options(options)
332            .created_on(Default::default())
333            .region_numbers(regions)
334            .build()
335            .unwrap();
336
337        let info = Arc::new(
338            TableInfoBuilder::default()
339                .table_id(1024)
340                .table_version(0 as TableVersion)
341                .name(table_name)
342                .schema_name(schema_name)
343                .catalog_name(catalog_name)
344                .desc(None)
345                .table_type(TableType::Base)
346                .meta(meta)
347                .build()
348                .unwrap(),
349        );
350
351        let stmt = create_table_stmt(&info, None, '"').unwrap();
352
353        let sql = format!("\n{}", stmt);
354        assert_eq!(
355            r#"
356CREATE TABLE IF NOT EXISTS "system_metrics" (
357  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
358  "host" STRING NULL INVERTED INDEX,
359  "cpu" DOUBLE NULL,
360  "disk" FLOAT NULL,
361  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
362  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
363  TIME INDEX ("ts"),
364  PRIMARY KEY ("id", "host")
365)
366ENGINE=mito
367WITH(
368  'compaction.type' = 'twcs',
369  ttl = '30s'
370)"#,
371            sql
372        );
373    }
374
375    #[test]
376    fn test_show_create_external_table_sql() {
377        let schema = vec![
378            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
379            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
380        ];
381        let table_schema = SchemaRef::new(Schema::new(schema));
382        let table_name = "system_metrics";
383        let schema_name = "public".to_string();
384        let catalog_name = "greptime".to_string();
385        let mut options: TableOptions = Default::default();
386        let _ = options
387            .extra_options
388            .insert(FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string());
389        let _ = options.extra_options.insert(
390            FILE_TABLE_META_KEY.to_string(),
391            "{{\"files\":[\"foo.csv\"]}}".to_string(),
392        );
393        let _ = options
394            .extra_options
395            .insert(FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
396        let meta = TableMetaBuilder::empty()
397            .schema(table_schema)
398            .primary_key_indices(vec![])
399            .engine("file".to_string())
400            .next_column_id(0)
401            .options(options)
402            .created_on(Default::default())
403            .build()
404            .unwrap();
405
406        let info = Arc::new(
407            TableInfoBuilder::default()
408                .table_id(1024)
409                .table_version(0 as TableVersion)
410                .name(table_name)
411                .schema_name(schema_name)
412                .catalog_name(catalog_name)
413                .desc(None)
414                .table_type(TableType::Base)
415                .meta(meta)
416                .build()
417                .unwrap(),
418        );
419
420        let stmt = create_table_stmt(&info, None, '"').unwrap();
421
422        let sql = format!("\n{}", stmt);
423        assert_eq!(
424            r#"
425CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
426  "host" STRING NULL,
427  "cpu" DOUBLE NULL,
428
429)
430ENGINE=file
431WITH(
432  format = 'csv',
433  location = 'foo.csv'
434)"#,
435            sql
436        );
437    }
438}