datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17
18use std::collections::HashMap;
19use std::fmt;
20use std::sync::Arc;
21
22use arrow::datatypes::{Field, Schema as ArrowSchema};
23use datafusion_common::DFSchemaRef;
24use serde::{Deserialize, Deserializer, Serialize};
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30    COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35    COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36    COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37    ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38    INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39    TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40    VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43
/// Key under which the actual column type is stored in an Arrow field's metadata.
pub const TYPE_KEY: &str = "greptime:type";
/// Key under which the schema version number is stored in the Arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
48
49/// A common schema, should be immutable.
50#[derive(Clone, PartialEq, Eq, Serialize)]
51pub struct Schema {
52    column_schemas: Vec<ColumnSchema>,
53    #[serde(skip_serializing)]
54    name_to_index: HashMap<String, usize>,
55    #[serde(skip_serializing)]
56    arrow_schema: Arc<ArrowSchema>,
57    /// Index of the timestamp key column.
58    ///
59    /// Timestamp key column is the column holds the timestamp and forms part of
60    /// the primary key. None means there is no timestamp key column.
61    #[serde(skip_serializing)]
62    timestamp_index: Option<usize>,
63    /// Version of the schema.
64    ///
65    /// Initial value is zero. The version should bump after altering schema.
66    version: u32,
67}
68
69impl<'de> Deserialize<'de> for Schema {
70    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
71    where
72        D: Deserializer<'de>,
73    {
74        use serde::de::Error;
75
76        #[derive(Deserialize)]
77        struct RawSchema {
78            column_schemas: Vec<ColumnSchema>,
79            version: u32,
80        }
81        let raw = RawSchema::deserialize(deserializer)?;
82
83        SchemaBuilder::try_from(raw.column_schemas)
84            .and_then(|x| x.version(raw.version).build())
85            .map_err(D::Error::custom)
86    }
87}
88
89impl fmt::Debug for Schema {
90    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
91        f.debug_struct("Schema")
92            .field("column_schemas", &self.column_schemas)
93            .field("name_to_index", &self.name_to_index)
94            .field("timestamp_index", &self.timestamp_index)
95            .field("version", &self.version)
96            .finish()
97    }
98}
99
100impl Schema {
101    /// Initial version of the schema.
102    pub const INITIAL_VERSION: u32 = 0;
103
104    /// Create a schema from a vector of [ColumnSchema].
105    ///
106    /// # Panics
107    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
108    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
109        // Builder won't fail in this case
110        Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
111    }
112
113    pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
114        SchemaBuilder::try_from(column_schemas)
115            .unwrap()
116            .version(version)
117            .build()
118            .unwrap()
119    }
120
121    /// Try to Create a schema from a vector of [ColumnSchema].
122    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
123        SchemaBuilder::try_from(column_schemas)?.build()
124    }
125
126    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
127        &self.arrow_schema
128    }
129
130    pub fn column_schemas(&self) -> &[ColumnSchema] {
131        &self.column_schemas
132    }
133
134    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
135        self.name_to_index
136            .get(name)
137            .map(|index| &self.column_schemas[*index])
138    }
139
140    /// Retrieve the column's name by index
141    /// # Panics
142    /// This method **may** panic if the index is out of range of column schemas.
143    pub fn column_name_by_index(&self, idx: usize) -> &str {
144        &self.column_schemas[idx].name
145    }
146
147    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
148        self.name_to_index.get(name).copied()
149    }
150
151    pub fn contains_column(&self, name: &str) -> bool {
152        self.name_to_index.contains_key(name)
153    }
154
155    pub fn num_columns(&self) -> usize {
156        self.column_schemas.len()
157    }
158
159    pub fn is_empty(&self) -> bool {
160        self.column_schemas.is_empty()
161    }
162
163    /// Returns index of the timestamp key column.
164    pub fn timestamp_index(&self) -> Option<usize> {
165        self.timestamp_index
166    }
167
168    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
169        self.timestamp_index.map(|idx| &self.column_schemas[idx])
170    }
171
172    pub fn version(&self) -> u32 {
173        self.version
174    }
175
176    pub fn metadata(&self) -> &HashMap<String, String> {
177        &self.arrow_schema.metadata
178    }
179
180    /// Generate a new projected schema
181    ///
182    /// # Panic
183    ///
184    /// If the index out ouf bound
185    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
186        let mut column_schemas = Vec::with_capacity(indices.len());
187        let mut timestamp_index = None;
188        for index in indices {
189            if let Some(ts_index) = self.timestamp_index
190                && ts_index == *index
191            {
192                timestamp_index = Some(column_schemas.len());
193            }
194            column_schemas.push(self.column_schemas[*index].clone());
195        }
196        let arrow_schema = self
197            .arrow_schema
198            .project(indices)
199            .context(ProjectArrowSchemaSnafu)?;
200        let name_to_index = column_schemas
201            .iter()
202            .enumerate()
203            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
204            .collect();
205
206        Ok(Self {
207            column_schemas,
208            name_to_index,
209            arrow_schema: Arc::new(arrow_schema),
210            timestamp_index,
211            version: self.version,
212        })
213    }
214}
215
216#[derive(Default)]
217pub struct SchemaBuilder {
218    column_schemas: Vec<ColumnSchema>,
219    name_to_index: HashMap<String, usize>,
220    fields: Vec<Field>,
221    timestamp_index: Option<usize>,
222    version: u32,
223    metadata: HashMap<String, String>,
224}
225
226impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
227    type Error = Error;
228
229    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
230        SchemaBuilder::try_from_columns(column_schemas)
231    }
232}
233
234impl SchemaBuilder {
235    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
236        let FieldsAndIndices {
237            fields,
238            name_to_index,
239            timestamp_index,
240        } = collect_fields(&column_schemas)?;
241
242        Ok(Self {
243            column_schemas,
244            name_to_index,
245            fields,
246            timestamp_index,
247            ..Default::default()
248        })
249    }
250
251    pub fn version(mut self, version: u32) -> Self {
252        self.version = version;
253        self
254    }
255
256    /// Add key value pair to metadata.
257    ///
258    /// Old metadata with same key would be overwritten.
259    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
260        let _ = self.metadata.insert(key.into(), value.into());
261        self
262    }
263
264    pub fn build(mut self) -> Result<Schema> {
265        if let Some(timestamp_index) = self.timestamp_index {
266            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
267        }
268
269        self.metadata
270            .insert(VERSION_KEY.to_string(), self.version.to_string());
271
272        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
273
274        Ok(Schema {
275            column_schemas: self.column_schemas,
276            name_to_index: self.name_to_index,
277            arrow_schema: Arc::new(arrow_schema),
278            timestamp_index: self.timestamp_index,
279            version: self.version,
280        })
281    }
282}
283
284struct FieldsAndIndices {
285    fields: Vec<Field>,
286    name_to_index: HashMap<String, usize>,
287    timestamp_index: Option<usize>,
288}
289
290fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
291    let mut fields = Vec::with_capacity(column_schemas.len());
292    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
293    let mut timestamp_index = None;
294    for (index, column_schema) in column_schemas.iter().enumerate() {
295        if column_schema.is_time_index() && timestamp_index.is_none() {
296            timestamp_index = Some(index);
297        }
298        let mut field = Field::try_from(column_schema)?;
299
300        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
301        let extype = match column_schema.data_type {
302            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
303            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
304            _ => None,
305        };
306        if let Some(extype) = extype {
307            field
308                .metadata_mut()
309                .insert(TYPE_KEY.to_string(), extype.to_string());
310        }
311        fields.push(field);
312        ensure!(
313            name_to_index
314                .insert(column_schema.name.clone(), index)
315                .is_none(),
316            DuplicateColumnSnafu {
317                column: &column_schema.name,
318            }
319        );
320    }
321
322    Ok(FieldsAndIndices {
323        fields,
324        name_to_index,
325        timestamp_index,
326    })
327}
328
329fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
330    ensure!(
331        timestamp_index < column_schemas.len(),
332        error::InvalidTimestampIndexSnafu {
333            index: timestamp_index,
334        }
335    );
336
337    let column_schema = &column_schemas[timestamp_index];
338    ensure!(
339        column_schema.data_type.is_timestamp(),
340        error::InvalidTimestampIndexSnafu {
341            index: timestamp_index,
342        }
343    );
344    ensure!(
345        column_schema.is_time_index(),
346        error::InvalidTimestampIndexSnafu {
347            index: timestamp_index,
348        }
349    );
350
351    Ok(())
352}
353
354pub type SchemaRef = Arc<Schema>;
355
356impl TryFrom<Arc<ArrowSchema>> for Schema {
357    type Error = Error;
358
359    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
360        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
361        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
362        for field in &arrow_schema.fields {
363            let column_schema = ColumnSchema::try_from(field.as_ref())?;
364            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
365            column_schemas.push(column_schema);
366        }
367
368        let mut timestamp_index = None;
369        for (index, column_schema) in column_schemas.iter().enumerate() {
370            if column_schema.is_time_index() {
371                validate_timestamp_index(&column_schemas, index)?;
372                timestamp_index = Some(index);
373                break;
374            }
375        }
376
377        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
378
379        Ok(Self {
380            column_schemas,
381            name_to_index,
382            arrow_schema,
383            timestamp_index,
384            version,
385        })
386    }
387}
388
389impl TryFrom<ArrowSchema> for Schema {
390    type Error = Error;
391
392    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
393        let arrow_schema = Arc::new(arrow_schema);
394
395        Schema::try_from(arrow_schema)
396    }
397}
398
399impl TryFrom<DFSchemaRef> for Schema {
400    type Error = Error;
401
402    fn try_from(value: DFSchemaRef) -> Result<Self> {
403        value.inner().clone().try_into()
404    }
405}
406
407fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
408    if let Some(value) = metadata.get(key) {
409        let version = value
410            .parse()
411            .context(error::ParseSchemaVersionSnafu { value })?;
412
413        Ok(version)
414    } else {
415        Ok(Schema::INITIAL_VERSION)
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        // The round-tripped schema must recover the timestamp index on its own.
        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        // Reordering projection keeps and remaps the timestamp column.
        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!(2, projected.arrow_schema().fields().len());
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(&column_schemas[1], projected.timestamp_column().unwrap());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(schema.version(), projected.version());

        // Dropping the timestamp column clears the timestamp index.
        let projected = schema.try_project(&[0]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert!(projected.timestamp_index().is_none());
    }
}