datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::{fmt, mem};
21
22use arrow::datatypes::{Field, Schema as ArrowSchema};
23use datafusion_common::DFSchemaRef;
24use serde::{Deserialize, Deserializer, Serialize};
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30    COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35    COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36    COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37    ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38    INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39    TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40    VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43
/// Key used to store the version number of the schema in the arrow schema's metadata.
///
/// [`SchemaBuilder::build`] writes the version under this key, and converting an
/// arrow schema back into a [`Schema`] reads it from there.
pub const VERSION_KEY: &str = "greptime:version";
/// Key used to store the actual column type in the arrow field's metadata.
///
/// It is set for Json and Vector columns when the arrow fields are built.
pub const TYPE_KEY: &str = "greptime:type";
48
/// A common schema, should be immutable.
///
/// Only `column_schemas` and `version` are serialized; the remaining fields are
/// skipped and rebuilt from the columns when a schema is deserialized.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
    /// Schemas of all columns, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to the position of that column in `column_schemas`.
    #[serde(skip_serializing)]
    name_to_index: HashMap<String, usize>,
    /// The arrow schema of the same columns. It also holds the schema's metadata.
    #[serde(skip_serializing)]
    arrow_schema: Arc<ArrowSchema>,
    /// Index of the timestamp key column.
    ///
    /// The timestamp key column is the column that holds the timestamp and forms
    /// part of the primary key. `None` means there is no timestamp key column.
    #[serde(skip_serializing)]
    timestamp_index: Option<usize>,
    /// Version of the schema.
    ///
    /// Initial value is zero. The version should bump after altering schema.
    version: u32,
}
68
69impl<'de> Deserialize<'de> for Schema {
70    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
71    where
72        D: Deserializer<'de>,
73    {
74        use serde::de::Error;
75
76        #[derive(Deserialize)]
77        struct RawSchema {
78            column_schemas: Vec<ColumnSchema>,
79            version: u32,
80        }
81        let raw = RawSchema::deserialize(deserializer)?;
82
83        SchemaBuilder::try_from(raw.column_schemas)
84            .and_then(|x| x.version(raw.version).build())
85            .map_err(D::Error::custom)
86    }
87}
88
89impl fmt::Debug for Schema {
90    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
91        f.debug_struct("Schema")
92            .field("column_schemas", &self.column_schemas)
93            .field("name_to_index", &self.name_to_index)
94            .field("timestamp_index", &self.timestamp_index)
95            .field("version", &self.version)
96            .finish()
97    }
98}
99
100impl Schema {
101    /// Initial version of the schema.
102    pub const INITIAL_VERSION: u32 = 0;
103
104    /// Create a schema from a vector of [ColumnSchema].
105    ///
106    /// # Panics
107    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
108    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
109        // Builder won't fail in this case
110        Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
111    }
112
113    pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
114        SchemaBuilder::try_from(column_schemas)
115            .unwrap()
116            .version(version)
117            .build()
118            .unwrap()
119    }
120
121    /// Try to Create a schema from a vector of [ColumnSchema].
122    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
123        SchemaBuilder::try_from(column_schemas)?.build()
124    }
125
126    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
127        &self.arrow_schema
128    }
129
130    pub fn column_schemas(&self) -> &[ColumnSchema] {
131        &self.column_schemas
132    }
133
134    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
135        self.name_to_index
136            .get(name)
137            .map(|index| &self.column_schemas[*index])
138    }
139
140    /// Retrieve the column's name by index
141    /// # Panics
142    /// This method **may** panic if the index is out of range of column schemas.
143    pub fn column_name_by_index(&self, idx: usize) -> &str {
144        &self.column_schemas[idx].name
145    }
146
147    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
148        self.name_to_index.get(name).copied()
149    }
150
151    pub fn contains_column(&self, name: &str) -> bool {
152        self.name_to_index.contains_key(name)
153    }
154
155    pub fn num_columns(&self) -> usize {
156        self.column_schemas.len()
157    }
158
159    pub fn is_empty(&self) -> bool {
160        self.column_schemas.is_empty()
161    }
162
163    /// Returns index of the timestamp key column.
164    pub fn timestamp_index(&self) -> Option<usize> {
165        self.timestamp_index
166    }
167
168    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
169        self.timestamp_index.map(|idx| &self.column_schemas[idx])
170    }
171
172    pub fn version(&self) -> u32 {
173        self.version
174    }
175
176    pub fn metadata(&self) -> &HashMap<String, String> {
177        &self.arrow_schema.metadata
178    }
179
180    /// Returns the estimated memory footprint of this schema.
181    pub fn estimated_size(&self) -> usize {
182        mem::size_of_val(self)
183            + mem::size_of::<ColumnSchema>() * self.column_schemas.capacity()
184            + self
185                .column_schemas
186                .iter()
187                .map(|column_schema| {
188                    column_schema.estimated_size() - mem::size_of::<ColumnSchema>()
189                })
190                .sum::<usize>()
191            + mem::size_of::<(String, usize)>() * self.name_to_index.capacity()
192            + self
193                .name_to_index
194                .keys()
195                .map(|name| name.capacity())
196                .sum::<usize>()
197            + arrow_schema_size(self.arrow_schema.as_ref())
198    }
199
200    /// Generate a new projected schema
201    ///
202    /// # Panic
203    ///
204    /// If the index out ouf bound
205    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
206        let mut column_schemas = Vec::with_capacity(indices.len());
207        let mut timestamp_index = None;
208        for index in indices {
209            if let Some(ts_index) = self.timestamp_index
210                && ts_index == *index
211            {
212                timestamp_index = Some(column_schemas.len());
213            }
214            column_schemas.push(self.column_schemas[*index].clone());
215        }
216        let arrow_schema = self
217            .arrow_schema
218            .project(indices)
219            .context(ProjectArrowSchemaSnafu)?;
220        let name_to_index = column_schemas
221            .iter()
222            .enumerate()
223            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
224            .collect();
225
226        Ok(Self {
227            column_schemas,
228            name_to_index,
229            arrow_schema: Arc::new(arrow_schema),
230            timestamp_index,
231            version: self.version,
232        })
233    }
234}
235
236fn arrow_schema_size(schema: &ArrowSchema) -> usize {
237    mem::size_of_val(schema)
238        + schema.fields.size()
239        + mem::size_of::<(String, String)>() * schema.metadata.capacity()
240        + schema
241            .metadata
242            .iter()
243            .map(|(key, value)| key.capacity() + value.capacity())
244            .sum::<usize>()
245}
246
/// Builder of [`Schema`].
///
/// The default builder has no columns. Use [`SchemaBuilder::try_from_columns`]
/// (or the equivalent `TryFrom<Vec<ColumnSchema>>`) to start from a list of columns.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Schemas of all columns, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to the position of that column in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`, in the same order.
    fields: Vec<Field>,
    /// Index of the first column that is marked as time index, if any.
    timestamp_index: Option<usize>,
    /// Version of the schema to build. Defaults to zero.
    version: u32,
    /// Key value pairs to put into the metadata of the arrow schema.
    metadata: HashMap<String, String>,
}
256
257impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
258    type Error = Error;
259
260    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
261        SchemaBuilder::try_from_columns(column_schemas)
262    }
263}
264
265impl SchemaBuilder {
266    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
267        let FieldsAndIndices {
268            fields,
269            name_to_index,
270            timestamp_index,
271        } = collect_fields(&column_schemas)?;
272
273        Ok(Self {
274            column_schemas,
275            name_to_index,
276            fields,
277            timestamp_index,
278            ..Default::default()
279        })
280    }
281
282    pub fn version(mut self, version: u32) -> Self {
283        self.version = version;
284        self
285    }
286
287    /// Add key value pair to metadata.
288    ///
289    /// Old metadata with same key would be overwritten.
290    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
291        let _ = self.metadata.insert(key.into(), value.into());
292        self
293    }
294
295    pub fn build(mut self) -> Result<Schema> {
296        if let Some(timestamp_index) = self.timestamp_index {
297            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
298        }
299
300        self.metadata
301            .insert(VERSION_KEY.to_string(), self.version.to_string());
302
303        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
304
305        Ok(Schema {
306            column_schemas: self.column_schemas,
307            name_to_index: self.name_to_index,
308            arrow_schema: Arc::new(arrow_schema),
309            timestamp_index: self.timestamp_index,
310            version: self.version,
311        })
312    }
313}
314
/// Output of [`collect_fields`].
struct FieldsAndIndices {
    /// Arrow fields of the columns, in column order.
    fields: Vec<Field>,
    /// Maps a column name to the index of that column.
    name_to_index: HashMap<String, usize>,
    /// Index of the first column that is marked as time index, if any.
    timestamp_index: Option<usize>,
}
320
321fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
322    let mut fields = Vec::with_capacity(column_schemas.len());
323    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
324    let mut timestamp_index = None;
325    for (index, column_schema) in column_schemas.iter().enumerate() {
326        if column_schema.is_time_index() && timestamp_index.is_none() {
327            timestamp_index = Some(index);
328        }
329        let mut field = Field::try_from(column_schema)?;
330
331        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
332        let extype = match column_schema.data_type {
333            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
334            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
335            _ => None,
336        };
337        if let Some(extype) = extype {
338            field
339                .metadata_mut()
340                .insert(TYPE_KEY.to_string(), extype.to_string());
341        }
342        fields.push(field);
343        ensure!(
344            name_to_index
345                .insert(column_schema.name.clone(), index)
346                .is_none(),
347            DuplicateColumnSnafu {
348                column: &column_schema.name,
349            }
350        );
351    }
352
353    Ok(FieldsAndIndices {
354        fields,
355        name_to_index,
356        timestamp_index,
357    })
358}
359
360fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
361    ensure!(
362        timestamp_index < column_schemas.len(),
363        error::InvalidTimestampIndexSnafu {
364            index: timestamp_index,
365        }
366    );
367
368    let column_schema = &column_schemas[timestamp_index];
369    ensure!(
370        column_schema.data_type.is_timestamp(),
371        error::InvalidTimestampIndexSnafu {
372            index: timestamp_index,
373        }
374    );
375    ensure!(
376        column_schema.is_time_index(),
377        error::InvalidTimestampIndexSnafu {
378            index: timestamp_index,
379        }
380    );
381
382    Ok(())
383}
384
/// Shared reference to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
386
387impl TryFrom<Arc<ArrowSchema>> for Schema {
388    type Error = Error;
389
390    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
391        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
392        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
393        for field in &arrow_schema.fields {
394            let column_schema = ColumnSchema::try_from(field.as_ref())?;
395            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
396            column_schemas.push(column_schema);
397        }
398
399        let mut timestamp_index = None;
400        for (index, column_schema) in column_schemas.iter().enumerate() {
401            if column_schema.is_time_index() {
402                validate_timestamp_index(&column_schemas, index)?;
403                timestamp_index = Some(index);
404                break;
405            }
406        }
407
408        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
409
410        Ok(Self {
411            column_schemas,
412            name_to_index,
413            arrow_schema,
414            timestamp_index,
415            version,
416        })
417    }
418}
419
420impl TryFrom<ArrowSchema> for Schema {
421    type Error = Error;
422
423    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
424        let arrow_schema = Arc::new(arrow_schema);
425
426        Schema::try_from(arrow_schema)
427    }
428}
429
430impl TryFrom<DFSchemaRef> for Schema {
431    type Error = Error;
432
433    fn try_from(value: DFSchemaRef) -> Result<Self> {
434        value.inner().clone().try_into()
435    }
436}
437
438fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
439    if let Some(value) = metadata.get(key) {
440        let version = value
441            .parse()
442            .context(error::ParseSchemaVersionSnafu { value })?;
443
444        Ok(version)
445    } else {
446        Ok(Schema::INITIAL_VERSION)
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A builder without columns builds an empty schema.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without time index has no timestamp column and survives a round
    /// trip through its arrow schema.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added to the builder is available on the schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// The time index column and the version survive a round trip through the
    /// arrow schema.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the converted schema here, not the original one again.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    /// A time index on a column that is not a timestamp column is rejected.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    /// Projecting onto the time index column moves the timestamp index to the new
    /// position of that column and keeps the version.
    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        let projected = schema.try_project(&[1]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert_eq!(Some(0), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("ts"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(&column_schemas[1], projected.timestamp_column().unwrap());
        assert_eq!(123, projected.version());

        let projected = schema.try_project(&[0]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert!(projected.timestamp_index().is_none());
    }
}
572}