query/sql/
show_create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of `SHOW CREATE TABLE` statement.
16
17use std::collections::HashMap;
18
19use common_meta::SchemaOptions;
20use datatypes::schema::{
21    ColumnDefaultConstraint, ColumnSchema, SchemaRef, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
22    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
23    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
24};
25use snafu::ResultExt;
26use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
27use sql::dialect::GreptimeDbDialect;
28use sql::parser::ParserContext;
29use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
30use sql::statements::{self, OptionMap};
31use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
32use table::metadata::{TableInfoRef, TableMeta};
33use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
34
35use crate::error::{
36    ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
37    GetSkippingIndexOptionsSnafu, Result, SqlSnafu,
38};
39
40/// Generates CREATE TABLE options from given table metadata and schema-level options.
41fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
42    let table_opts = &table_meta.options;
43    let mut options = OptionMap::default();
44    if let Some(write_buffer_size) = table_opts.write_buffer_size {
45        options.insert(
46            WRITE_BUFFER_SIZE_KEY.to_string(),
47            write_buffer_size.to_string(),
48        );
49    }
50    if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
51        options.insert(TTL_KEY.to_string(), ttl);
52    } else if let Some(database_ttl) = schema_options
53        .and_then(|o| o.ttl)
54        .map(|ttl| ttl.to_string())
55    {
56        options.insert(TTL_KEY.to_string(), database_ttl);
57    };
58    for (k, v) in table_opts
59        .extra_options
60        .iter()
61        .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
62    {
63        options.insert(k.to_string(), v.to_string());
64    }
65    options
66}
67
68#[inline]
69fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
70    ColumnOptionDef { name: None, option }
71}
72
73fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
74    let name = &column_schema.name;
75    let mut options = Vec::with_capacity(2);
76    let mut extensions = ColumnExtensions::default();
77
78    if column_schema.is_nullable() {
79        options.push(column_option_def(ColumnOption::Null));
80    } else {
81        options.push(column_option_def(ColumnOption::NotNull));
82    }
83
84    if let Some(c) = column_schema.default_constraint() {
85        let expr = match c {
86            ColumnDefaultConstraint::Value(v) => Expr::Value(
87                statements::value_to_sql_value(v)
88                    .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?,
89            ),
90            ColumnDefaultConstraint::Function(expr) => {
91                ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
92            }
93        };
94
95        options.push(column_option_def(ColumnOption::Default(expr)));
96    }
97
98    if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
99        options.push(column_option_def(ColumnOption::Comment(c.to_string())));
100    }
101
102    if let Some(opt) = column_schema
103        .fulltext_options()
104        .context(GetFulltextOptionsSnafu)?
105        && opt.enable
106    {
107        let map = HashMap::from([
108            (
109                COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
110                opt.analyzer.to_string(),
111            ),
112            (
113                COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
114                opt.case_sensitive.to_string(),
115            ),
116            (
117                COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
118                opt.backend.to_string(),
119            ),
120        ]);
121        extensions.fulltext_index_options = Some(map.into());
122    }
123
124    if let Some(opt) = column_schema
125        .skipping_index_options()
126        .context(GetSkippingIndexOptionsSnafu)?
127    {
128        let map = HashMap::from([
129            (
130                COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
131                opt.granularity.to_string(),
132            ),
133            (
134                COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
135                opt.index_type.to_string(),
136            ),
137        ]);
138        extensions.skipping_index_options = Some(map.into());
139    }
140
141    if column_schema.is_inverted_indexed() {
142        extensions.inverted_index_options = Some(HashMap::new().into());
143    }
144
145    Ok(Column {
146        column_def: ColumnDef {
147            name: Ident::with_quote(quote_style, name),
148            data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
149                .with_context(|_| ConvertSqlTypeSnafu {
150                    datatype: column_schema.data_type.clone(),
151                })?,
152            collation: None,
153            options,
154        },
155        extensions,
156    })
157}
158
159/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
160///
161/// For metric engine, it will only return the primary key columns that are not internal columns.
162fn primary_key_columns_for_show_create<'a>(
163    table_meta: &'a TableMeta,
164    engine: &str,
165) -> Vec<&'a String> {
166    let is_metric_engine = is_metric_engine(engine);
167    if is_metric_engine {
168        table_meta
169            .row_key_column_names()
170            .filter(|name| !is_metric_engine_internal_column(name))
171            .collect()
172    } else {
173        table_meta.row_key_column_names().collect()
174    }
175}
176
177fn create_table_constraints(
178    engine: &str,
179    schema: &SchemaRef,
180    table_meta: &TableMeta,
181    quote_style: char,
182) -> Vec<TableConstraint> {
183    let mut constraints = Vec::with_capacity(2);
184    if let Some(timestamp_column) = schema.timestamp_column() {
185        let column_name = &timestamp_column.name;
186        constraints.push(TableConstraint::TimeIndex {
187            column: Ident::with_quote(quote_style, column_name),
188        });
189    }
190    if !table_meta.primary_key_indices.is_empty() {
191        let columns = primary_key_columns_for_show_create(table_meta, engine)
192            .into_iter()
193            .map(|name| Ident::with_quote(quote_style, name))
194            .collect();
195        constraints.push(TableConstraint::PrimaryKey { columns });
196    }
197
198    constraints
199}
200
201/// Create a CreateTable statement from table info.
202pub fn create_table_stmt(
203    table_info: &TableInfoRef,
204    schema_options: Option<SchemaOptions>,
205    quote_style: char,
206) -> Result<CreateTable> {
207    let table_meta = &table_info.meta;
208    let table_name = &table_info.name;
209    let schema = &table_info.meta.schema;
210    let is_metric_engine = is_metric_engine(&table_meta.engine);
211    let columns = schema
212        .column_schemas()
213        .iter()
214        .filter_map(|c| {
215            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
216                None
217            } else {
218                Some(create_column(c, quote_style))
219            }
220        })
221        .collect::<Result<Vec<_>>>()?;
222
223    let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
224
225    Ok(CreateTable {
226        if_not_exists: true,
227        table_id: table_info.ident.table_id,
228        name: ObjectName(vec![Ident::with_quote(quote_style, table_name)]),
229        columns,
230        engine: table_meta.engine.clone(),
231        constraints,
232        options: create_sql_options(table_meta, schema_options),
233        partitions: None,
234    })
235}
236
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
    use table::metadata::*;
    use table::requests::{
        TableOptions, FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY,
    };

    use super::*;

    /// Wraps `meta` into the `greptime.public.system_metrics` table info shared by the tests.
    fn system_metrics_info(meta: TableMeta) -> TableInfoRef {
        let info = TableInfoBuilder::default()
            .table_id(1024)
            .table_version(0 as TableVersion)
            .name("system_metrics")
            .schema_name("public".to_string())
            .catalog_name("greptime".to_string())
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();
        Arc::new(info)
    }

    /// A mito table with index extensions, a default constraint and table options.
    #[test]
    fn test_show_create_table_sql() {
        let id = ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
            .with_skipping_options(SkippingIndexOptions {
                granularity: 4096,
                ..Default::default()
            })
            .unwrap();
        let host = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
            .with_inverted_index(true);
        let cpu = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
        let disk = ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true);
        let msg = ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
            .with_fulltext_options(FulltextOptions {
                enable: true,
                ..Default::default()
            })
            .unwrap();
        let ts = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
            "current_timestamp()",
        ))))
        .unwrap()
        .with_time_index(true);

        let table_schema = SchemaRef::new(Schema::new(vec![id, host, cpu, disk, msg, ts]));

        let mut options = TableOptions {
            ttl: Some(Duration::from_secs(30).into()),
            ..Default::default()
        };
        let _ = options
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![0, 1])
            .value_indices(vec![2, 3])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .region_numbers(vec![0, 1, 2])
            .build()
            .unwrap();

        let stmt = create_table_stmt(&system_metrics_info(meta), None, '"').unwrap();
        let sql = format!("\n{stmt}");

        let expected = r#"
CREATE TABLE IF NOT EXISTS "system_metrics" (
  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(granularity = '4096', type = 'BLOOM'),
  "host" STRING NULL INVERTED INDEX,
  "cpu" DOUBLE NULL,
  "disk" FLOAT NULL,
  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  TIME INDEX ("ts"),
  PRIMARY KEY ("id", "host")
)
ENGINE=mito
WITH(
  'compaction.type' = 'twcs',
  ttl = '30s'
)"#;
        assert_eq!(expected, sql);
    }

    /// A file engine table: rendered as `CREATE EXTERNAL TABLE` without the file meta option.
    #[test]
    fn test_show_create_external_table_sql() {
        let table_schema = SchemaRef::new(Schema::new(vec![
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ]));

        let mut options = TableOptions::default();
        // The entry stored under `FILE_TABLE_META_KEY` is dropped by `create_sql_options`,
        // so it must not show up in the expected SQL below.
        for (key, value) in [
            (FILE_TABLE_LOCATION_KEY, "foo.csv"),
            (FILE_TABLE_META_KEY, "{{\"files\":[\"foo.csv\"]}}"),
            (FILE_TABLE_FORMAT_KEY, "csv"),
        ] {
            let _ = options
                .extra_options
                .insert(key.to_string(), value.to_string());
        }

        let meta = TableMetaBuilder::empty()
            .schema(table_schema)
            .primary_key_indices(vec![])
            .engine("file".to_string())
            .next_column_id(0)
            .options(options)
            .created_on(Default::default())
            .build()
            .unwrap();

        let stmt = create_table_stmt(&system_metrics_info(meta), None, '"').unwrap();
        let sql = format!("\n{stmt}");

        let expected = r#"
CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
  "host" STRING NULL,
  "cpu" DOUBLE NULL,

)
ENGINE=file
WITH(
  format = 'csv',
  location = 'foo.csv'
)"#;
        assert_eq!(expected, sql);
    }
}