query/sql/
show_create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of `SHOW CREATE TABLE` statement.
16
17use std::collections::HashMap;
18
19use arrow_schema::extension::ExtensionType;
20use common_meta::SchemaOptions;
21use datatypes::extension::json::JsonExtensionType;
22use datatypes::schema::{
23    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
24    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
25    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
26    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE,
27    COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY, COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE,
28    COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH,
29    COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema,
30    FulltextBackend, SchemaRef,
31};
32use snafu::ResultExt;
33use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::ParserContext;
36use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstraint};
37use sql::statements::{self, OptionMap};
38use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
39use table::metadata::{TableInfoRef, TableMeta};
40use table::requests::{
41    COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY,
42};
43
44use crate::error::{
45    ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
46    GetSkippingIndexOptionsSnafu, GetVectorIndexOptionsSnafu, Result, SqlSnafu,
47};
48
49/// Generates CREATE TABLE options from given table metadata and schema-level options.
50fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
51    let table_opts = &table_meta.options;
52    let mut options = OptionMap::default();
53    if let Some(write_buffer_size) = table_opts.write_buffer_size {
54        options.insert(
55            WRITE_BUFFER_SIZE_KEY.to_string(),
56            write_buffer_size.to_string(),
57        );
58    }
59    if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
60        options.insert(TTL_KEY.to_string(), ttl);
61    } else if let Some(database_ttl) = schema_options
62        .as_ref()
63        .and_then(|o| o.ttl)
64        .map(|ttl| ttl.to_string())
65    {
66        options.insert(TTL_KEY.to_string(), database_ttl);
67    };
68
69    for (k, v) in table_opts
70        .extra_options
71        .iter()
72        .filter(|(k, _)| k != &FILE_TABLE_META_KEY)
73    {
74        options.insert(k.clone(), v.clone());
75    }
76    options
77}
78
79#[inline]
80fn column_option_def(option: ColumnOption) -> ColumnOptionDef {
81    ColumnOptionDef { name: None, option }
82}
83
84fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Column> {
85    let name = &column_schema.name;
86    let mut options = Vec::with_capacity(2);
87    let mut extensions = ColumnExtensions::default();
88
89    if column_schema.is_nullable() {
90        options.push(column_option_def(ColumnOption::Null));
91    } else {
92        options.push(column_option_def(ColumnOption::NotNull));
93    }
94
95    if let Some(c) = column_schema.default_constraint() {
96        let expr = match c {
97            ColumnDefaultConstraint::Value(v) => Expr::Value(
98                statements::value_to_sql_value(v)
99                    .with_context(|_| ConvertSqlValueSnafu { value: v.clone() })?
100                    .into(),
101            ),
102            ColumnDefaultConstraint::Function(expr) => {
103                ParserContext::parse_function(expr, &GreptimeDbDialect {}).context(SqlSnafu)?
104            }
105        };
106
107        options.push(column_option_def(ColumnOption::Default(expr)));
108    }
109
110    if let Some(c) = column_schema.metadata().get(COMMENT_KEY) {
111        options.push(column_option_def(ColumnOption::Comment(c.clone())));
112    }
113
114    if let Some(opt) = column_schema
115        .fulltext_options()
116        .context(GetFulltextOptionsSnafu)?
117        && opt.enable
118    {
119        let mut map = HashMap::from([
120            (
121                COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
122                opt.analyzer.to_string(),
123            ),
124            (
125                COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
126                opt.case_sensitive.to_string(),
127            ),
128            (
129                COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
130                opt.backend.to_string(),
131            ),
132        ]);
133        if opt.backend == FulltextBackend::Bloom {
134            map.insert(
135                COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
136                opt.granularity.to_string(),
137            );
138            map.insert(
139                COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
140                opt.false_positive_rate().to_string(),
141            );
142        }
143        extensions.fulltext_index_options = Some(map.into());
144    }
145
146    if let Some(opt) = column_schema
147        .skipping_index_options()
148        .context(GetSkippingIndexOptionsSnafu)?
149    {
150        let map = HashMap::from([
151            (
152                COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
153                opt.granularity.to_string(),
154            ),
155            (
156                COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
157                opt.false_positive_rate().to_string(),
158            ),
159            (
160                COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
161                opt.index_type.to_string(),
162            ),
163        ]);
164        extensions.skipping_index_options = Some(map.into());
165    }
166
167    if let Some(opt) = column_schema
168        .vector_index_options()
169        .context(GetVectorIndexOptionsSnafu)?
170    {
171        let map = HashMap::from([
172            (
173                COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE.to_string(),
174                opt.engine.to_string(),
175            ),
176            (
177                COLUMN_VECTOR_INDEX_OPT_KEY_METRIC.to_string(),
178                opt.metric.to_string(),
179            ),
180            (
181                COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY.to_string(),
182                opt.connectivity.to_string(),
183            ),
184            (
185                COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD.to_string(),
186                opt.expansion_add.to_string(),
187            ),
188            (
189                COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH.to_string(),
190                opt.expansion_search.to_string(),
191            ),
192        ]);
193        extensions.vector_index_options = Some(map.into());
194    }
195
196    if column_schema.is_inverted_indexed() {
197        extensions.inverted_index_options = Some(HashMap::new().into());
198    }
199
200    if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
201        let settings = json_extension
202            .metadata()
203            .json_structure_settings
204            .clone()
205            .unwrap_or_default();
206        extensions.set_json_structure_settings(settings);
207    }
208
209    Ok(Column {
210        column_def: ColumnDef {
211            name: Ident::with_quote(quote_style, name),
212            data_type: statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
213                .with_context(|_| ConvertSqlTypeSnafu {
214                    datatype: column_schema.data_type.clone(),
215                })?,
216            options,
217        },
218        extensions,
219    })
220}
221
222/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
223///
224/// For metric engine, it will only return the primary key columns that are not internal columns.
225fn primary_key_columns_for_show_create<'a>(
226    table_meta: &'a TableMeta,
227    engine: &str,
228) -> Vec<&'a String> {
229    let is_metric_engine = is_metric_engine(engine);
230    if is_metric_engine {
231        table_meta
232            .row_key_column_names()
233            .filter(|name| !is_metric_engine_internal_column(name))
234            .collect()
235    } else {
236        table_meta.row_key_column_names().collect()
237    }
238}
239
240fn create_table_constraints(
241    engine: &str,
242    schema: &SchemaRef,
243    table_meta: &TableMeta,
244    quote_style: char,
245) -> Vec<TableConstraint> {
246    let mut constraints = Vec::with_capacity(2);
247    if let Some(timestamp_column) = schema.timestamp_column() {
248        let column_name = &timestamp_column.name;
249        constraints.push(TableConstraint::TimeIndex {
250            column: Ident::with_quote(quote_style, column_name),
251        });
252    }
253    if !table_meta.primary_key_indices.is_empty() {
254        let columns = primary_key_columns_for_show_create(table_meta, engine)
255            .into_iter()
256            .map(|name| Ident::with_quote(quote_style, name))
257            .collect();
258        constraints.push(TableConstraint::PrimaryKey { columns });
259    }
260
261    constraints
262}
263
264/// Create a CreateTable statement from table info.
265pub fn create_table_stmt(
266    table_info: &TableInfoRef,
267    schema_options: Option<SchemaOptions>,
268    quote_style: char,
269) -> Result<CreateTable> {
270    let table_meta = &table_info.meta;
271    let table_name = &table_info.name;
272    let schema = &table_info.meta.schema;
273    let is_metric_engine = is_metric_engine(&table_meta.engine);
274    let columns = schema
275        .column_schemas()
276        .iter()
277        .filter_map(|c| {
278            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
279                None
280            } else {
281                Some(create_column(c, quote_style))
282            }
283        })
284        .collect::<Result<Vec<_>>>()?;
285
286    let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
287
288    let mut options = create_sql_options(table_meta, schema_options);
289    if let Some(comment) = &table_info.desc
290        && options.get(TABLE_COMMENT_KEY).is_none()
291    {
292        options.insert(format!("'{TABLE_COMMENT_KEY}'"), comment.clone());
293    }
294
295    Ok(CreateTable {
296        if_not_exists: true,
297        table_id: table_info.ident.table_id,
298        name: ObjectName::from(vec![Ident::with_quote(quote_style, table_name)]),
299        columns,
300        engine: table_meta.engine.clone(),
301        constraints,
302        options,
303        partitions: None,
304    })
305}
306
307#[cfg(test)]
308mod tests {
309    use std::sync::Arc;
310    use std::time::Duration;
311
312    use common_time::timestamp::TimeUnit;
313    use datatypes::prelude::ConcreteDataType;
314    use datatypes::schema::{
315        FulltextOptions, Schema, SchemaRef, SkippingIndexOptions, VectorIndexOptions,
316    };
317    use table::metadata::*;
318    use table::requests::{
319        FILE_TABLE_FORMAT_KEY, FILE_TABLE_LOCATION_KEY, FILE_TABLE_META_KEY, TableOptions,
320    };
321
322    use super::*;
323
324    #[test]
325    fn test_show_create_table_sql() {
326        let schema = vec![
327            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
328                .with_skipping_options(SkippingIndexOptions {
329                    granularity: 4096,
330                    ..Default::default()
331                })
332                .unwrap(),
333            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
334                .with_inverted_index(true),
335            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
336            ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
337            ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
338                .with_fulltext_options(FulltextOptions {
339                    enable: true,
340                    ..Default::default()
341                })
342                .unwrap(),
343            ColumnSchema::new("embedding", ConcreteDataType::vector_datatype(4), true)
344                .with_vector_index_options(&VectorIndexOptions::default())
345                .unwrap(),
346            ColumnSchema::new(
347                "ts",
348                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
349                false,
350            )
351            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
352                "current_timestamp()",
353            ))))
354            .unwrap()
355            .with_time_index(true),
356        ];
357
358        let table_schema = SchemaRef::new(Schema::new(schema));
359        let table_name = "system_metrics";
360        let schema_name = "public".to_string();
361        let catalog_name = "greptime".to_string();
362
363        let mut options = table::requests::TableOptions {
364            ttl: Some(Duration::from_secs(30).into()),
365            ..Default::default()
366        };
367
368        let _ = options
369            .extra_options
370            .insert("compaction.type".to_string(), "twcs".to_string());
371
372        let meta = TableMetaBuilder::empty()
373            .schema(table_schema)
374            .primary_key_indices(vec![0, 1])
375            .value_indices(vec![2, 3])
376            .engine("mito".to_string())
377            .next_column_id(0)
378            .options(options)
379            .created_on(Default::default())
380            .build()
381            .unwrap();
382
383        let info = Arc::new(
384            TableInfoBuilder::default()
385                .table_id(1024)
386                .table_version(0 as TableVersion)
387                .name(table_name)
388                .schema_name(schema_name)
389                .catalog_name(catalog_name)
390                .desc(None)
391                .table_type(TableType::Base)
392                .meta(meta)
393                .build()
394                .unwrap(),
395        );
396
397        let stmt = create_table_stmt(&info, None, '"').unwrap();
398
399        let sql = format!("\n{}", stmt);
400        assert_eq!(
401            r#"
402CREATE TABLE IF NOT EXISTS "system_metrics" (
403  "id" INT UNSIGNED NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '4096', type = 'BLOOM'),
404  "host" STRING NULL INVERTED INDEX,
405  "cpu" DOUBLE NULL,
406  "disk" FLOAT NULL,
407  "msg" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
408  "embedding" VECTOR(4) NULL VECTOR INDEX WITH(connectivity = '16', engine = 'usearch', expansion_add = '128', expansion_search = '64', metric = 'l2sq'),
409  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
410  TIME INDEX ("ts"),
411  PRIMARY KEY ("id", "host")
412)
413ENGINE=mito
414WITH(
415  'compaction.type' = 'twcs',
416  ttl = '30s'
417)"#,
418            sql
419        );
420    }
421
422    #[test]
423    fn test_show_create_external_table_sql() {
424        let schema = vec![
425            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
426            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
427        ];
428        let table_schema = SchemaRef::new(Schema::new(schema));
429        let table_name = "system_metrics";
430        let schema_name = "public".to_string();
431        let catalog_name = "greptime".to_string();
432        let mut options: TableOptions = Default::default();
433        let _ = options
434            .extra_options
435            .insert(FILE_TABLE_LOCATION_KEY.to_string(), "foo.csv".to_string());
436        let _ = options.extra_options.insert(
437            FILE_TABLE_META_KEY.to_string(),
438            "{{\"files\":[\"foo.csv\"]}}".to_string(),
439        );
440        let _ = options
441            .extra_options
442            .insert(FILE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
443        let meta = TableMetaBuilder::empty()
444            .schema(table_schema)
445            .primary_key_indices(vec![])
446            .engine("file".to_string())
447            .next_column_id(0)
448            .options(options)
449            .created_on(Default::default())
450            .build()
451            .unwrap();
452
453        let info = Arc::new(
454            TableInfoBuilder::default()
455                .table_id(1024)
456                .table_version(0 as TableVersion)
457                .name(table_name)
458                .schema_name(schema_name)
459                .catalog_name(catalog_name)
460                .desc(None)
461                .table_type(TableType::Base)
462                .meta(meta)
463                .build()
464                .unwrap(),
465        );
466
467        let stmt = create_table_stmt(&info, None, '"').unwrap();
468
469        let sql = format!("\n{}", stmt);
470        assert_eq!(
471            r#"
472CREATE EXTERNAL TABLE IF NOT EXISTS "system_metrics" (
473  "host" STRING NULL,
474  "cpu" DOUBLE NULL,
475
476)
477ENGINE=file
478WITH(
479  format = 'csv',
480  location = 'foo.csv'
481)"#,
482            sql
483        );
484    }
485}