datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use column_schema::ColumnExtType;
25use datafusion_common::DFSchemaRef;
26use snafu::{ensure, ResultExt};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31    ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
32    SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
33    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
34    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
35    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
36    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
37    FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
38};
39pub use crate::schema::constraint::ColumnDefaultConstraint;
40pub use crate::schema::raw::RawSchema;
41
/// Metadata key under which the schema version number is stored.
pub const VERSION_KEY: &str = "greptime:version";
/// Field metadata key under which the actual column type is stored.
pub const TYPE_KEY: &str = "greptime:type";
46
47/// A common schema, should be immutable.
48#[derive(Clone, PartialEq, Eq)]
49pub struct Schema {
50    column_schemas: Vec<ColumnSchema>,
51    name_to_index: HashMap<String, usize>,
52    arrow_schema: Arc<ArrowSchema>,
53    /// Index of the timestamp key column.
54    ///
55    /// Timestamp key column is the column holds the timestamp and forms part of
56    /// the primary key. None means there is no timestamp key column.
57    timestamp_index: Option<usize>,
58    /// Version of the schema.
59    ///
60    /// Initial value is zero. The version should bump after altering schema.
61    version: u32,
62}
63
64impl fmt::Debug for Schema {
65    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66        f.debug_struct("Schema")
67            .field("column_schemas", &self.column_schemas)
68            .field("name_to_index", &self.name_to_index)
69            .field("timestamp_index", &self.timestamp_index)
70            .field("version", &self.version)
71            .finish()
72    }
73}
74
75impl Schema {
76    /// Initial version of the schema.
77    pub const INITIAL_VERSION: u32 = 0;
78
79    /// Create a schema from a vector of [ColumnSchema].
80    ///
81    /// # Panics
82    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
83    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
84        // Builder won't fail in this case
85        SchemaBuilder::try_from(column_schemas)
86            .unwrap()
87            .build()
88            .unwrap()
89    }
90
91    /// Try to Create a schema from a vector of [ColumnSchema].
92    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
93        SchemaBuilder::try_from(column_schemas)?.build()
94    }
95
96    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
97        &self.arrow_schema
98    }
99
100    pub fn column_schemas(&self) -> &[ColumnSchema] {
101        &self.column_schemas
102    }
103
104    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
105        self.name_to_index
106            .get(name)
107            .map(|index| &self.column_schemas[*index])
108    }
109
110    /// Retrieve the column's name by index
111    /// # Panics
112    /// This method **may** panic if the index is out of range of column schemas.
113    pub fn column_name_by_index(&self, idx: usize) -> &str {
114        &self.column_schemas[idx].name
115    }
116
117    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
118        self.name_to_index.get(name).copied()
119    }
120
121    pub fn contains_column(&self, name: &str) -> bool {
122        self.name_to_index.contains_key(name)
123    }
124
125    pub fn num_columns(&self) -> usize {
126        self.column_schemas.len()
127    }
128
129    pub fn is_empty(&self) -> bool {
130        self.column_schemas.is_empty()
131    }
132
133    /// Returns index of the timestamp key column.
134    pub fn timestamp_index(&self) -> Option<usize> {
135        self.timestamp_index
136    }
137
138    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
139        self.timestamp_index.map(|idx| &self.column_schemas[idx])
140    }
141
142    pub fn version(&self) -> u32 {
143        self.version
144    }
145
146    pub fn metadata(&self) -> &HashMap<String, String> {
147        &self.arrow_schema.metadata
148    }
149
150    /// Generate a new projected schema
151    ///
152    /// # Panic
153    ///
154    /// If the index out ouf bound
155    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
156        let mut column_schemas = Vec::with_capacity(indices.len());
157        let mut timestamp_index = None;
158        for index in indices {
159            if let Some(ts_index) = self.timestamp_index
160                && ts_index == *index
161            {
162                timestamp_index = Some(column_schemas.len());
163            }
164            column_schemas.push(self.column_schemas[*index].clone());
165        }
166        let arrow_schema = self
167            .arrow_schema
168            .project(indices)
169            .context(ProjectArrowSchemaSnafu)?;
170        let name_to_index = column_schemas
171            .iter()
172            .enumerate()
173            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
174            .collect();
175
176        Ok(Self {
177            column_schemas,
178            name_to_index,
179            arrow_schema: Arc::new(arrow_schema),
180            timestamp_index,
181            version: self.version,
182        })
183    }
184}
185
186#[derive(Default)]
187pub struct SchemaBuilder {
188    column_schemas: Vec<ColumnSchema>,
189    name_to_index: HashMap<String, usize>,
190    fields: Vec<Field>,
191    timestamp_index: Option<usize>,
192    version: u32,
193    metadata: HashMap<String, String>,
194}
195
196impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
197    type Error = Error;
198
199    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
200        SchemaBuilder::try_from_columns(column_schemas)
201    }
202}
203
204impl SchemaBuilder {
205    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
206        let FieldsAndIndices {
207            fields,
208            name_to_index,
209            timestamp_index,
210        } = collect_fields(&column_schemas)?;
211
212        Ok(Self {
213            column_schemas,
214            name_to_index,
215            fields,
216            timestamp_index,
217            ..Default::default()
218        })
219    }
220
221    pub fn version(mut self, version: u32) -> Self {
222        self.version = version;
223        self
224    }
225
226    /// Add key value pair to metadata.
227    ///
228    /// Old metadata with same key would be overwritten.
229    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
230        let _ = self.metadata.insert(key.into(), value.into());
231        self
232    }
233
234    pub fn build(mut self) -> Result<Schema> {
235        if let Some(timestamp_index) = self.timestamp_index {
236            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
237        }
238
239        self.metadata
240            .insert(VERSION_KEY.to_string(), self.version.to_string());
241
242        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
243
244        Ok(Schema {
245            column_schemas: self.column_schemas,
246            name_to_index: self.name_to_index,
247            arrow_schema: Arc::new(arrow_schema),
248            timestamp_index: self.timestamp_index,
249            version: self.version,
250        })
251    }
252}
253
254struct FieldsAndIndices {
255    fields: Vec<Field>,
256    name_to_index: HashMap<String, usize>,
257    timestamp_index: Option<usize>,
258}
259
260fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
261    let mut fields = Vec::with_capacity(column_schemas.len());
262    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
263    let mut timestamp_index = None;
264    for (index, column_schema) in column_schemas.iter().enumerate() {
265        if column_schema.is_time_index() && timestamp_index.is_none() {
266            timestamp_index = Some(index);
267        }
268        let mut field = Field::try_from(column_schema)?;
269
270        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
271        let extype = match column_schema.data_type {
272            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
273            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
274            _ => None,
275        };
276        if let Some(extype) = extype {
277            let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
278            field = field.with_metadata(metadata);
279        }
280        fields.push(field);
281        ensure!(
282            name_to_index
283                .insert(column_schema.name.clone(), index)
284                .is_none(),
285            DuplicateColumnSnafu {
286                column: &column_schema.name,
287            }
288        );
289    }
290
291    Ok(FieldsAndIndices {
292        fields,
293        name_to_index,
294        timestamp_index,
295    })
296}
297
298fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
299    ensure!(
300        timestamp_index < column_schemas.len(),
301        error::InvalidTimestampIndexSnafu {
302            index: timestamp_index,
303        }
304    );
305
306    let column_schema = &column_schemas[timestamp_index];
307    ensure!(
308        column_schema.data_type.is_timestamp(),
309        error::InvalidTimestampIndexSnafu {
310            index: timestamp_index,
311        }
312    );
313    ensure!(
314        column_schema.is_time_index(),
315        error::InvalidTimestampIndexSnafu {
316            index: timestamp_index,
317        }
318    );
319
320    Ok(())
321}
322
323pub type SchemaRef = Arc<Schema>;
324
325impl TryFrom<Arc<ArrowSchema>> for Schema {
326    type Error = Error;
327
328    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
329        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
330        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
331        for field in &arrow_schema.fields {
332            let column_schema = ColumnSchema::try_from(field.as_ref())?;
333            let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
334            column_schemas.push(column_schema);
335        }
336
337        let mut timestamp_index = None;
338        for (index, column_schema) in column_schemas.iter().enumerate() {
339            if column_schema.is_time_index() {
340                validate_timestamp_index(&column_schemas, index)?;
341                timestamp_index = Some(index);
342                break;
343            }
344        }
345
346        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
347
348        Ok(Self {
349            column_schemas,
350            name_to_index,
351            arrow_schema,
352            timestamp_index,
353            version,
354        })
355    }
356}
357
358impl TryFrom<ArrowSchema> for Schema {
359    type Error = Error;
360
361    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
362        let arrow_schema = Arc::new(arrow_schema);
363
364        Schema::try_from(arrow_schema)
365    }
366}
367
368impl TryFrom<DFSchemaRef> for Schema {
369    type Error = Error;
370
371    fn try_from(value: DFSchemaRef) -> Result<Self> {
372        let s: ArrowSchema = value.as_ref().into();
373        s.try_into()
374    }
375}
376
377fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
378    if let Some(value) = metadata.get(key) {
379        let version = value
380            .parse()
381            .context(error::ParseSchemaVersionSnafu { value })?;
382
383        Ok(version)
384    } else {
385        Ok(Schema::INITIAL_VERSION)
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder produces a schema without columns.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without time index survives a round trip through Arrow.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive a round trip through Arrow.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Inspect the round-tripped schema here, not the original one again.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(123, new_schema.version());
        assert_eq!(schema, new_schema);
    }

    /// A time index on a non-timestamp column is rejected by the builder.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());
    }

    /// Projection keeps the selected columns and remaps the time index.
    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(7)
            .build()
            .unwrap();

        let projected = schema.try_project(&[1]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert_eq!(Some(0), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("ts"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(7, projected.version());
        assert_eq!(&column_schemas[1..], projected.column_schemas());

        let projected = schema.try_project(&[0]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert!(projected.timestamp_index().is_none());
        assert_eq!(&column_schemas[..1], projected.column_schemas());
    }
}
507}