Skip to main content

datatypes/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod column_schema;
16pub mod constraint;
17pub mod ext;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::{fmt, mem};
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use serde::{Deserialize, Deserializer, Serialize};
26use snafu::{ResultExt, ensure};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31    COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
32    COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
33    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
34    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
35    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
36    COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
37    COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
38    ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
39    INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
40    TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
41    VectorIndexOptions,
42};
43pub use crate::schema::constraint::ColumnDefaultConstraint;
44
/// Key used to store the version number of the schema in the Arrow schema metadata.
///
/// [`SchemaBuilder::build`] always writes this entry, and converting an Arrow schema
/// into a [`Schema`] reads it back (falling back to [`Schema::INITIAL_VERSION`] when
/// the entry is absent).
pub const VERSION_KEY: &str = "greptime:version";
/// Key used to store the actual column type in Arrow field metadata.
///
/// Written for columns whose Arrow representation does not capture their real type
/// (currently JSON and vector columns).
pub const TYPE_KEY: &str = "greptime:type";
49
/// A common schema, should be immutable.
///
/// Holds the column definitions together with data derived from them: a
/// name-to-index lookup map, the equivalent Arrow schema and the position of the
/// time index column. Only `column_schemas` and `version` are serialized; the
/// derived fields are rebuilt through [`SchemaBuilder`] on deserialization.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    #[serde(skip_serializing)]
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema, including schema-level metadata.
    #[serde(skip_serializing)]
    arrow_schema: Arc<ArrowSchema>,
    /// Index of the timestamp key column.
    ///
    /// The timestamp key column is the column that holds the timestamp and forms part
    /// of the primary key. `None` means there is no timestamp key column.
    #[serde(skip_serializing)]
    timestamp_index: Option<usize>,
    /// Version of the schema.
    ///
    /// Initial value is zero. The version should be bumped after altering the schema.
    version: u32,
}
69
70impl<'de> Deserialize<'de> for Schema {
71    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
72    where
73        D: Deserializer<'de>,
74    {
75        use serde::de::Error;
76
77        #[derive(Deserialize)]
78        struct RawSchema {
79            column_schemas: Vec<ColumnSchema>,
80            version: u32,
81        }
82        let raw = RawSchema::deserialize(deserializer)?;
83
84        SchemaBuilder::try_from(raw.column_schemas)
85            .and_then(|x| x.version(raw.version).build())
86            .map_err(D::Error::custom)
87    }
88}
89
90impl fmt::Debug for Schema {
91    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
92        f.debug_struct("Schema")
93            .field("column_schemas", &self.column_schemas)
94            .field("name_to_index", &self.name_to_index)
95            .field("timestamp_index", &self.timestamp_index)
96            .field("version", &self.version)
97            .finish()
98    }
99}
100
101impl Schema {
102    /// Initial version of the schema.
103    pub const INITIAL_VERSION: u32 = 0;
104
105    /// Create a schema from a vector of [ColumnSchema].
106    ///
107    /// # Panics
108    /// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
109    pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
110        // Builder won't fail in this case
111        Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
112    }
113
114    pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
115        SchemaBuilder::try_from(column_schemas)
116            .unwrap()
117            .version(version)
118            .build()
119            .unwrap()
120    }
121
122    /// Try to Create a schema from a vector of [ColumnSchema].
123    pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
124        SchemaBuilder::try_from(column_schemas)?.build()
125    }
126
127    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
128        &self.arrow_schema
129    }
130
131    pub fn column_schemas(&self) -> &[ColumnSchema] {
132        &self.column_schemas
133    }
134
135    pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
136        self.name_to_index
137            .get(name)
138            .map(|index| &self.column_schemas[*index])
139    }
140
141    /// Retrieve the column's name by index
142    /// # Panics
143    /// This method **may** panic if the index is out of range of column schemas.
144    pub fn column_name_by_index(&self, idx: usize) -> &str {
145        &self.column_schemas[idx].name
146    }
147
148    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
149        self.name_to_index.get(name).copied()
150    }
151
152    pub fn contains_column(&self, name: &str) -> bool {
153        self.name_to_index.contains_key(name)
154    }
155
156    pub fn num_columns(&self) -> usize {
157        self.column_schemas.len()
158    }
159
160    pub fn is_empty(&self) -> bool {
161        self.column_schemas.is_empty()
162    }
163
164    /// Returns index of the timestamp key column.
165    pub fn timestamp_index(&self) -> Option<usize> {
166        self.timestamp_index
167    }
168
169    pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
170        self.timestamp_index.map(|idx| &self.column_schemas[idx])
171    }
172
173    pub fn version(&self) -> u32 {
174        self.version
175    }
176
177    pub fn metadata(&self) -> &HashMap<String, String> {
178        &self.arrow_schema.metadata
179    }
180
181    /// Returns the estimated memory footprint of this schema.
182    pub fn estimated_size(&self) -> usize {
183        mem::size_of_val(self)
184            + mem::size_of::<ColumnSchema>() * self.column_schemas.capacity()
185            + self
186                .column_schemas
187                .iter()
188                .map(|column_schema| {
189                    column_schema.estimated_size() - mem::size_of::<ColumnSchema>()
190                })
191                .sum::<usize>()
192            + mem::size_of::<(String, usize)>() * self.name_to_index.capacity()
193            + self
194                .name_to_index
195                .keys()
196                .map(|name| name.capacity())
197                .sum::<usize>()
198            + arrow_schema_size(self.arrow_schema.as_ref())
199    }
200
201    /// Generate a new projected schema
202    ///
203    /// # Panic
204    ///
205    /// If the index out ouf bound
206    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
207        let mut column_schemas = Vec::with_capacity(indices.len());
208        let mut timestamp_index = None;
209        for index in indices {
210            if let Some(ts_index) = self.timestamp_index
211                && ts_index == *index
212            {
213                timestamp_index = Some(column_schemas.len());
214            }
215            column_schemas.push(self.column_schemas[*index].clone());
216        }
217        let arrow_schema = self
218            .arrow_schema
219            .project(indices)
220            .context(ProjectArrowSchemaSnafu)?;
221        let name_to_index = column_schemas
222            .iter()
223            .enumerate()
224            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
225            .collect();
226
227        Ok(Self {
228            column_schemas,
229            name_to_index,
230            arrow_schema: Arc::new(arrow_schema),
231            timestamp_index,
232            version: self.version,
233        })
234    }
235}
236
237fn arrow_schema_size(schema: &ArrowSchema) -> usize {
238    mem::size_of_val(schema)
239        + schema.fields.size()
240        + mem::size_of::<(String, String)>() * schema.metadata.capacity()
241        + schema
242            .metadata
243            .iter()
244            .map(|(key, value)| key.capacity() + value.capacity())
245            .sum::<usize>()
246}
247
/// Builder for [`Schema`].
///
/// Created from a list of column schemas via [`SchemaBuilder::try_from_columns`]
/// (or `TryFrom<Vec<ColumnSchema>>`), which derives the Arrow fields, the
/// name-to-index map and the time index position up front.
/// [`SchemaBuilder::build`] then validates the time index and produces the schema.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields, one per column schema.
    fields: Vec<Field>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
    /// Schema version; written into the metadata under [`VERSION_KEY`] on build.
    version: u32,
    /// Schema-level metadata added through [`SchemaBuilder::add_metadata`].
    metadata: HashMap<String, String>,
}
257
258impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
259    type Error = Error;
260
261    fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
262        SchemaBuilder::try_from_columns(column_schemas)
263    }
264}
265
266impl SchemaBuilder {
267    pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
268        let FieldsAndIndices {
269            fields,
270            name_to_index,
271            timestamp_index,
272        } = collect_fields(&column_schemas)?;
273
274        Ok(Self {
275            column_schemas,
276            name_to_index,
277            fields,
278            timestamp_index,
279            ..Default::default()
280        })
281    }
282
283    pub fn version(mut self, version: u32) -> Self {
284        self.version = version;
285        self
286    }
287
288    /// Add key value pair to metadata.
289    ///
290    /// Old metadata with same key would be overwritten.
291    pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
292        let _ = self.metadata.insert(key.into(), value.into());
293        self
294    }
295
296    pub fn build(mut self) -> Result<Schema> {
297        if let Some(timestamp_index) = self.timestamp_index {
298            validate_timestamp_index(&self.column_schemas, timestamp_index)?;
299        }
300
301        self.metadata
302            .insert(VERSION_KEY.to_string(), self.version.to_string());
303
304        let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
305
306        Ok(Schema {
307            column_schemas: self.column_schemas,
308            name_to_index: self.name_to_index,
309            arrow_schema: Arc::new(arrow_schema),
310            timestamp_index: self.timestamp_index,
311            version: self.version,
312        })
313    }
314}
315
/// Output of [`collect_fields`]: the Arrow fields and lookup data derived from a
/// list of column schemas.
struct FieldsAndIndices {
    /// One Arrow field per column schema, in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
}
321
322fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
323    let mut fields = Vec::with_capacity(column_schemas.len());
324    let mut name_to_index = HashMap::with_capacity(column_schemas.len());
325    let mut timestamp_index = None;
326    for (index, column_schema) in column_schemas.iter().enumerate() {
327        if column_schema.is_time_index() && timestamp_index.is_none() {
328            timestamp_index = Some(index);
329        }
330        let mut field = Field::try_from(column_schema)?;
331
332        // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it
333        let extype = match column_schema.data_type {
334            ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
335            ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
336            _ => None,
337        };
338        if let Some(extype) = extype {
339            field
340                .metadata_mut()
341                .insert(TYPE_KEY.to_string(), extype.to_string());
342        }
343        fields.push(field);
344        ensure!(
345            name_to_index
346                .insert(column_schema.name.clone(), index)
347                .is_none(),
348            DuplicateColumnSnafu {
349                column: &column_schema.name,
350            }
351        );
352    }
353
354    Ok(FieldsAndIndices {
355        fields,
356        name_to_index,
357        timestamp_index,
358    })
359}
360
361fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
362    ensure!(
363        timestamp_index < column_schemas.len(),
364        error::InvalidTimestampIndexSnafu {
365            index: timestamp_index,
366        }
367    );
368
369    let column_schema = &column_schemas[timestamp_index];
370    ensure!(
371        column_schema.data_type.is_timestamp(),
372        error::InvalidTimestampIndexSnafu {
373            index: timestamp_index,
374        }
375    );
376    ensure!(
377        column_schema.is_time_index(),
378        error::InvalidTimestampIndexSnafu {
379            index: timestamp_index,
380        }
381    );
382
383    Ok(())
384}
385
/// Shared, reference-counted [`Schema`].
pub type SchemaRef = Arc<Schema>;
387
388impl TryFrom<Arc<ArrowSchema>> for Schema {
389    type Error = Error;
390
391    fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
392        let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
393        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
394        for field in &arrow_schema.fields {
395            let column_schema = ColumnSchema::try_from(field.as_ref())?;
396            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
397            column_schemas.push(column_schema);
398        }
399
400        let mut timestamp_index = None;
401        for (index, column_schema) in column_schemas.iter().enumerate() {
402            if column_schema.is_time_index() {
403                validate_timestamp_index(&column_schemas, index)?;
404                timestamp_index = Some(index);
405                break;
406            }
407        }
408
409        let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
410
411        Ok(Self {
412            column_schemas,
413            name_to_index,
414            arrow_schema,
415            timestamp_index,
416            version,
417        })
418    }
419}
420
421impl TryFrom<ArrowSchema> for Schema {
422    type Error = Error;
423
424    fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
425        let arrow_schema = Arc::new(arrow_schema);
426
427        Schema::try_from(arrow_schema)
428    }
429}
430
431impl TryFrom<DFSchemaRef> for Schema {
432    type Error = Error;
433
434    fn try_from(value: DFSchemaRef) -> Result<Self> {
435        value.inner().clone().try_into()
436    }
437}
438
439fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
440    if let Some(value) = metadata.get(key) {
441        let version = value
442            .parse()
443            .context(error::ParseSchemaVersionSnafu { value })?;
444
445        Ok(version)
446    } else {
447        Ok(Schema::INITIAL_VERSION)
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        // The time index and the version must survive the round trip through Arrow.
        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(123, new_schema.version());
        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new_with_version(column_schemas.clone(), 7);

        // Reordering keeps the time index pointing at the projected `ts` column.
        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!(
            vec![column_schemas[2].clone(), column_schemas[1].clone()],
            projected.column_schemas()
        );
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert_eq!(Some(1), projected.column_index_by_name("ts"));
        assert_eq!(7, projected.version());
        assert_eq!(2, projected.arrow_schema().fields().len());

        // Projecting away the time index column leaves no timestamp column.
        let projected = schema.try_project(&[0]).unwrap();
        assert!(projected.timestamp_index().is_none());
        assert!(projected.timestamp_column().is_none());
        assert_eq!(Some(0), projected.column_index_by_name("col1"));
        assert!(!projected.contains_column("ts"));
    }
}