datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30    COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
35    FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
36    SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY,
37};
38pub use crate::schema::constraint::ColumnDefaultConstraint;
39pub use crate::schema::raw::RawSchema;
40
/// Key under which the schema version number is stored in the Arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
/// Key under which the actual column type (e.g. Json or Vector) is stored in Arrow field
/// metadata, for columns whose Arrow type alone doesn't identify them.
pub const TYPE_KEY: &str = "greptime:type";
45
/// A common schema, should be immutable.
///
/// Holds the column definitions, a name-to-position lookup table and the equivalent
/// Arrow schema.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// The equivalent Arrow schema; its fields line up one-to-one with `column_schemas`.
    /// `SchemaBuilder::build` stores the version in its metadata under [VERSION_KEY].
    arrow_schema: Arc<ArrowSchema>,
    /// Index of the timestamp key column.
    ///
    /// This is the first column whose `ColumnSchema::is_time_index` returns true,
    /// i.e. the column that holds the timestamp. `None` means there is no such column.
    timestamp_index: Option<usize>,
    /// Version of the schema.
    ///
    /// Initial value is zero. The version should bump after altering schema.
    version: u32,
}
62
63impl fmt::Debug for Schema {
64    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65        f.debug_struct("Schema")
66            .field("column_schemas", &self.column_schemas)
67            .field("name_to_index", &self.name_to_index)
68            .field("timestamp_index", &self.timestamp_index)
69            .field("version", &self.version)
70            .finish()
71    }
72}
73
74impl Schema {
75    /// Initial version of the schema.
76    pub const INITIAL_VERSION: u32 = 0;
77
78    /// Create a schema from a vector of [ColumnSchema].
79    ///
80    /// # Panics
81    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
82    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
83        // Builder won't fail in this case
84        SchemaBuilder::try_from(column_schemas)
85            .unwrap()
86            .build()
87            .unwrap()
88    }
89
90    /// Try to Create a schema from a vector of [ColumnSchema].
91    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
92        SchemaBuilder::try_from(column_schemas)?.build()
93    }
94
95    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
96        &self.arrow_schema
97    }
98
99    pub fn column_schemas(&self) -> &[ColumnSchema] {
100        &self.column_schemas
101    }
102
103    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
104        self.name_to_index
105            .get(name)
106            .map(|index| &self.column_schemas[*index])
107    }
108
109    /// Retrieve the column's name by index
110    /// # Panics
111    /// This method **may** panic if the index is out of range of column schemas.
112    pub fn column_name_by_index(&self, idx: usize) -> &str {
113        &self.column_schemas[idx].name
114    }
115
116    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
117        self.name_to_index.get(name).copied()
118    }
119
120    pub fn contains_column(&self, name: &str) -> bool {
121        self.name_to_index.contains_key(name)
122    }
123
124    pub fn num_columns(&self) -> usize {
125        self.column_schemas.len()
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.column_schemas.is_empty()
130    }
131
132    /// Returns index of the timestamp key column.
133    pub fn timestamp_index(&self) -> Option<usize> {
134        self.timestamp_index
135    }
136
137    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
138        self.timestamp_index.map(|idx| &self.column_schemas[idx])
139    }
140
141    pub fn version(&self) -> u32 {
142        self.version
143    }
144
145    pub fn metadata(&self) -> &HashMap<String, String> {
146        &self.arrow_schema.metadata
147    }
148
149    /// Generate a new projected schema
150    ///
151    /// # Panic
152    ///
153    /// If the index out ouf bound
154    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
155        let mut column_schemas = Vec::with_capacity(indices.len());
156        let mut timestamp_index = None;
157        for index in indices {
158            if let Some(ts_index) = self.timestamp_index
159                && ts_index == *index
160            {
161                timestamp_index = Some(column_schemas.len());
162            }
163            column_schemas.push(self.column_schemas[*index].clone());
164        }
165        let arrow_schema = self
166            .arrow_schema
167            .project(indices)
168            .context(ProjectArrowSchemaSnafu)?;
169        let name_to_index = column_schemas
170            .iter()
171            .enumerate()
172            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
173            .collect();
174
175        Ok(Self {
176            column_schemas,
177            name_to_index,
178            arrow_schema: Arc::new(arrow_schema),
179            timestamp_index,
180            version: self.version,
181        })
182    }
183}
184
/// Builder for [Schema].
///
/// Created from a list of column schemas via [SchemaBuilder::try_from_columns]. The
/// version and extra metadata can be set before calling [SchemaBuilder::build].
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`, one per column.
    fields: Vec<Field>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
    /// Schema version; zero unless set through [SchemaBuilder::version].
    version: u32,
    /// Key-value metadata to attach to the Arrow schema.
    metadata: HashMap<String, String>,
}
194
195impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
196    type Error = Error;
197
198    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
199        SchemaBuilder::try_from_columns(column_schemas)
200    }
201}
202
203impl SchemaBuilder {
204    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
205        let FieldsAndIndices {
206            fields,
207            name_to_index,
208            timestamp_index,
209        } = collect_fields(&column_schemas)?;
210
211        Ok(Self {
212            column_schemas,
213            name_to_index,
214            fields,
215            timestamp_index,
216            ..Default::default()
217        })
218    }
219
220    pub fn version(mut self, version: u32) -> Self {
221        self.version = version;
222        self
223    }
224
225    /// Add key value pair to metadata.
226    ///
227    /// Old metadata with same key would be overwritten.
228    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
229        let _ = self.metadata.insert(key.into(), value.into());
230        self
231    }
232
233    pub fn build(mut self) -> Result<Schema> {
234        if let Some(timestamp_index) = self.timestamp_index {
235            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
236        }
237
238        self.metadata
239            .insert(VERSION_KEY.to_string(), self.version.to_string());
240
241        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
242
243        Ok(Schema {
244            column_schemas: self.column_schemas,
245            name_to_index: self.name_to_index,
246            arrow_schema: Arc::new(arrow_schema),
247            timestamp_index: self.timestamp_index,
248            version: self.version,
249        })
250    }
251}
252
/// Output of `collect_fields`: the Arrow fields and lookup indices derived from a list
/// of column schemas.
struct FieldsAndIndices {
    /// One Arrow field per column, in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
}
258
259fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
260    let mut fields = Vec::with_capacity(column_schemas.len());
261    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
262    let mut timestamp_index = None;
263    for (index, column_schema) in column_schemas.iter().enumerate() {
264        if column_schema.is_time_index() && timestamp_index.is_none() {
265            timestamp_index = Some(index);
266        }
267        let mut field = Field::try_from(column_schema)?;
268
269        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
270        let extype = match column_schema.data_type {
271            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
272            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
273            _ => None,
274        };
275        if let Some(extype) = extype {
276            let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
277            field = field.with_metadata(metadata);
278        }
279        fields.push(field);
280        ensure!(
281            name_to_index
282                .insert(column_schema.name.clone(), index)
283                .is_none(),
284            DuplicateColumnSnafu {
285                column: &column_schema.name,
286            }
287        );
288    }
289
290    Ok(FieldsAndIndices {
291        fields,
292        name_to_index,
293        timestamp_index,
294    })
295}
296
297fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
298    ensure!(
299        timestamp_index < column_schemas.len(),
300        error::InvalidTimestampIndexSnafu {
301            index: timestamp_index,
302        }
303    );
304
305    let column_schema = &column_schemas[timestamp_index];
306    ensure!(
307        column_schema.data_type.is_timestamp(),
308        error::InvalidTimestampIndexSnafu {
309            index: timestamp_index,
310        }
311    );
312    ensure!(
313        column_schema.is_time_index(),
314        error::InvalidTimestampIndexSnafu {
315            index: timestamp_index,
316        }
317    );
318
319    Ok(())
320}
321
/// Shared, reference-counted handle to a [Schema].
pub type SchemaRef = Arc<Schema>;
323
324impl TryFrom<Arc<ArrowSchema>> for Schema {
325    type Error = Error;
326
327    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
328        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
329        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
330        for field in &arrow_schema.fields {
331            let column_schema = ColumnSchema::try_from(field.as_ref())?;
332            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
333            column_schemas.push(column_schema);
334        }
335
336        let mut timestamp_index = None;
337        for (index, column_schema) in column_schemas.iter().enumerate() {
338            if column_schema.is_time_index() {
339                validate_timestamp_index(&column_schemas, index)?;
340                timestamp_index = Some(index);
341                break;
342            }
343        }
344
345        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
346
347        Ok(Self {
348            column_schemas,
349            name_to_index,
350            arrow_schema,
351            timestamp_index,
352            version,
353        })
354    }
355}
356
357impl TryFrom<ArrowSchema> for Schema {
358    type Error = Error;
359
360    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
361        let arrow_schema = Arc::new(arrow_schema);
362
363        Schema::try_from(arrow_schema)
364    }
365}
366
367impl TryFrom<DFSchemaRef> for Schema {
368    type Error = Error;
369
370    fn try_from(value: DFSchemaRef) -> Result<Self> {
371        value.inner().clone().try_into()
372    }
373}
374
375fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
376    if let Some(value) = metadata.get(key) {
377        let version = value
378            .parse()
379            .context(error::ParseSchemaVersionSnafu { value })?;
380
381        Ok(version)
382    } else {
383        Ok(Schema::INITIAL_VERSION)
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // The time index and version must survive the round trip through Arrow.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(123, new_schema.version());
        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(7)
            .build()
            .unwrap();

        // Reordering projection that keeps the time index column.
        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!("col2", projected.column_name_by_index(0));
        assert_eq!("ts", projected.column_name_by_index(1));
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(7, projected.version());
        assert_eq!(2, projected.arrow_schema().fields().len());

        // Projection that drops the time index column.
        let projected = schema.try_project(&[0]).unwrap();
        assert!(projected.timestamp_index().is_none());
        assert!(projected.timestamp_column().is_none());
    }

    #[test]
    fn test_try_parse_version() {
        let metadata = HashMap::new();
        assert_eq!(
            Schema::INITIAL_VERSION,
            try_parse_version(&metadata, VERSION_KEY).unwrap()
        );

        let metadata = HashMap::from([(VERSION_KEY.to_string(), "42".to_string())]);
        assert_eq!(42, try_parse_version(&metadata, VERSION_KEY).unwrap());

        let metadata = HashMap::from([(VERSION_KEY.to_string(), "not a number".to_string())]);
        assert!(try_parse_version(&metadata, VERSION_KEY).is_err());
    }
}