datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30    COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35    COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36    COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37    ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38    INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39    TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40    VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43pub use crate::schema::raw::RawSchema;
44
/// Key used to store the version number of the schema in the Arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
/// Key used to store the actual column type in the Arrow field metadata.
pub const TYPE_KEY: &str = "greptime:type";
49
/// A common schema, should be immutable.
///
/// It keeps the column schemas together with the equivalent Arrow schema and
/// a name-to-position lookup table.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Schemas of all columns, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to the position of that column in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema; its metadata is what `metadata()` returns.
    arrow_schema: Arc<ArrowSchema>,
    /// Index of the timestamp key column.
    ///
    /// The timestamp key column is the column that holds the timestamp and forms
    /// part of the primary key. `None` means there is no timestamp key column.
    timestamp_index: Option<usize>,
    /// Version of the schema.
    ///
    /// Initial value is zero. The version should be bumped after altering the schema.
    version: u32,
}
66
67impl fmt::Debug for Schema {
68    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69        f.debug_struct("Schema")
70            .field("column_schemas", &self.column_schemas)
71            .field("name_to_index", &self.name_to_index)
72            .field("timestamp_index", &self.timestamp_index)
73            .field("version", &self.version)
74            .finish()
75    }
76}
77
78impl Schema {
79    /// Initial version of the schema.
80    pub const INITIAL_VERSION: u32 = 0;
81
82    /// Create a schema from a vector of [ColumnSchema].
83    ///
84    /// # Panics
85    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
86    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
87        // Builder won't fail in this case
88        SchemaBuilder::try_from(column_schemas)
89            .unwrap()
90            .build()
91            .unwrap()
92    }
93
94    /// Try to Create a schema from a vector of [ColumnSchema].
95    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
96        SchemaBuilder::try_from(column_schemas)?.build()
97    }
98
99    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
100        &self.arrow_schema
101    }
102
103    pub fn column_schemas(&self) -> &[ColumnSchema] {
104        &self.column_schemas
105    }
106
107    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
108        self.name_to_index
109            .get(name)
110            .map(|index| &self.column_schemas[*index])
111    }
112
113    /// Retrieve the column's name by index
114    /// # Panics
115    /// This method **may** panic if the index is out of range of column schemas.
116    pub fn column_name_by_index(&self, idx: usize) -> &str {
117        &self.column_schemas[idx].name
118    }
119
120    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
121        self.name_to_index.get(name).copied()
122    }
123
124    pub fn contains_column(&self, name: &str) -> bool {
125        self.name_to_index.contains_key(name)
126    }
127
128    pub fn num_columns(&self) -> usize {
129        self.column_schemas.len()
130    }
131
132    pub fn is_empty(&self) -> bool {
133        self.column_schemas.is_empty()
134    }
135
136    /// Returns index of the timestamp key column.
137    pub fn timestamp_index(&self) -> Option<usize> {
138        self.timestamp_index
139    }
140
141    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
142        self.timestamp_index.map(|idx| &self.column_schemas[idx])
143    }
144
145    pub fn version(&self) -> u32 {
146        self.version
147    }
148
149    pub fn metadata(&self) -> &HashMap<String, String> {
150        &self.arrow_schema.metadata
151    }
152
153    /// Generate a new projected schema
154    ///
155    /// # Panic
156    ///
157    /// If the index out ouf bound
158    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
159        let mut column_schemas = Vec::with_capacity(indices.len());
160        let mut timestamp_index = None;
161        for index in indices {
162            if let Some(ts_index) = self.timestamp_index
163                && ts_index == *index
164            {
165                timestamp_index = Some(column_schemas.len());
166            }
167            column_schemas.push(self.column_schemas[*index].clone());
168        }
169        let arrow_schema = self
170            .arrow_schema
171            .project(indices)
172            .context(ProjectArrowSchemaSnafu)?;
173        let name_to_index = column_schemas
174            .iter()
175            .enumerate()
176            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
177            .collect();
178
179        Ok(Self {
180            column_schemas,
181            name_to_index,
182            arrow_schema: Arc::new(arrow_schema),
183            timestamp_index,
184            version: self.version,
185        })
186    }
187}
188
/// Builder of [Schema].
///
/// The default builder has no column, version zero and empty metadata.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Schemas of the columns the built schema will contain.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to the position of that column in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`.
    fields: Vec<Field>,
    /// Index of the time index column, if any.
    timestamp_index: Option<usize>,
    /// Version of the schema to build.
    version: u32,
    /// Key value pairs to put into the metadata of the Arrow schema.
    metadata: HashMap<String, String>,
}
198
199impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
200    type Error = Error;
201
202    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
203        SchemaBuilder::try_from_columns(column_schemas)
204    }
205}
206
207impl SchemaBuilder {
208    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
209        let FieldsAndIndices {
210            fields,
211            name_to_index,
212            timestamp_index,
213        } = collect_fields(&column_schemas)?;
214
215        Ok(Self {
216            column_schemas,
217            name_to_index,
218            fields,
219            timestamp_index,
220            ..Default::default()
221        })
222    }
223
224    pub fn version(mut self, version: u32) -> Self {
225        self.version = version;
226        self
227    }
228
229    /// Add key value pair to metadata.
230    ///
231    /// Old metadata with same key would be overwritten.
232    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
233        let _ = self.metadata.insert(key.into(), value.into());
234        self
235    }
236
237    pub fn build(mut self) -> Result<Schema> {
238        if let Some(timestamp_index) = self.timestamp_index {
239            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
240        }
241
242        self.metadata
243            .insert(VERSION_KEY.to_string(), self.version.to_string());
244
245        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
246
247        Ok(Schema {
248            column_schemas: self.column_schemas,
249            name_to_index: self.name_to_index,
250            arrow_schema: Arc::new(arrow_schema),
251            timestamp_index: self.timestamp_index,
252            version: self.version,
253        })
254    }
255}
256
/// Output of `collect_fields`: what can be derived from a slice of column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column schema and in the same order.
    fields: Vec<Field>,
    /// Maps a column name to the position of that column.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
}
262
263fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
264    let mut fields = Vec::with_capacity(column_schemas.len());
265    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
266    let mut timestamp_index = None;
267    for (index, column_schema) in column_schemas.iter().enumerate() {
268        if column_schema.is_time_index() && timestamp_index.is_none() {
269            timestamp_index = Some(index);
270        }
271        let mut field = Field::try_from(column_schema)?;
272
273        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
274        let extype = match column_schema.data_type {
275            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
276            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
277            _ => None,
278        };
279        if let Some(extype) = extype {
280            field
281                .metadata_mut()
282                .insert(TYPE_KEY.to_string(), extype.to_string());
283        }
284        fields.push(field);
285        ensure!(
286            name_to_index
287                .insert(column_schema.name.clone(), index)
288                .is_none(),
289            DuplicateColumnSnafu {
290                column: &column_schema.name,
291            }
292        );
293    }
294
295    Ok(FieldsAndIndices {
296        fields,
297        name_to_index,
298        timestamp_index,
299    })
300}
301
302fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
303    ensure!(
304        timestamp_index < column_schemas.len(),
305        error::InvalidTimestampIndexSnafu {
306            index: timestamp_index,
307        }
308    );
309
310    let column_schema = &column_schemas[timestamp_index];
311    ensure!(
312        column_schema.data_type.is_timestamp(),
313        error::InvalidTimestampIndexSnafu {
314            index: timestamp_index,
315        }
316    );
317    ensure!(
318        column_schema.is_time_index(),
319        error::InvalidTimestampIndexSnafu {
320            index: timestamp_index,
321        }
322    );
323
324    Ok(())
325}
326
/// Reference-counted pointer to a [Schema].
pub type SchemaRef = Arc<Schema>;
328
329impl TryFrom<Arc<ArrowSchema>> for Schema {
330    type Error = Error;
331
332    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
333        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
334        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
335        for field in &arrow_schema.fields {
336            let column_schema = ColumnSchema::try_from(field.as_ref())?;
337            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
338            column_schemas.push(column_schema);
339        }
340
341        let mut timestamp_index = None;
342        for (index, column_schema) in column_schemas.iter().enumerate() {
343            if column_schema.is_time_index() {
344                validate_timestamp_index(&column_schemas, index)?;
345                timestamp_index = Some(index);
346                break;
347            }
348        }
349
350        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
351
352        Ok(Self {
353            column_schemas,
354            name_to_index,
355            arrow_schema,
356            timestamp_index,
357            version,
358        })
359    }
360}
361
362impl TryFrom<ArrowSchema> for Schema {
363    type Error = Error;
364
365    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
366        let arrow_schema = Arc::new(arrow_schema);
367
368        Schema::try_from(arrow_schema)
369    }
370}
371
372impl TryFrom<DFSchemaRef> for Schema {
373    type Error = Error;
374
375    fn try_from(value: DFSchemaRef) -> Result<Self> {
376        value.inner().clone().try_into()
377    }
378}
379
380fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
381    if let Some(value) = metadata.get(key) {
382        let version = value
383            .parse()
384            .context(error::ParseSchemaVersionSnafu { value })?;
385
386        Ok(version)
387    } else {
388        Ok(Schema::INITIAL_VERSION)
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields a schema without columns.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without time index column survives a round trip through Arrow.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is readable from the schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive a round trip through Arrow.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the converted schema, not the one it was created from.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(123, new_schema.version());
        assert_eq!(schema, new_schema);
    }

    /// A time index column must have a timestamp data type.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    /// Projection keeps the requested columns in the requested order, remaps
    /// the time index and preserves the version.
    #[test]
    fn test_project_schema() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(7)
            .build()
            .unwrap();

        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!(&column_schemas[2], &projected.column_schemas()[0]);
        assert_eq!(&column_schemas[1], &projected.column_schemas()[1]);
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert_eq!(Some(1), projected.column_index_by_name("ts"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(7, projected.version());

        // The time index is dropped when its column is not projected.
        let projected = schema.try_project(&[0]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert!(projected.timestamp_index().is_none());
        assert!(projected.timestamp_column().is_none());
    }
}
512        );
513    }
514}