datatypes/schema/
column_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18
19use arrow::datatypes::Field;
20use arrow_schema::extension::{
21    EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY, ExtensionType,
22};
23use serde::{Deserialize, Serialize};
24use snafu::{ResultExt, ensure};
25use sqlparser_derive::{Visit, VisitMut};
26
27use crate::data_type::{ConcreteDataType, DataType};
28use crate::error::{
29    self, ArrowMetadataSnafu, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result,
30};
31use crate::schema::TYPE_KEY;
32use crate::schema::constraint::ColumnDefaultConstraint;
33use crate::value::Value;
34use crate::vectors::VectorRef;
35
/// String key-value metadata attached to a column schema.
pub type Metadata = HashMap<String, String>;

/// Key used to store whether the column is time index in arrow field's metadata.
pub const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Key used to store the column comment in the column's metadata.
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";
/// Key used to store whether the column has inverted index in arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skipping index options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";

/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
pub const COLUMN_FULLTEXT_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";

/// Keys used in SKIPPING index options
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE: &str = "type";

/// Default granularity used by the fulltext and skipping index options.
pub const DEFAULT_GRANULARITY: u32 = 10240;

/// Default false positive rate (1%) used by the fulltext and skipping index options.
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
66
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
    /// Name of the column.
    pub name: String,
    /// Data type of the column.
    pub data_type: ConcreteDataType,
    /// Whether the column accepts null values.
    is_nullable: bool,
    /// Whether the column is the time index.
    is_time_index: bool,
    /// Optional default constraint of the column.
    default_constraint: Option<ColumnDefaultConstraint>,
    /// Extra key-value metadata (comment, index options, extension type, ...).
    metadata: Metadata,
}
77
78impl fmt::Debug for ColumnSchema {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        write!(
81            f,
82            "{} {} {}",
83            self.name,
84            self.data_type,
85            if self.is_nullable { "null" } else { "not null" },
86        )?;
87
88        if self.is_time_index {
89            write!(f, " time_index")?;
90        }
91
92        // Add default constraint if present
93        if let Some(default_constraint) = &self.default_constraint {
94            write!(f, " default={:?}", default_constraint)?;
95        }
96
97        // Add metadata if present
98        if !self.metadata.is_empty() {
99            write!(f, " metadata={:?}", self.metadata)?;
100        }
101
102        Ok(())
103    }
104}
105
106impl ColumnSchema {
107    pub fn new<T: Into<String>>(
108        name: T,
109        data_type: ConcreteDataType,
110        is_nullable: bool,
111    ) -> ColumnSchema {
112        ColumnSchema {
113            name: name.into(),
114            data_type,
115            is_nullable,
116            is_time_index: false,
117            default_constraint: None,
118            metadata: Metadata::new(),
119        }
120    }
121
122    #[inline]
123    pub fn is_time_index(&self) -> bool {
124        self.is_time_index
125    }
126
127    #[inline]
128    pub fn is_nullable(&self) -> bool {
129        self.is_nullable
130    }
131
132    #[inline]
133    pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
134        self.default_constraint.as_ref()
135    }
136
137    /// Check if the default constraint is a impure function.
138    pub fn is_default_impure(&self) -> bool {
139        self.default_constraint
140            .as_ref()
141            .map(|c| c.is_function())
142            .unwrap_or(false)
143    }
144
145    #[inline]
146    pub fn metadata(&self) -> &Metadata {
147        &self.metadata
148    }
149
150    #[inline]
151    pub fn mut_metadata(&mut self) -> &mut Metadata {
152        &mut self.metadata
153    }
154
155    /// Retrieve the column comment
156    pub fn column_comment(&self) -> Option<&String> {
157        self.metadata.get(COMMENT_KEY)
158    }
159
160    pub fn with_time_index(mut self, is_time_index: bool) -> Self {
161        self.is_time_index = is_time_index;
162        if is_time_index {
163            let _ = self
164                .metadata
165                .insert(TIME_INDEX_KEY.to_string(), "true".to_string());
166        } else {
167            let _ = self.metadata.remove(TIME_INDEX_KEY);
168        }
169        self
170    }
171
172    /// Set the inverted index for the column.
173    /// Similar to [with_inverted_index] but don't take the ownership.
174    ///
175    /// [with_inverted_index]: Self::with_inverted_index
176    pub fn set_inverted_index(&mut self, value: bool) {
177        match value {
178            true => {
179                self.metadata
180                    .insert(INVERTED_INDEX_KEY.to_string(), value.to_string());
181            }
182            false => {
183                self.metadata.remove(INVERTED_INDEX_KEY);
184            }
185        }
186    }
187
188    /// Set the inverted index for the column.
189    /// Similar to [set_inverted_index] but take the ownership and return a owned value.
190    ///
191    /// [set_inverted_index]: Self::set_inverted_index
192    pub fn with_inverted_index(mut self, value: bool) -> Self {
193        self.set_inverted_index(value);
194        self
195    }
196
197    pub fn is_inverted_indexed(&self) -> bool {
198        self.metadata
199            .get(INVERTED_INDEX_KEY)
200            .map(|v| v.eq_ignore_ascii_case("true"))
201            .unwrap_or(false)
202    }
203
204    pub fn is_fulltext_indexed(&self) -> bool {
205        self.fulltext_options()
206            .unwrap_or_default()
207            .map(|option| option.enable)
208            .unwrap_or_default()
209    }
210
211    pub fn is_skipping_indexed(&self) -> bool {
212        self.skipping_index_options().unwrap_or_default().is_some()
213    }
214
215    pub fn has_inverted_index_key(&self) -> bool {
216        self.metadata.contains_key(INVERTED_INDEX_KEY)
217    }
218
219    /// Set default constraint.
220    ///
221    /// If a default constraint exists for the column, this method will
222    /// validate it against the column's data type and nullability.
223    pub fn with_default_constraint(
224        mut self,
225        default_constraint: Option<ColumnDefaultConstraint>,
226    ) -> Result<Self> {
227        if let Some(constraint) = &default_constraint {
228            constraint.validate(&self.data_type, self.is_nullable)?;
229        }
230
231        self.default_constraint = default_constraint;
232        Ok(self)
233    }
234
235    /// Set the nullablity to `true` of the column.
236    /// Similar to [set_nullable] but take the ownership and return a owned value.
237    ///
238    /// [set_nullable]: Self::set_nullable
239    pub fn with_nullable_set(mut self) -> Self {
240        self.is_nullable = true;
241        self
242    }
243
244    /// Set the nullability to `true` of the column.
245    /// Similar to [with_nullable_set] but don't take the ownership
246    ///
247    /// [with_nullable_set]: Self::with_nullable_set
248    pub fn set_nullable(&mut self) {
249        self.is_nullable = true;
250    }
251
252    /// Set the `is_time_index` to `true` of the column.
253    /// Similar to [with_time_index] but don't take the ownership.
254    ///
255    /// [with_time_index]: Self::with_time_index
256    pub fn set_time_index(&mut self) {
257        self.is_time_index = true;
258    }
259
260    /// Creates a new [`ColumnSchema`] with given metadata.
261    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
262        self.metadata = metadata;
263        self
264    }
265
266    /// Creates a vector with default value for this column.
267    ///
268    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
269    pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
270        match &self.default_constraint {
271            Some(c) => c
272                .create_default_vector(&self.data_type, self.is_nullable, num_rows)
273                .map(Some),
274            None => {
275                if self.is_nullable {
276                    // No default constraint, use null as default value.
277                    // TODO(yingwen): Use NullVector once it supports setting logical type.
278                    ColumnDefaultConstraint::null_value()
279                        .create_default_vector(&self.data_type, self.is_nullable, num_rows)
280                        .map(Some)
281                } else {
282                    Ok(None)
283                }
284            }
285        }
286    }
287
288    /// Creates a vector for padding.
289    ///
290    /// This method always returns a vector since it uses [DataType::default_value]
291    /// to fill the vector. Callers should only use the created vector for padding
292    /// and never read its content.
293    pub fn create_default_vector_for_padding(&self, num_rows: usize) -> VectorRef {
294        let padding_value = if self.is_nullable {
295            Value::Null
296        } else {
297            // If the column is not null, use the data type's default value as it is
298            // more efficient to acquire.
299            self.data_type.default_value()
300        };
301        let value_ref = padding_value.as_value_ref();
302        let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
303        for _ in 0..num_rows {
304            mutable_vector.push_value_ref(&value_ref);
305        }
306        mutable_vector.to_vector()
307    }
308
309    /// Creates a default value for this column.
310    ///
311    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
312    pub fn create_default(&self) -> Result<Option<Value>> {
313        match &self.default_constraint {
314            Some(c) => c
315                .create_default(&self.data_type, self.is_nullable)
316                .map(Some),
317            None => {
318                if self.is_nullable {
319                    // No default constraint, use null as default value.
320                    ColumnDefaultConstraint::null_value()
321                        .create_default(&self.data_type, self.is_nullable)
322                        .map(Some)
323                } else {
324                    Ok(None)
325                }
326            }
327        }
328    }
329
330    /// Creates an impure default value for this column, only if it have a impure default constraint.
331    /// Otherwise, returns `Ok(None)`.
332    pub fn create_impure_default(&self) -> Result<Option<Value>> {
333        match &self.default_constraint {
334            Some(c) => c.create_impure_default(&self.data_type),
335            None => Ok(None),
336        }
337    }
338
339    /// Retrieves the fulltext options for the column.
340    pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
341        match self.metadata.get(FULLTEXT_KEY) {
342            None => Ok(None),
343            Some(json) => {
344                let options =
345                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
346                Ok(Some(options))
347            }
348        }
349    }
350
351    pub fn with_fulltext_options(mut self, options: FulltextOptions) -> Result<Self> {
352        self.metadata.insert(
353            FULLTEXT_KEY.to_string(),
354            serde_json::to_string(&options).context(error::SerializeSnafu)?,
355        );
356        Ok(self)
357    }
358
359    pub fn set_fulltext_options(&mut self, options: &FulltextOptions) -> Result<()> {
360        self.metadata.insert(
361            FULLTEXT_KEY.to_string(),
362            serde_json::to_string(options).context(error::SerializeSnafu)?,
363        );
364        Ok(())
365    }
366
367    /// Retrieves the skipping index options for the column.
368    pub fn skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
369        match self.metadata.get(SKIPPING_INDEX_KEY) {
370            None => Ok(None),
371            Some(json) => {
372                let options =
373                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
374                Ok(Some(options))
375            }
376        }
377    }
378
379    pub fn with_skipping_options(mut self, options: SkippingIndexOptions) -> Result<Self> {
380        self.metadata.insert(
381            SKIPPING_INDEX_KEY.to_string(),
382            serde_json::to_string(&options).context(error::SerializeSnafu)?,
383        );
384        Ok(self)
385    }
386
387    pub fn set_skipping_options(&mut self, options: &SkippingIndexOptions) -> Result<()> {
388        self.metadata.insert(
389            SKIPPING_INDEX_KEY.to_string(),
390            serde_json::to_string(options).context(error::SerializeSnafu)?,
391        );
392        Ok(())
393    }
394
395    pub fn unset_skipping_options(&mut self) -> Result<()> {
396        self.metadata.remove(SKIPPING_INDEX_KEY);
397        Ok(())
398    }
399
400    pub fn extension_type<E>(&self) -> Result<Option<E>>
401    where
402        E: ExtensionType,
403    {
404        let extension_type_name = self.metadata.get(EXTENSION_TYPE_NAME_KEY);
405
406        if extension_type_name.map(|s| s.as_str()) == Some(E::NAME) {
407            let extension_metadata = self.metadata.get(EXTENSION_TYPE_METADATA_KEY);
408            let extension_metadata =
409                E::deserialize_metadata(extension_metadata.map(|s| s.as_str()))
410                    .context(ArrowMetadataSnafu)?;
411
412            let extension = E::try_new(&self.data_type.as_arrow_type(), extension_metadata)
413                .context(ArrowMetadataSnafu)?;
414            Ok(Some(extension))
415        } else {
416            Ok(None)
417        }
418    }
419
420    pub fn with_extension_type<E>(&mut self, extension_type: &E) -> Result<()>
421    where
422        E: ExtensionType,
423    {
424        self.metadata
425            .insert(EXTENSION_TYPE_NAME_KEY.to_string(), E::NAME.to_string());
426
427        if let Some(extension_metadata) = extension_type.serialize_metadata() {
428            self.metadata
429                .insert(EXTENSION_TYPE_METADATA_KEY.to_string(), extension_metadata);
430        }
431
432        Ok(())
433    }
434
435    pub fn is_indexed(&self) -> bool {
436        self.is_inverted_indexed() || self.is_fulltext_indexed() || self.is_skipping_indexed()
437    }
438}
439
/// Column extended type set in column schema's metadata.
///
/// The textual form is `Json` or `Vector(<dim>)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnExtType {
    /// Json type.
    Json,

    /// Vector type with dimension.
    Vector(u32),
}
449
450impl fmt::Display for ColumnExtType {
451    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
452        match self {
453            ColumnExtType::Json => write!(f, "Json"),
454            ColumnExtType::Vector(dim) => write!(f, "Vector({})", dim),
455        }
456    }
457}
458
459impl FromStr for ColumnExtType {
460    type Err = String;
461
462    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
463        match s {
464            "Json" => Ok(ColumnExtType::Json),
465            _ if s.starts_with("Vector(") && s.ends_with(')') => s[7..s.len() - 1]
466                .parse::<u32>()
467                .map(ColumnExtType::Vector)
468                .map_err(|_| "Invalid dimension for Vector".to_string()),
469            _ => Err("Unknown variant".to_string()),
470        }
471    }
472}
473
474impl TryFrom<&Field> for ColumnSchema {
475    type Error = Error;
476
477    fn try_from(field: &Field) -> Result<ColumnSchema> {
478        let mut data_type = ConcreteDataType::try_from(field.data_type())?;
479        // Override the data type if it is specified in the metadata.
480        if let Some(s) = field.metadata().get(TYPE_KEY) {
481            let extype = ColumnExtType::from_str(s)
482                .map_err(|_| ParseExtendedTypeSnafu { value: s }.build())?;
483            match extype {
484                ColumnExtType::Json => {
485                    data_type = ConcreteDataType::json_datatype();
486                }
487                ColumnExtType::Vector(dim) => {
488                    data_type = ConcreteDataType::vector_datatype(dim);
489                }
490            }
491        }
492        let mut metadata = field.metadata().clone();
493        let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
494            Some(json) => {
495                Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
496            }
497            None => None,
498        };
499        let mut is_time_index = metadata.contains_key(TIME_INDEX_KEY);
500        if is_time_index && !data_type.is_timestamp() {
501            // If the column is time index but the data type is not timestamp, it is invalid.
502            // We set the time index to false and remove the metadata.
503            // This is possible if we cast the time index column to another type. DataFusion will
504            // keep the metadata:
505            // https://github.com/apache/datafusion/pull/12951
506            is_time_index = false;
507            metadata.remove(TIME_INDEX_KEY);
508            common_telemetry::debug!(
509                "Column {} is not timestamp ({:?}) but has time index metadata",
510                data_type,
511                field.name(),
512            );
513        }
514
515        Ok(ColumnSchema {
516            name: field.name().clone(),
517            data_type,
518            is_nullable: field.is_nullable(),
519            is_time_index,
520            default_constraint,
521            metadata,
522        })
523    }
524}
525
526impl TryFrom<&ColumnSchema> for Field {
527    type Error = Error;
528
529    fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
530        let mut metadata = column_schema.metadata.clone();
531        if let Some(value) = &column_schema.default_constraint {
532            // Adds an additional metadata to store the default constraint.
533            let old = metadata.insert(
534                DEFAULT_CONSTRAINT_KEY.to_string(),
535                serde_json::to_string(&value).context(error::SerializeSnafu)?,
536            );
537
538            ensure!(
539                old.is_none(),
540                error::DuplicateMetaSnafu {
541                    key: DEFAULT_CONSTRAINT_KEY,
542                }
543            );
544        }
545
546        Ok(Field::new(
547            &column_schema.name,
548            column_schema.data_type.as_arrow_type(),
549            column_schema.is_nullable(),
550        )
551        .with_metadata(metadata))
552    }
553}
554
/// Fulltext options for a column.
///
/// Serialized with kebab-case field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct FulltextOptions {
    /// Whether the fulltext index is enabled.
    pub enable: bool,
    /// The fulltext analyzer to use.
    #[serde(default)]
    pub analyzer: FulltextAnalyzer,
    /// Whether the fulltext index is case-sensitive.
    #[serde(default)]
    pub case_sensitive: bool,
    /// The fulltext backend to use.
    #[serde(default)]
    pub backend: FulltextBackend,
    /// The granularity of the fulltext index (for bloom backend only)
    #[serde(default = "fulltext_options_default_granularity")]
    pub granularity: u32,
    /// The false positive rate of the fulltext index (for bloom backend only),
    /// stored in ten-thousandths (e.g., 100 = 1%).
    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
    pub false_positive_rate_in_10000: u32,
}
577
/// Serde default for `FulltextOptions::granularity`: returns [`DEFAULT_GRANULARITY`].
fn fulltext_options_default_granularity() -> u32 {
    DEFAULT_GRANULARITY
}
581
582fn index_options_default_false_positive_rate_in_10000() -> u32 {
583    (DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
584}
585
586impl FulltextOptions {
587    /// Creates a new fulltext options.
588    pub fn new(
589        enable: bool,
590        analyzer: FulltextAnalyzer,
591        case_sensitive: bool,
592        backend: FulltextBackend,
593        granularity: u32,
594        false_positive_rate: f64,
595    ) -> Result<Self> {
596        ensure!(
597            0.0 < false_positive_rate && false_positive_rate <= 1.0,
598            error::InvalidFulltextOptionSnafu {
599                msg: format!(
600                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
601                ),
602            }
603        );
604        ensure!(
605            granularity > 0,
606            error::InvalidFulltextOptionSnafu {
607                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
608            }
609        );
610        Ok(Self::new_unchecked(
611            enable,
612            analyzer,
613            case_sensitive,
614            backend,
615            granularity,
616            false_positive_rate,
617        ))
618    }
619
620    /// Creates a new fulltext options without checking `false_positive_rate` and `granularity`.
621    pub fn new_unchecked(
622        enable: bool,
623        analyzer: FulltextAnalyzer,
624        case_sensitive: bool,
625        backend: FulltextBackend,
626        granularity: u32,
627        false_positive_rate: f64,
628    ) -> Self {
629        Self {
630            enable,
631            analyzer,
632            case_sensitive,
633            backend,
634            granularity,
635            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
636        }
637    }
638
639    /// Gets the false positive rate.
640    pub fn false_positive_rate(&self) -> f64 {
641        self.false_positive_rate_in_10000 as f64 / 10000.0
642    }
643}
644
645impl Default for FulltextOptions {
646    fn default() -> Self {
647        Self::new_unchecked(
648            false,
649            FulltextAnalyzer::default(),
650            false,
651            FulltextBackend::default(),
652            DEFAULT_GRANULARITY,
653            DEFAULT_FALSE_POSITIVE_RATE,
654        )
655    }
656}
657
658impl fmt::Display for FulltextOptions {
659    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660        write!(f, "enable={}", self.enable)?;
661        if self.enable {
662            write!(f, ", analyzer={}", self.analyzer)?;
663            write!(f, ", case_sensitive={}", self.case_sensitive)?;
664            write!(f, ", backend={}", self.backend)?;
665            if self.backend == FulltextBackend::Bloom {
666                write!(f, ", granularity={}", self.granularity)?;
667                write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
668            }
669        }
670        Ok(())
671    }
672}
673
/// The backend of the fulltext index.
///
/// Serialized with kebab-case variant names; the default is [`FulltextBackend::Bloom`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub enum FulltextBackend {
    /// Bloom backend (the default).
    #[default]
    Bloom,
    /// Tantivy backend.
    Tantivy,
}
682
683impl fmt::Display for FulltextBackend {
684    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
685        match self {
686            FulltextBackend::Tantivy => write!(f, "tantivy"),
687            FulltextBackend::Bloom => write!(f, "bloom"),
688        }
689    }
690}
691
692impl TryFrom<HashMap<String, String>> for FulltextOptions {
693    type Error = Error;
694
695    fn try_from(options: HashMap<String, String>) -> Result<Self> {
696        let mut fulltext_options = FulltextOptions {
697            enable: true,
698            ..Default::default()
699        };
700
701        if let Some(enable) = options.get(COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE) {
702            match enable.to_ascii_lowercase().as_str() {
703                "true" => fulltext_options.enable = true,
704                "false" => fulltext_options.enable = false,
705                _ => {
706                    return InvalidFulltextOptionSnafu {
707                        msg: format!("{enable}, expected: 'true' | 'false'"),
708                    }
709                    .fail();
710                }
711            }
712        };
713
714        if let Some(analyzer) = options.get(COLUMN_FULLTEXT_OPT_KEY_ANALYZER) {
715            match analyzer.to_ascii_lowercase().as_str() {
716                "english" => fulltext_options.analyzer = FulltextAnalyzer::English,
717                "chinese" => fulltext_options.analyzer = FulltextAnalyzer::Chinese,
718                _ => {
719                    return InvalidFulltextOptionSnafu {
720                        msg: format!("{analyzer}, expected: 'English' | 'Chinese'"),
721                    }
722                    .fail();
723                }
724            }
725        };
726
727        if let Some(case_sensitive) = options.get(COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE) {
728            match case_sensitive.to_ascii_lowercase().as_str() {
729                "true" => fulltext_options.case_sensitive = true,
730                "false" => fulltext_options.case_sensitive = false,
731                _ => {
732                    return InvalidFulltextOptionSnafu {
733                        msg: format!("{case_sensitive}, expected: 'true' | 'false'"),
734                    }
735                    .fail();
736                }
737            }
738        }
739
740        if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
741            match backend.to_ascii_lowercase().as_str() {
742                "bloom" => fulltext_options.backend = FulltextBackend::Bloom,
743                "tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
744                _ => {
745                    return InvalidFulltextOptionSnafu {
746                        msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
747                    }
748                    .fail();
749                }
750            }
751        }
752
753        if fulltext_options.backend == FulltextBackend::Bloom {
754            // Parse granularity with default value 10240
755            let granularity = match options.get(COLUMN_FULLTEXT_OPT_KEY_GRANULARITY) {
756                Some(value) => value
757                    .parse::<u32>()
758                    .ok()
759                    .filter(|&v| v > 0)
760                    .ok_or_else(|| {
761                        error::InvalidFulltextOptionSnafu {
762                            msg: format!(
763                                "Invalid granularity: {value}, expected: positive integer"
764                            ),
765                        }
766                        .build()
767                    })?,
768                None => DEFAULT_GRANULARITY,
769            };
770            fulltext_options.granularity = granularity;
771
772            // Parse false positive rate with default value 0.01
773            let false_positive_rate = match options.get(COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE)
774            {
775                Some(value) => value
776                    .parse::<f64>()
777                    .ok()
778                    .filter(|&v| v > 0.0 && v <= 1.0)
779                    .ok_or_else(|| {
780                        error::InvalidFulltextOptionSnafu {
781                            msg: format!(
782                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
783                            ),
784                        }
785                        .build()
786                    })?,
787                None => DEFAULT_FALSE_POSITIVE_RATE,
788            };
789            fulltext_options.false_positive_rate_in_10000 = (false_positive_rate * 10000.0) as u32;
790        }
791
792        Ok(fulltext_options)
793    }
794}
795
/// Fulltext analyzer.
///
/// The default is [`FulltextAnalyzer::English`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
pub enum FulltextAnalyzer {
    /// English analyzer (the default).
    #[default]
    English,
    /// Chinese analyzer.
    Chinese,
}
803
804impl fmt::Display for FulltextAnalyzer {
805    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806        match self {
807            FulltextAnalyzer::English => write!(f, "English"),
808            FulltextAnalyzer::Chinese => write!(f, "Chinese"),
809        }
810    }
811}
812
/// Skipping options for a column.
///
/// Serialized with kebab-case field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct SkippingIndexOptions {
    /// The granularity of the skip index.
    pub granularity: u32,
    /// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
    pub false_positive_rate_in_10000: u32,
    /// The type of the skip index.
    #[serde(default)]
    pub index_type: SkippingIndexType,
}
826
827impl SkippingIndexOptions {
828    /// Creates a new skipping index options without checking `false_positive_rate` and `granularity`.
829    pub fn new_unchecked(
830        granularity: u32,
831        false_positive_rate: f64,
832        index_type: SkippingIndexType,
833    ) -> Self {
834        Self {
835            granularity,
836            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
837            index_type,
838        }
839    }
840
841    /// Creates a new skipping index options.
842    pub fn new(
843        granularity: u32,
844        false_positive_rate: f64,
845        index_type: SkippingIndexType,
846    ) -> Result<Self> {
847        ensure!(
848            0.0 < false_positive_rate && false_positive_rate <= 1.0,
849            error::InvalidSkippingIndexOptionSnafu {
850                msg: format!(
851                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
852                ),
853            }
854        );
855        ensure!(
856            granularity > 0,
857            error::InvalidSkippingIndexOptionSnafu {
858                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
859            }
860        );
861        Ok(Self::new_unchecked(
862            granularity,
863            false_positive_rate,
864            index_type,
865        ))
866    }
867
868    /// Gets the false positive rate.
869    pub fn false_positive_rate(&self) -> f64 {
870        self.false_positive_rate_in_10000 as f64 / 10000.0
871    }
872}
873
874impl Default for SkippingIndexOptions {
875    fn default() -> Self {
876        Self::new_unchecked(
877            DEFAULT_GRANULARITY,
878            DEFAULT_FALSE_POSITIVE_RATE,
879            SkippingIndexType::default(),
880        )
881    }
882}
883
884impl fmt::Display for SkippingIndexOptions {
885    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
886        write!(f, "granularity={}", self.granularity)?;
887        write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
888        write!(f, ", index_type={}", self.index_type)?;
889        Ok(())
890    }
891}
892
/// Skip index types.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum SkippingIndexType {
    /// Bloom filter skipping index; this is the default variant.
    #[default]
    BloomFilter,
}
899
900impl fmt::Display for SkippingIndexType {
901    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902        match self {
903            SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
904        }
905    }
906}
907
908impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
909    type Error = Error;
910
911    fn try_from(options: HashMap<String, String>) -> Result<Self> {
912        // Parse granularity with default value 1
913        let granularity = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY) {
914            Some(value) => value
915                .parse::<u32>()
916                .ok()
917                .filter(|&v| v > 0)
918                .ok_or_else(|| {
919                    error::InvalidSkippingIndexOptionSnafu {
920                        msg: format!("Invalid granularity: {value}, expected: positive integer"),
921                    }
922                    .build()
923                })?,
924            None => DEFAULT_GRANULARITY,
925        };
926
927        // Parse false positive rate with default value 100
928        let false_positive_rate =
929            match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE) {
930                Some(value) => value
931                    .parse::<f64>()
932                    .ok()
933                    .filter(|&v| v > 0.0 && v <= 1.0)
934                    .ok_or_else(|| {
935                        error::InvalidSkippingIndexOptionSnafu {
936                            msg: format!(
937                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
938                            ),
939                        }
940                        .build()
941                    })?,
942                None => DEFAULT_FALSE_POSITIVE_RATE,
943            };
944
945        // Parse index type with default value BloomFilter
946        let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
947            Some(typ) => match typ.to_ascii_uppercase().as_str() {
948                "BLOOM" => SkippingIndexType::BloomFilter,
949                _ => {
950                    return error::InvalidSkippingIndexOptionSnafu {
951                        msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
952                    }
953                    .fail();
954                }
955            },
956            None => SkippingIndexType::default(),
957        };
958
959        Ok(SkippingIndexOptions::new_unchecked(
960            granularity,
961            false_positive_rate,
962            index_type,
963        ))
964    }
965}
966
967#[cfg(test)]
968mod tests {
969    use std::sync::Arc;
970
971    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
972
973    use super::*;
974    use crate::value::Value;
975    use crate::vectors::Int32Vector;
976
977    #[test]
978    fn test_column_schema() {
979        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
980        let field = Field::try_from(&column_schema).unwrap();
981        assert_eq!("test", field.name());
982        assert_eq!(ArrowDataType::Int32, *field.data_type());
983        assert!(field.is_nullable());
984
985        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
986        assert_eq!(column_schema, new_column_schema);
987    }
988
989    #[test]
990    fn test_column_schema_with_default_constraint() {
991        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
992            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
993            .unwrap();
994        assert!(
995            column_schema
996                .metadata()
997                .get(DEFAULT_CONSTRAINT_KEY)
998                .is_none()
999        );
1000
1001        let field = Field::try_from(&column_schema).unwrap();
1002        assert_eq!("test", field.name());
1003        assert_eq!(ArrowDataType::Int32, *field.data_type());
1004        assert!(field.is_nullable());
1005        assert_eq!(
1006            "{\"Value\":{\"Int32\":99}}",
1007            field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
1008        );
1009
1010        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
1011        assert_eq!(column_schema, new_column_schema);
1012    }
1013
1014    #[test]
1015    fn test_column_schema_with_metadata() {
1016        let metadata = Metadata::from([
1017            ("k1".to_string(), "v1".to_string()),
1018            (COMMENT_KEY.to_string(), "test comment".to_string()),
1019        ]);
1020        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1021            .with_metadata(metadata)
1022            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1023            .unwrap();
1024        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
1025        assert_eq!("test comment", column_schema.column_comment().unwrap());
1026        assert!(
1027            column_schema
1028                .metadata()
1029                .get(DEFAULT_CONSTRAINT_KEY)
1030                .is_none()
1031        );
1032
1033        let field = Field::try_from(&column_schema).unwrap();
1034        assert_eq!("v1", field.metadata().get("k1").unwrap());
1035        let _ = field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap();
1036
1037        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
1038        assert_eq!(column_schema, new_column_schema);
1039    }
1040
1041    #[test]
1042    fn test_column_schema_with_duplicate_metadata() {
1043        let metadata = Metadata::from([(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string())]);
1044        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1045            .with_metadata(metadata)
1046            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1047            .unwrap();
1048        assert!(Field::try_from(&column_schema).is_err());
1049    }
1050
1051    #[test]
1052    fn test_column_schema_invalid_default_constraint() {
1053        assert!(
1054            ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
1055                .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1056                .is_err()
1057        );
1058    }
1059
1060    #[test]
1061    fn test_column_default_constraint_try_into_from() {
1062        let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));
1063
1064        let bytes: Vec<u8> = default_constraint.clone().try_into().unwrap();
1065        let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap();
1066
1067        assert_eq!(default_constraint, from_value);
1068    }
1069
1070    #[test]
1071    fn test_column_schema_create_default_null() {
1072        // Implicit default null.
1073        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1074        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1075        assert_eq!(5, v.len());
1076        assert!(v.only_null());
1077
1078        // Explicit default null.
1079        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1080            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1081            .unwrap();
1082        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1083        assert_eq!(5, v.len());
1084        assert!(v.only_null());
1085    }
1086
1087    #[test]
1088    fn test_column_schema_no_default() {
1089        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1090        assert!(column_schema.create_default_vector(5).unwrap().is_none());
1091    }
1092
1093    #[test]
1094    fn test_create_default_vector_for_padding() {
1095        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1096        let vector = column_schema.create_default_vector_for_padding(4);
1097        assert!(vector.only_null());
1098        assert_eq!(4, vector.len());
1099
1100        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1101        let vector = column_schema.create_default_vector_for_padding(4);
1102        assert_eq!(4, vector.len());
1103        let expect: VectorRef = Arc::new(Int32Vector::from_slice([0, 0, 0, 0]));
1104        assert_eq!(expect, vector);
1105    }
1106
1107    #[test]
1108    fn test_column_schema_single_create_default_null() {
1109        // Implicit default null.
1110        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1111        let v = column_schema.create_default().unwrap().unwrap();
1112        assert!(v.is_null());
1113
1114        // Explicit default null.
1115        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1116            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1117            .unwrap();
1118        let v = column_schema.create_default().unwrap().unwrap();
1119        assert!(v.is_null());
1120    }
1121
1122    #[test]
1123    fn test_column_schema_single_create_default_not_null() {
1124        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1125            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(6))))
1126            .unwrap();
1127        let v = column_schema.create_default().unwrap().unwrap();
1128        assert_eq!(v, Value::Int32(6));
1129    }
1130
1131    #[test]
1132    fn test_column_schema_single_no_default() {
1133        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1134        assert!(column_schema.create_default().unwrap().is_none());
1135    }
1136
1137    #[test]
1138    fn test_debug_for_column_schema() {
1139        let column_schema_int8 =
1140            ColumnSchema::new("test_column_1", ConcreteDataType::int8_datatype(), true);
1141
1142        let column_schema_int32 =
1143            ColumnSchema::new("test_column_2", ConcreteDataType::int32_datatype(), false);
1144
1145        let formatted_int8 = format!("{:?}", column_schema_int8);
1146        let formatted_int32 = format!("{:?}", column_schema_int32);
1147        assert_eq!(formatted_int8, "test_column_1 Int8 null");
1148        assert_eq!(formatted_int32, "test_column_2 Int32 not null");
1149    }
1150
1151    #[test]
1152    fn test_from_field_to_column_schema() {
1153        let field = Field::new("test", ArrowDataType::Int32, true);
1154        let column_schema = ColumnSchema::try_from(&field).unwrap();
1155        assert_eq!("test", column_schema.name);
1156        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1157        assert!(column_schema.is_nullable);
1158        assert!(!column_schema.is_time_index);
1159        assert!(column_schema.default_constraint.is_none());
1160        assert!(column_schema.metadata.is_empty());
1161
1162        let field = Field::new("test", ArrowDataType::Binary, true);
1163        let field = field.with_metadata(Metadata::from([(
1164            TYPE_KEY.to_string(),
1165            ConcreteDataType::json_datatype().name(),
1166        )]));
1167        let column_schema = ColumnSchema::try_from(&field).unwrap();
1168        assert_eq!("test", column_schema.name);
1169        assert_eq!(ConcreteDataType::json_datatype(), column_schema.data_type);
1170        assert!(column_schema.is_nullable);
1171        assert!(!column_schema.is_time_index);
1172        assert!(column_schema.default_constraint.is_none());
1173        assert_eq!(
1174            column_schema.metadata.get(TYPE_KEY).unwrap(),
1175            &ConcreteDataType::json_datatype().name()
1176        );
1177
1178        let field = Field::new("test", ArrowDataType::Binary, true);
1179        let field = field.with_metadata(Metadata::from([(
1180            TYPE_KEY.to_string(),
1181            ConcreteDataType::vector_datatype(3).name(),
1182        )]));
1183        let column_schema = ColumnSchema::try_from(&field).unwrap();
1184        assert_eq!("test", column_schema.name);
1185        assert_eq!(
1186            ConcreteDataType::vector_datatype(3),
1187            column_schema.data_type
1188        );
1189        assert!(column_schema.is_nullable);
1190        assert!(!column_schema.is_time_index);
1191        assert!(column_schema.default_constraint.is_none());
1192        assert_eq!(
1193            column_schema.metadata.get(TYPE_KEY).unwrap(),
1194            &ConcreteDataType::vector_datatype(3).name()
1195        );
1196    }
1197
1198    #[test]
1199    fn test_column_schema_fix_time_index() {
1200        let field = Field::new(
1201            "test",
1202            ArrowDataType::Timestamp(TimeUnit::Second, None),
1203            false,
1204        );
1205        let field = field.with_metadata(Metadata::from([(
1206            TIME_INDEX_KEY.to_string(),
1207            "true".to_string(),
1208        )]));
1209        let column_schema = ColumnSchema::try_from(&field).unwrap();
1210        assert_eq!("test", column_schema.name);
1211        assert_eq!(
1212            ConcreteDataType::timestamp_second_datatype(),
1213            column_schema.data_type
1214        );
1215        assert!(!column_schema.is_nullable);
1216        assert!(column_schema.is_time_index);
1217        assert!(column_schema.default_constraint.is_none());
1218        assert_eq!(1, column_schema.metadata().len());
1219
1220        let field = Field::new("test", ArrowDataType::Int32, false);
1221        let field = field.with_metadata(Metadata::from([(
1222            TIME_INDEX_KEY.to_string(),
1223            "true".to_string(),
1224        )]));
1225        let column_schema = ColumnSchema::try_from(&field).unwrap();
1226        assert_eq!("test", column_schema.name);
1227        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1228        assert!(!column_schema.is_nullable);
1229        assert!(!column_schema.is_time_index);
1230        assert!(column_schema.default_constraint.is_none());
1231        assert!(column_schema.metadata.is_empty());
1232    }
1233
1234    #[test]
1235    fn test_skipping_index_options_deserialization() {
1236        let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
1237        let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
1238        assert_eq!(1024, options.granularity);
1239        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1240        assert_eq!(0.001, options.false_positive_rate());
1241
1242        let options_str = serde_json::to_string(&options).unwrap();
1243        assert_eq!(options_str, original_options);
1244    }
1245
1246    #[test]
1247    fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
1248        let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
1249        let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
1250        assert_eq!(10240, options.granularity);
1251        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1252        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1253
1254        let options_str = serde_json::to_string(&options).unwrap();
1255        assert_eq!(
1256            options_str,
1257            "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}"
1258        );
1259    }
1260
1261    #[test]
1262    fn test_fulltext_options_deserialization() {
1263        let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
1264        let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
1265        assert!(!options.case_sensitive);
1266        assert!(options.enable);
1267        assert_eq!(FulltextBackend::Bloom, options.backend);
1268        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1269        assert_eq!(1024, options.granularity);
1270        assert_eq!(0.001, options.false_positive_rate());
1271
1272        let options_str = serde_json::to_string(&options).unwrap();
1273        assert_eq!(options_str, original_options);
1274    }
1275
1276    #[test]
1277    fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
1278        // 0.14 to 0.15
1279        let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
1280        let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
1281        assert!(!options.case_sensitive);
1282        assert!(options.enable);
1283        assert_eq!(FulltextBackend::Bloom, options.backend);
1284        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1285        assert_eq!(DEFAULT_GRANULARITY, options.granularity);
1286        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1287
1288        let options_str = serde_json::to_string(&options).unwrap();
1289        assert_eq!(
1290            options_str,
1291            "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}"
1292        );
1293    }
1294}