datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use column_schema::ColumnExtType;
25use datafusion_common::DFSchemaRef;
26use snafu::{ensure, ResultExt};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31    ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
32    SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
33    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
34    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
35    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
36    SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
37};
38pub use crate::schema::constraint::ColumnDefaultConstraint;
39pub use crate::schema::raw::RawSchema;
40
/// Key used to store version number of the schema in metadata.
///
/// [`SchemaBuilder::build`] writes the schema version under this key in the
/// arrow schema metadata, and converting an arrow schema back into a
/// [`Schema`] parses the version from the same key.
pub const VERSION_KEY: &str = "greptime:version";
/// Key used to store actual column type in field metadata.
///
/// It is set on the arrow field of Json and Vector columns when the arrow
/// fields are collected from the column schemas.
pub const TYPE_KEY: &str = "greptime:type";
45
46/// A common schema, should be immutable.
47#[derive(Clone, PartialEq, Eq)]
48pub struct Schema {
49    column_schemas: Vec<ColumnSchema>,
50    name_to_index: HashMap<String, usize>,
51    arrow_schema: Arc<ArrowSchema>,
52    /// Index of the timestamp key column.
53    ///
54    /// Timestamp key column is the column holds the timestamp and forms part of
55    /// the primary key. None means there is no timestamp key column.
56    timestamp_index: Option<usize>,
57    /// Version of the schema.
58    ///
59    /// Initial value is zero. The version should bump after altering schema.
60    version: u32,
61}
62
63impl fmt::Debug for Schema {
64    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65        f.debug_struct("Schema")
66            .field("column_schemas", &self.column_schemas)
67            .field("name_to_index", &self.name_to_index)
68            .field("timestamp_index", &self.timestamp_index)
69            .field("version", &self.version)
70            .finish()
71    }
72}
73
74impl Schema {
75    /// Initial version of the schema.
76    pub const INITIAL_VERSION: u32 = 0;
77
78    /// Create a schema from a vector of [ColumnSchema].
79    ///
80    /// # Panics
81    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
82    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
83        // Builder won't fail in this case
84        SchemaBuilder::try_from(column_schemas)
85            .unwrap()
86            .build()
87            .unwrap()
88    }
89
90    /// Try to Create a schema from a vector of [ColumnSchema].
91    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
92        SchemaBuilder::try_from(column_schemas)?.build()
93    }
94
95    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
96        &self.arrow_schema
97    }
98
99    pub fn column_schemas(&self) -> &[ColumnSchema] {
100        &self.column_schemas
101    }
102
103    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
104        self.name_to_index
105            .get(name)
106            .map(|index| &self.column_schemas[*index])
107    }
108
109    /// Retrieve the column's name by index
110    /// # Panics
111    /// This method **may** panic if the index is out of range of column schemas.
112    pub fn column_name_by_index(&self, idx: usize) -> &str {
113        &self.column_schemas[idx].name
114    }
115
116    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
117        self.name_to_index.get(name).copied()
118    }
119
120    pub fn contains_column(&self, name: &str) -> bool {
121        self.name_to_index.contains_key(name)
122    }
123
124    pub fn num_columns(&self) -> usize {
125        self.column_schemas.len()
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.column_schemas.is_empty()
130    }
131
132    /// Returns index of the timestamp key column.
133    pub fn timestamp_index(&self) -> Option<usize> {
134        self.timestamp_index
135    }
136
137    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
138        self.timestamp_index.map(|idx| &self.column_schemas[idx])
139    }
140
141    pub fn version(&self) -> u32 {
142        self.version
143    }
144
145    pub fn metadata(&self) -> &HashMap<String, String> {
146        &self.arrow_schema.metadata
147    }
148
149    /// Generate a new projected schema
150    ///
151    /// # Panic
152    ///
153    /// If the index out ouf bound
154    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
155        let mut column_schemas = Vec::with_capacity(indices.len());
156        let mut timestamp_index = None;
157        for index in indices {
158            if let Some(ts_index) = self.timestamp_index
159                && ts_index == *index
160            {
161                timestamp_index = Some(column_schemas.len());
162            }
163            column_schemas.push(self.column_schemas[*index].clone());
164        }
165        let arrow_schema = self
166            .arrow_schema
167            .project(indices)
168            .context(ProjectArrowSchemaSnafu)?;
169        let name_to_index = column_schemas
170            .iter()
171            .enumerate()
172            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
173            .collect();
174
175        Ok(Self {
176            column_schemas,
177            name_to_index,
178            arrow_schema: Arc::new(arrow_schema),
179            timestamp_index,
180            version: self.version,
181        })
182    }
183}
184
185#[derive(Default)]
186pub struct SchemaBuilder {
187    column_schemas: Vec<ColumnSchema>,
188    name_to_index: HashMap<String, usize>,
189    fields: Vec<Field>,
190    timestamp_index: Option<usize>,
191    version: u32,
192    metadata: HashMap<String, String>,
193}
194
195impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
196    type Error = Error;
197
198    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
199        SchemaBuilder::try_from_columns(column_schemas)
200    }
201}
202
203impl SchemaBuilder {
204    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
205        let FieldsAndIndices {
206            fields,
207            name_to_index,
208            timestamp_index,
209        } = collect_fields(&column_schemas)?;
210
211        Ok(Self {
212            column_schemas,
213            name_to_index,
214            fields,
215            timestamp_index,
216            ..Default::default()
217        })
218    }
219
220    pub fn version(mut self, version: u32) -> Self {
221        self.version = version;
222        self
223    }
224
225    /// Add key value pair to metadata.
226    ///
227    /// Old metadata with same key would be overwritten.
228    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
229        let _ = self.metadata.insert(key.into(), value.into());
230        self
231    }
232
233    pub fn build(mut self) -> Result<Schema> {
234        if let Some(timestamp_index) = self.timestamp_index {
235            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
236        }
237
238        self.metadata
239            .insert(VERSION_KEY.to_string(), self.version.to_string());
240
241        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
242
243        Ok(Schema {
244            column_schemas: self.column_schemas,
245            name_to_index: self.name_to_index,
246            arrow_schema: Arc::new(arrow_schema),
247            timestamp_index: self.timestamp_index,
248            version: self.version,
249        })
250    }
251}
252
253struct FieldsAndIndices {
254    fields: Vec<Field>,
255    name_to_index: HashMap<String, usize>,
256    timestamp_index: Option<usize>,
257}
258
259fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
260    let mut fields = Vec::with_capacity(column_schemas.len());
261    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
262    let mut timestamp_index = None;
263    for (index, column_schema) in column_schemas.iter().enumerate() {
264        if column_schema.is_time_index() && timestamp_index.is_none() {
265            timestamp_index = Some(index);
266        }
267        let mut field = Field::try_from(column_schema)?;
268
269        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
270        let extype = match column_schema.data_type {
271            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
272            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
273            _ => None,
274        };
275        if let Some(extype) = extype {
276            let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
277            field = field.with_metadata(metadata);
278        }
279        fields.push(field);
280        ensure!(
281            name_to_index
282                .insert(column_schema.name.clone(), index)
283                .is_none(),
284            DuplicateColumnSnafu {
285                column: &column_schema.name,
286            }
287        );
288    }
289
290    Ok(FieldsAndIndices {
291        fields,
292        name_to_index,
293        timestamp_index,
294    })
295}
296
297fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
298    ensure!(
299        timestamp_index < column_schemas.len(),
300        error::InvalidTimestampIndexSnafu {
301            index: timestamp_index,
302        }
303    );
304
305    let column_schema = &column_schemas[timestamp_index];
306    ensure!(
307        column_schema.data_type.is_timestamp(),
308        error::InvalidTimestampIndexSnafu {
309            index: timestamp_index,
310        }
311    );
312    ensure!(
313        column_schema.is_time_index(),
314        error::InvalidTimestampIndexSnafu {
315            index: timestamp_index,
316        }
317    );
318
319    Ok(())
320}
321
/// A [`Schema`] shared behind an [`Arc`].
pub type SchemaRef = Arc<Schema>;
323
324impl TryFrom<Arc<ArrowSchema>> for Schema {
325    type Error = Error;
326
327    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
328        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
329        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
330        for field in &arrow_schema.fields {
331            let column_schema = ColumnSchema::try_from(field.as_ref())?;
332            let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
333            column_schemas.push(column_schema);
334        }
335
336        let mut timestamp_index = None;
337        for (index, column_schema) in column_schemas.iter().enumerate() {
338            if column_schema.is_time_index() {
339                validate_timestamp_index(&column_schemas, index)?;
340                timestamp_index = Some(index);
341                break;
342            }
343        }
344
345        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
346
347        Ok(Self {
348            column_schemas,
349            name_to_index,
350            arrow_schema,
351            timestamp_index,
352            version,
353        })
354    }
355}
356
357impl TryFrom<ArrowSchema> for Schema {
358    type Error = Error;
359
360    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
361        let arrow_schema = Arc::new(arrow_schema);
362
363        Schema::try_from(arrow_schema)
364    }
365}
366
367impl TryFrom<DFSchemaRef> for Schema {
368    type Error = Error;
369
370    fn try_from(value: DFSchemaRef) -> Result<Self> {
371        let s: ArrowSchema = value.as_ref().into();
372        s.try_into()
373    }
374}
375
376fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
377    if let Some(value) = metadata.get(key) {
378        let version = value
379            .parse()
380            .context(error::ParseSchemaVersionSnafu { value })?;
381
382        Ok(version)
383    } else {
384        Ok(Schema::INITIAL_VERSION)
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields a schema without any column.
    #[test]
    fn test_build_empty_schema() {
        let empty = SchemaBuilder::default().build().unwrap();
        assert!(empty.is_empty());
        assert_eq!(0, empty.num_columns());
    }

    /// A schema without time index round-trips through its arrow schema.
    #[test]
    fn test_schema_no_timestamp() {
        let columns = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(columns.clone());

        assert!(!schema.is_empty());
        assert_eq!(2, schema.num_columns());
        assert_eq!(None, schema.timestamp_index());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for expected in &columns {
            let actual = schema.column_schema_by_name(&expected.name).unwrap();
            assert_eq!(expected, actual);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let round_tripped = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, round_tripped);
        assert_eq!(columns, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let columns = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(columns).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the schema.
    #[test]
    fn test_metadata() {
        let columns = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let builder = SchemaBuilder::try_from(columns).unwrap();
        let schema = builder.add_metadata("k1", "v1").build().unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive a round trip through the arrow schema.
    #[test]
    fn test_schema_with_timestamp() {
        let ts_column = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let columns = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ts_column,
        ];
        let schema = SchemaBuilder::try_from(columns.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(Some(1), schema.timestamp_index());
        assert_eq!(&columns[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let round_tripped = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        assert_eq!(Some(1), schema.timestamp_index());
        assert_eq!(schema, round_tripped);
    }

    /// A time index on a non-timestamp column makes the build fail.
    #[test]
    fn test_schema_wrong_timestamp() {
        let cases = [
            vec![
                ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                    .with_time_index(true),
                ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
            ],
            vec![
                ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                    .with_time_index(true),
            ],
        ];

        for columns in cases {
            let built = SchemaBuilder::try_from(columns).unwrap().build();
            assert!(built.is_err());
        }
    }
}