datatypes/schema/
column_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18
19use arrow::datatypes::Field;
20use arrow_schema::extension::{
21    EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY, ExtensionType,
22};
23use serde::{Deserialize, Serialize};
24use snafu::{ResultExt, ensure};
25use sqlparser_derive::{Visit, VisitMut};
26
27use crate::data_type::{ConcreteDataType, DataType};
28use crate::error::{
29    self, ArrowMetadataSnafu, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result,
30};
31use crate::schema::TYPE_KEY;
32use crate::schema::constraint::ColumnDefaultConstraint;
33use crate::value::Value;
34use crate::vectors::VectorRef;
35
/// Column metadata: string key/value pairs attached to a [`ColumnSchema`].
pub type Metadata = HashMap<String, String>;

/// Key used to store whether the column is time index in arrow field's metadata.
pub const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Key used to store the column comment in the column's metadata.
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";
/// Key used to store whether the column has inverted index in arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skipping index options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";

/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
pub const COLUMN_FULLTEXT_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";

/// Keys used in SKIPPING index options
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE: &str = "type";

/// Default granularity used by fulltext (bloom backend) and skipping index options
/// when none is specified.
pub const DEFAULT_GRANULARITY: u32 = 10240;

/// Default false positive rate (1%) used by fulltext (bloom backend) and skipping
/// index options when none is specified.
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
66
67/// Schema of a column, used as an immutable struct.
68#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
69pub struct ColumnSchema {
70    pub name: String,
71    pub data_type: ConcreteDataType,
72    is_nullable: bool,
73    is_time_index: bool,
74    default_constraint: Option<ColumnDefaultConstraint>,
75    metadata: Metadata,
76}
77
78impl fmt::Debug for ColumnSchema {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        write!(
81            f,
82            "{} {} {}",
83            self.name,
84            self.data_type,
85            if self.is_nullable { "null" } else { "not null" },
86        )?;
87
88        if self.is_time_index {
89            write!(f, " time_index")?;
90        }
91
92        // Add default constraint if present
93        if let Some(default_constraint) = &self.default_constraint {
94            write!(f, " default={:?}", default_constraint)?;
95        }
96
97        // Add metadata if present
98        if !self.metadata.is_empty() {
99            write!(f, " metadata={:?}", self.metadata)?;
100        }
101
102        Ok(())
103    }
104}
105
106impl ColumnSchema {
107    pub fn new<T: Into<String>>(
108        name: T,
109        data_type: ConcreteDataType,
110        is_nullable: bool,
111    ) -> ColumnSchema {
112        ColumnSchema {
113            name: name.into(),
114            data_type,
115            is_nullable,
116            is_time_index: false,
117            default_constraint: None,
118            metadata: Metadata::new(),
119        }
120    }
121
122    #[inline]
123    pub fn is_time_index(&self) -> bool {
124        self.is_time_index
125    }
126
127    #[inline]
128    pub fn is_nullable(&self) -> bool {
129        self.is_nullable
130    }
131
132    #[inline]
133    pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
134        self.default_constraint.as_ref()
135    }
136
137    /// Check if the default constraint is a impure function.
138    pub fn is_default_impure(&self) -> bool {
139        self.default_constraint
140            .as_ref()
141            .map(|c| c.is_function())
142            .unwrap_or(false)
143    }
144
145    #[inline]
146    pub fn metadata(&self) -> &Metadata {
147        &self.metadata
148    }
149
150    #[inline]
151    pub fn mut_metadata(&mut self) -> &mut Metadata {
152        &mut self.metadata
153    }
154
155    /// Retrieve the column comment
156    pub fn column_comment(&self) -> Option<&String> {
157        self.metadata.get(COMMENT_KEY)
158    }
159
160    pub fn with_time_index(mut self, is_time_index: bool) -> Self {
161        self.is_time_index = is_time_index;
162        if is_time_index {
163            let _ = self
164                .metadata
165                .insert(TIME_INDEX_KEY.to_string(), "true".to_string());
166        } else {
167            let _ = self.metadata.remove(TIME_INDEX_KEY);
168        }
169        self
170    }
171
172    /// Set the inverted index for the column.
173    /// Similar to [with_inverted_index] but don't take the ownership.
174    ///
175    /// [with_inverted_index]: Self::with_inverted_index
176    pub fn set_inverted_index(&mut self, value: bool) {
177        match value {
178            true => {
179                self.metadata
180                    .insert(INVERTED_INDEX_KEY.to_string(), value.to_string());
181            }
182            false => {
183                self.metadata.remove(INVERTED_INDEX_KEY);
184            }
185        }
186    }
187
188    /// Set the inverted index for the column.
189    /// Similar to [set_inverted_index] but take the ownership and return a owned value.
190    ///
191    /// [set_inverted_index]: Self::set_inverted_index
192    pub fn with_inverted_index(mut self, value: bool) -> Self {
193        self.set_inverted_index(value);
194        self
195    }
196
197    pub fn is_inverted_indexed(&self) -> bool {
198        self.metadata
199            .get(INVERTED_INDEX_KEY)
200            .map(|v| v.eq_ignore_ascii_case("true"))
201            .unwrap_or(false)
202    }
203
204    pub fn is_fulltext_indexed(&self) -> bool {
205        self.fulltext_options()
206            .unwrap_or_default()
207            .map(|option| option.enable)
208            .unwrap_or_default()
209    }
210
211    pub fn is_skipping_indexed(&self) -> bool {
212        self.skipping_index_options().unwrap_or_default().is_some()
213    }
214
215    pub fn has_inverted_index_key(&self) -> bool {
216        self.metadata.contains_key(INVERTED_INDEX_KEY)
217    }
218
219    /// Set default constraint.
220    ///
221    /// If a default constraint exists for the column, this method will
222    /// validate it against the column's data type and nullability.
223    pub fn with_default_constraint(
224        mut self,
225        default_constraint: Option<ColumnDefaultConstraint>,
226    ) -> Result<Self> {
227        if let Some(constraint) = &default_constraint {
228            constraint.validate(&self.data_type, self.is_nullable)?;
229        }
230
231        self.default_constraint = default_constraint;
232        Ok(self)
233    }
234
235    /// Set the nullablity to `true` of the column.
236    /// Similar to [set_nullable] but take the ownership and return a owned value.
237    ///
238    /// [set_nullable]: Self::set_nullable
239    pub fn with_nullable_set(mut self) -> Self {
240        self.is_nullable = true;
241        self
242    }
243
244    /// Set the nullability to `true` of the column.
245    /// Similar to [with_nullable_set] but don't take the ownership
246    ///
247    /// [with_nullable_set]: Self::with_nullable_set
248    pub fn set_nullable(&mut self) {
249        self.is_nullable = true;
250    }
251
252    /// Set the `is_time_index` to `true` of the column.
253    /// Similar to [with_time_index] but don't take the ownership.
254    ///
255    /// [with_time_index]: Self::with_time_index
256    pub fn set_time_index(&mut self) {
257        self.is_time_index = true;
258    }
259
260    /// Creates a new [`ColumnSchema`] with given metadata.
261    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
262        self.metadata = metadata;
263        self
264    }
265
266    /// Creates a vector with default value for this column.
267    ///
268    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
269    pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
270        match &self.default_constraint {
271            Some(c) => c
272                .create_default_vector(&self.data_type, self.is_nullable, num_rows)
273                .map(Some),
274            None => {
275                if self.is_nullable {
276                    // No default constraint, use null as default value.
277                    // TODO(yingwen): Use NullVector once it supports setting logical type.
278                    ColumnDefaultConstraint::null_value()
279                        .create_default_vector(&self.data_type, self.is_nullable, num_rows)
280                        .map(Some)
281                } else {
282                    Ok(None)
283                }
284            }
285        }
286    }
287
288    /// Creates a vector for padding.
289    ///
290    /// This method always returns a vector since it uses [DataType::default_value]
291    /// to fill the vector. Callers should only use the created vector for padding
292    /// and never read its content.
293    pub fn create_default_vector_for_padding(&self, num_rows: usize) -> VectorRef {
294        let padding_value = if self.is_nullable {
295            Value::Null
296        } else {
297            // If the column is not null, use the data type's default value as it is
298            // more efficient to acquire.
299            self.data_type.default_value()
300        };
301        let value_ref = padding_value.as_value_ref();
302        let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
303        for _ in 0..num_rows {
304            mutable_vector.push_value_ref(&value_ref);
305        }
306        mutable_vector.to_vector()
307    }
308
309    /// Creates a default value for this column.
310    ///
311    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
312    pub fn create_default(&self) -> Result<Option<Value>> {
313        match &self.default_constraint {
314            Some(c) => c
315                .create_default(&self.data_type, self.is_nullable)
316                .map(Some),
317            None => {
318                if self.is_nullable {
319                    // No default constraint, use null as default value.
320                    ColumnDefaultConstraint::null_value()
321                        .create_default(&self.data_type, self.is_nullable)
322                        .map(Some)
323                } else {
324                    Ok(None)
325                }
326            }
327        }
328    }
329
330    /// Creates an impure default value for this column, only if it have a impure default constraint.
331    /// Otherwise, returns `Ok(None)`.
332    pub fn create_impure_default(&self) -> Result<Option<Value>> {
333        match &self.default_constraint {
334            Some(c) => c.create_impure_default(&self.data_type),
335            None => Ok(None),
336        }
337    }
338
339    /// Retrieves the fulltext options for the column.
340    pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
341        match self.metadata.get(FULLTEXT_KEY) {
342            None => Ok(None),
343            Some(json) => {
344                let options =
345                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
346                Ok(Some(options))
347            }
348        }
349    }
350
351    pub fn with_fulltext_options(mut self, options: FulltextOptions) -> Result<Self> {
352        self.metadata.insert(
353            FULLTEXT_KEY.to_string(),
354            serde_json::to_string(&options).context(error::SerializeSnafu)?,
355        );
356        Ok(self)
357    }
358
359    pub fn set_fulltext_options(&mut self, options: &FulltextOptions) -> Result<()> {
360        self.metadata.insert(
361            FULLTEXT_KEY.to_string(),
362            serde_json::to_string(options).context(error::SerializeSnafu)?,
363        );
364        Ok(())
365    }
366
367    /// Retrieves the skipping index options for the column.
368    pub fn skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
369        match self.metadata.get(SKIPPING_INDEX_KEY) {
370            None => Ok(None),
371            Some(json) => {
372                let options =
373                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
374                Ok(Some(options))
375            }
376        }
377    }
378
379    pub fn with_skipping_options(mut self, options: SkippingIndexOptions) -> Result<Self> {
380        self.metadata.insert(
381            SKIPPING_INDEX_KEY.to_string(),
382            serde_json::to_string(&options).context(error::SerializeSnafu)?,
383        );
384        Ok(self)
385    }
386
387    pub fn set_skipping_options(&mut self, options: &SkippingIndexOptions) -> Result<()> {
388        self.metadata.insert(
389            SKIPPING_INDEX_KEY.to_string(),
390            serde_json::to_string(options).context(error::SerializeSnafu)?,
391        );
392        Ok(())
393    }
394
395    pub fn unset_skipping_options(&mut self) -> Result<()> {
396        self.metadata.remove(SKIPPING_INDEX_KEY);
397        Ok(())
398    }
399
400    pub fn extension_type<E>(&self) -> Result<Option<E>>
401    where
402        E: ExtensionType,
403    {
404        let extension_type_name = self.metadata.get(EXTENSION_TYPE_NAME_KEY);
405
406        if extension_type_name.map(|s| s.as_str()) == Some(E::NAME) {
407            let extension_metadata = self.metadata.get(EXTENSION_TYPE_METADATA_KEY);
408            let extension_metadata =
409                E::deserialize_metadata(extension_metadata.map(|s| s.as_str()))
410                    .context(ArrowMetadataSnafu)?;
411
412            let extension = E::try_new(&self.data_type.as_arrow_type(), extension_metadata)
413                .context(ArrowMetadataSnafu)?;
414            Ok(Some(extension))
415        } else {
416            Ok(None)
417        }
418    }
419
420    pub fn with_extension_type<E>(&mut self, extension_type: &E) -> Result<()>
421    where
422        E: ExtensionType,
423    {
424        self.metadata
425            .insert(EXTENSION_TYPE_NAME_KEY.to_string(), E::NAME.to_string());
426
427        if let Some(extension_metadata) = extension_type.serialize_metadata() {
428            self.metadata
429                .insert(EXTENSION_TYPE_METADATA_KEY.to_string(), extension_metadata);
430        }
431
432        Ok(())
433    }
434}
435
/// Column extended type set in column schema's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnExtType {
    /// Json type.
    Json,

    /// Vector type with dimension.
    Vector(u32),
}

impl fmt::Display for ColumnExtType {
    /// Renders `Json` or `Vector(<dim>)`; the output is accepted by [`FromStr`].
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ColumnExtType::Json => f.write_str("Json"),
            ColumnExtType::Vector(dim) => write!(f, "Vector({dim})"),
        }
    }
}

impl FromStr for ColumnExtType {
    type Err = String;

    /// Parses `Json` or `Vector(<u32>)`.
    ///
    /// Any other shape yields `"Unknown variant"`; a `Vector(...)` whose content
    /// is not a valid `u32` yields `"Invalid dimension for Vector"`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        if s == "Json" {
            return Ok(ColumnExtType::Json);
        }

        // Peel off the `Vector(` prefix and the closing `)`; what remains must be the dimension.
        let Some(dimension) = s
            .strip_prefix("Vector(")
            .and_then(|rest| rest.strip_suffix(')'))
        else {
            return Err("Unknown variant".to_string());
        };

        match dimension.parse::<u32>() {
            Ok(dim) => Ok(ColumnExtType::Vector(dim)),
            Err(_) => Err("Invalid dimension for Vector".to_string()),
        }
    }
}
469
470impl TryFrom<&Field> for ColumnSchema {
471    type Error = Error;
472
473    fn try_from(field: &Field) -> Result<ColumnSchema> {
474        let mut data_type = ConcreteDataType::try_from(field.data_type())?;
475        // Override the data type if it is specified in the metadata.
476        if let Some(s) = field.metadata().get(TYPE_KEY) {
477            let extype = ColumnExtType::from_str(s)
478                .map_err(|_| ParseExtendedTypeSnafu { value: s }.build())?;
479            match extype {
480                ColumnExtType::Json => {
481                    data_type = ConcreteDataType::json_datatype();
482                }
483                ColumnExtType::Vector(dim) => {
484                    data_type = ConcreteDataType::vector_datatype(dim);
485                }
486            }
487        }
488        let mut metadata = field.metadata().clone();
489        let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
490            Some(json) => {
491                Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
492            }
493            None => None,
494        };
495        let mut is_time_index = metadata.contains_key(TIME_INDEX_KEY);
496        if is_time_index && !data_type.is_timestamp() {
497            // If the column is time index but the data type is not timestamp, it is invalid.
498            // We set the time index to false and remove the metadata.
499            // This is possible if we cast the time index column to another type. DataFusion will
500            // keep the metadata:
501            // https://github.com/apache/datafusion/pull/12951
502            is_time_index = false;
503            metadata.remove(TIME_INDEX_KEY);
504            common_telemetry::debug!(
505                "Column {} is not timestamp ({:?}) but has time index metadata",
506                data_type,
507                field.name(),
508            );
509        }
510
511        Ok(ColumnSchema {
512            name: field.name().clone(),
513            data_type,
514            is_nullable: field.is_nullable(),
515            is_time_index,
516            default_constraint,
517            metadata,
518        })
519    }
520}
521
522impl TryFrom<&ColumnSchema> for Field {
523    type Error = Error;
524
525    fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
526        let mut metadata = column_schema.metadata.clone();
527        if let Some(value) = &column_schema.default_constraint {
528            // Adds an additional metadata to store the default constraint.
529            let old = metadata.insert(
530                DEFAULT_CONSTRAINT_KEY.to_string(),
531                serde_json::to_string(&value).context(error::SerializeSnafu)?,
532            );
533
534            ensure!(
535                old.is_none(),
536                error::DuplicateMetaSnafu {
537                    key: DEFAULT_CONSTRAINT_KEY,
538                }
539            );
540        }
541
542        Ok(Field::new(
543            &column_schema.name,
544            column_schema.data_type.as_arrow_type(),
545            column_schema.is_nullable(),
546        )
547        .with_metadata(metadata))
548    }
549}
550
551/// Fulltext options for a column.
552#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
553#[serde(rename_all = "kebab-case")]
554pub struct FulltextOptions {
555    /// Whether the fulltext index is enabled.
556    pub enable: bool,
557    /// The fulltext analyzer to use.
558    #[serde(default)]
559    pub analyzer: FulltextAnalyzer,
560    /// Whether the fulltext index is case-sensitive.
561    #[serde(default)]
562    pub case_sensitive: bool,
563    /// The fulltext backend to use.
564    #[serde(default)]
565    pub backend: FulltextBackend,
566    /// The granularity of the fulltext index (for bloom backend only)
567    #[serde(default = "fulltext_options_default_granularity")]
568    pub granularity: u32,
569    /// The false positive rate of the fulltext index (for bloom backend only)
570    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
571    pub false_positive_rate_in_10000: u32,
572}
573
574fn fulltext_options_default_granularity() -> u32 {
575    DEFAULT_GRANULARITY
576}
577
578fn index_options_default_false_positive_rate_in_10000() -> u32 {
579    (DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
580}
581
582impl FulltextOptions {
583    /// Creates a new fulltext options.
584    pub fn new(
585        enable: bool,
586        analyzer: FulltextAnalyzer,
587        case_sensitive: bool,
588        backend: FulltextBackend,
589        granularity: u32,
590        false_positive_rate: f64,
591    ) -> Result<Self> {
592        ensure!(
593            0.0 < false_positive_rate && false_positive_rate <= 1.0,
594            error::InvalidFulltextOptionSnafu {
595                msg: format!(
596                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
597                ),
598            }
599        );
600        ensure!(
601            granularity > 0,
602            error::InvalidFulltextOptionSnafu {
603                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
604            }
605        );
606        Ok(Self::new_unchecked(
607            enable,
608            analyzer,
609            case_sensitive,
610            backend,
611            granularity,
612            false_positive_rate,
613        ))
614    }
615
616    /// Creates a new fulltext options without checking `false_positive_rate` and `granularity`.
617    pub fn new_unchecked(
618        enable: bool,
619        analyzer: FulltextAnalyzer,
620        case_sensitive: bool,
621        backend: FulltextBackend,
622        granularity: u32,
623        false_positive_rate: f64,
624    ) -> Self {
625        Self {
626            enable,
627            analyzer,
628            case_sensitive,
629            backend,
630            granularity,
631            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
632        }
633    }
634
635    /// Gets the false positive rate.
636    pub fn false_positive_rate(&self) -> f64 {
637        self.false_positive_rate_in_10000 as f64 / 10000.0
638    }
639}
640
641impl Default for FulltextOptions {
642    fn default() -> Self {
643        Self::new_unchecked(
644            false,
645            FulltextAnalyzer::default(),
646            false,
647            FulltextBackend::default(),
648            DEFAULT_GRANULARITY,
649            DEFAULT_FALSE_POSITIVE_RATE,
650        )
651    }
652}
653
654impl fmt::Display for FulltextOptions {
655    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
656        write!(f, "enable={}", self.enable)?;
657        if self.enable {
658            write!(f, ", analyzer={}", self.analyzer)?;
659            write!(f, ", case_sensitive={}", self.case_sensitive)?;
660            write!(f, ", backend={}", self.backend)?;
661            if self.backend == FulltextBackend::Bloom {
662                write!(f, ", granularity={}", self.granularity)?;
663                write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
664            }
665        }
666        Ok(())
667    }
668}
669
670/// The backend of the fulltext index.
671#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
672#[serde(rename_all = "kebab-case")]
673pub enum FulltextBackend {
674    #[default]
675    Bloom,
676    Tantivy,
677}
678
679impl fmt::Display for FulltextBackend {
680    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681        match self {
682            FulltextBackend::Tantivy => write!(f, "tantivy"),
683            FulltextBackend::Bloom => write!(f, "bloom"),
684        }
685    }
686}
687
688impl TryFrom<HashMap<String, String>> for FulltextOptions {
689    type Error = Error;
690
691    fn try_from(options: HashMap<String, String>) -> Result<Self> {
692        let mut fulltext_options = FulltextOptions {
693            enable: true,
694            ..Default::default()
695        };
696
697        if let Some(enable) = options.get(COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE) {
698            match enable.to_ascii_lowercase().as_str() {
699                "true" => fulltext_options.enable = true,
700                "false" => fulltext_options.enable = false,
701                _ => {
702                    return InvalidFulltextOptionSnafu {
703                        msg: format!("{enable}, expected: 'true' | 'false'"),
704                    }
705                    .fail();
706                }
707            }
708        };
709
710        if let Some(analyzer) = options.get(COLUMN_FULLTEXT_OPT_KEY_ANALYZER) {
711            match analyzer.to_ascii_lowercase().as_str() {
712                "english" => fulltext_options.analyzer = FulltextAnalyzer::English,
713                "chinese" => fulltext_options.analyzer = FulltextAnalyzer::Chinese,
714                _ => {
715                    return InvalidFulltextOptionSnafu {
716                        msg: format!("{analyzer}, expected: 'English' | 'Chinese'"),
717                    }
718                    .fail();
719                }
720            }
721        };
722
723        if let Some(case_sensitive) = options.get(COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE) {
724            match case_sensitive.to_ascii_lowercase().as_str() {
725                "true" => fulltext_options.case_sensitive = true,
726                "false" => fulltext_options.case_sensitive = false,
727                _ => {
728                    return InvalidFulltextOptionSnafu {
729                        msg: format!("{case_sensitive}, expected: 'true' | 'false'"),
730                    }
731                    .fail();
732                }
733            }
734        }
735
736        if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
737            match backend.to_ascii_lowercase().as_str() {
738                "bloom" => fulltext_options.backend = FulltextBackend::Bloom,
739                "tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
740                _ => {
741                    return InvalidFulltextOptionSnafu {
742                        msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
743                    }
744                    .fail();
745                }
746            }
747        }
748
749        if fulltext_options.backend == FulltextBackend::Bloom {
750            // Parse granularity with default value 10240
751            let granularity = match options.get(COLUMN_FULLTEXT_OPT_KEY_GRANULARITY) {
752                Some(value) => value
753                    .parse::<u32>()
754                    .ok()
755                    .filter(|&v| v > 0)
756                    .ok_or_else(|| {
757                        error::InvalidFulltextOptionSnafu {
758                            msg: format!(
759                                "Invalid granularity: {value}, expected: positive integer"
760                            ),
761                        }
762                        .build()
763                    })?,
764                None => DEFAULT_GRANULARITY,
765            };
766            fulltext_options.granularity = granularity;
767
768            // Parse false positive rate with default value 0.01
769            let false_positive_rate = match options.get(COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE)
770            {
771                Some(value) => value
772                    .parse::<f64>()
773                    .ok()
774                    .filter(|&v| v > 0.0 && v <= 1.0)
775                    .ok_or_else(|| {
776                        error::InvalidFulltextOptionSnafu {
777                            msg: format!(
778                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
779                            ),
780                        }
781                        .build()
782                    })?,
783                None => DEFAULT_FALSE_POSITIVE_RATE,
784            };
785            fulltext_options.false_positive_rate_in_10000 = (false_positive_rate * 10000.0) as u32;
786        }
787
788        Ok(fulltext_options)
789    }
790}
791
792/// Fulltext analyzer.
793#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
794pub enum FulltextAnalyzer {
795    #[default]
796    English,
797    Chinese,
798}
799
800impl fmt::Display for FulltextAnalyzer {
801    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
802        match self {
803            FulltextAnalyzer::English => write!(f, "English"),
804            FulltextAnalyzer::Chinese => write!(f, "Chinese"),
805        }
806    }
807}
808
809/// Skipping options for a column.
810#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
811#[serde(rename_all = "kebab-case")]
812pub struct SkippingIndexOptions {
813    /// The granularity of the skip index.
814    pub granularity: u32,
815    /// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
816    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
817    pub false_positive_rate_in_10000: u32,
818    /// The type of the skip index.
819    #[serde(default)]
820    pub index_type: SkippingIndexType,
821}
822
823impl SkippingIndexOptions {
824    /// Creates a new skipping index options without checking `false_positive_rate` and `granularity`.
825    pub fn new_unchecked(
826        granularity: u32,
827        false_positive_rate: f64,
828        index_type: SkippingIndexType,
829    ) -> Self {
830        Self {
831            granularity,
832            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
833            index_type,
834        }
835    }
836
837    /// Creates a new skipping index options.
838    pub fn new(
839        granularity: u32,
840        false_positive_rate: f64,
841        index_type: SkippingIndexType,
842    ) -> Result<Self> {
843        ensure!(
844            0.0 < false_positive_rate && false_positive_rate <= 1.0,
845            error::InvalidSkippingIndexOptionSnafu {
846                msg: format!(
847                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
848                ),
849            }
850        );
851        ensure!(
852            granularity > 0,
853            error::InvalidSkippingIndexOptionSnafu {
854                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
855            }
856        );
857        Ok(Self::new_unchecked(
858            granularity,
859            false_positive_rate,
860            index_type,
861        ))
862    }
863
864    /// Gets the false positive rate.
865    pub fn false_positive_rate(&self) -> f64 {
866        self.false_positive_rate_in_10000 as f64 / 10000.0
867    }
868}
869
870impl Default for SkippingIndexOptions {
871    fn default() -> Self {
872        Self::new_unchecked(
873            DEFAULT_GRANULARITY,
874            DEFAULT_FALSE_POSITIVE_RATE,
875            SkippingIndexType::default(),
876        )
877    }
878}
879
880impl fmt::Display for SkippingIndexOptions {
881    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
882        write!(f, "granularity={}", self.granularity)?;
883        write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
884        write!(f, ", index_type={}", self.index_type)?;
885        Ok(())
886    }
887}
888
/// Skip index types.
///
/// The set of skipping index implementations a column can be configured with. Only one
/// variant exists at present.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum SkippingIndexType {
    /// Bloom filter index. Marked `#[default]`, so it is the value produced by
    /// `SkippingIndexType::default()`.
    #[default]
    BloomFilter,
}
895
896impl fmt::Display for SkippingIndexType {
897    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898        match self {
899            SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
900        }
901    }
902}
903
904impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
905    type Error = Error;
906
907    fn try_from(options: HashMap<String, String>) -> Result<Self> {
908        // Parse granularity with default value 1
909        let granularity = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY) {
910            Some(value) => value
911                .parse::<u32>()
912                .ok()
913                .filter(|&v| v > 0)
914                .ok_or_else(|| {
915                    error::InvalidSkippingIndexOptionSnafu {
916                        msg: format!("Invalid granularity: {value}, expected: positive integer"),
917                    }
918                    .build()
919                })?,
920            None => DEFAULT_GRANULARITY,
921        };
922
923        // Parse false positive rate with default value 100
924        let false_positive_rate =
925            match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE) {
926                Some(value) => value
927                    .parse::<f64>()
928                    .ok()
929                    .filter(|&v| v > 0.0 && v <= 1.0)
930                    .ok_or_else(|| {
931                        error::InvalidSkippingIndexOptionSnafu {
932                            msg: format!(
933                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
934                            ),
935                        }
936                        .build()
937                    })?,
938                None => DEFAULT_FALSE_POSITIVE_RATE,
939            };
940
941        // Parse index type with default value BloomFilter
942        let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
943            Some(typ) => match typ.to_ascii_uppercase().as_str() {
944                "BLOOM" => SkippingIndexType::BloomFilter,
945                _ => {
946                    return error::InvalidSkippingIndexOptionSnafu {
947                        msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
948                    }
949                    .fail();
950                }
951            },
952            None => SkippingIndexType::default(),
953        };
954
955        Ok(SkippingIndexOptions::new_unchecked(
956            granularity,
957            false_positive_rate,
958            index_type,
959        ))
960    }
961}
962
// Unit tests for column schema conversion to and from arrow fields, default value
// creation, and (de)serialization of skipping index and fulltext options.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};

    use super::*;
    use crate::value::Value;
    use crate::vectors::Int32Vector;

    // A plain nullable Int32 column converts to an arrow `Field` and back unchanged.
    #[test]
    fn test_column_schema() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
        let field = Field::try_from(&column_schema).unwrap();
        assert_eq!("test", field.name());
        assert_eq!(ArrowDataType::Int32, *field.data_type());
        assert!(field.is_nullable());

        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!(column_schema, new_column_schema);
    }

    // The default constraint is absent from the column's own metadata, appears as JSON
    // under `DEFAULT_CONSTRAINT_KEY` in the arrow field metadata, and survives the round trip.
    #[test]
    fn test_column_schema_with_default_constraint() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
            .unwrap();
        assert!(
            column_schema
                .metadata()
                .get(DEFAULT_CONSTRAINT_KEY)
                .is_none()
        );

        let field = Field::try_from(&column_schema).unwrap();
        assert_eq!("test", field.name());
        assert_eq!(ArrowDataType::Int32, *field.data_type());
        assert!(field.is_nullable());
        assert_eq!(
            "{\"Value\":{\"Int32\":99}}",
            field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
        );

        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!(column_schema, new_column_schema);
    }

    // User metadata and the column comment are preserved alongside a default constraint,
    // both on the column schema and after the round trip through an arrow field.
    #[test]
    fn test_column_schema_with_metadata() {
        let metadata = Metadata::from([
            ("k1".to_string(), "v1".to_string()),
            (COMMENT_KEY.to_string(), "test comment".to_string()),
        ]);
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_metadata(metadata)
            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
            .unwrap();
        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
        assert_eq!("test comment", column_schema.column_comment().unwrap());
        assert!(
            column_schema
                .metadata()
                .get(DEFAULT_CONSTRAINT_KEY)
                .is_none()
        );

        let field = Field::try_from(&column_schema).unwrap();
        assert_eq!("v1", field.metadata().get("k1").unwrap());
        // Only checks that the key is present on the field; the value itself is not asserted.
        let _ = field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap();

        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!(column_schema, new_column_schema);
    }

    // Converting to an arrow field fails when user metadata already uses
    // `DEFAULT_CONSTRAINT_KEY` and a default constraint is set as well.
    #[test]
    fn test_column_schema_with_duplicate_metadata() {
        let metadata = Metadata::from([(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string())]);
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_metadata(metadata)
            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
            .unwrap();
        assert!(Field::try_from(&column_schema).is_err());
    }

    // A null default constraint is rejected on a non-nullable column.
    #[test]
    fn test_column_schema_invalid_default_constraint() {
        assert!(
            ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
                .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
                .is_err()
        );
    }

    // A default constraint converts to bytes and back to an equal value.
    #[test]
    fn test_column_default_constraint_try_into_from() {
        let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));

        let bytes: Vec<u8> = default_constraint.clone().try_into().unwrap();
        let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap();

        assert_eq!(default_constraint, from_value);
    }

    // Nullable columns produce an all-null default vector of the requested length, whether
    // the null default is implicit or explicit.
    #[test]
    fn test_column_schema_create_default_null() {
        // Implicit default null.
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
        let v = column_schema.create_default_vector(5).unwrap().unwrap();
        assert_eq!(5, v.len());
        assert!(v.only_null());

        // Explicit default null.
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
            .unwrap();
        let v = column_schema.create_default_vector(5).unwrap().unwrap();
        assert_eq!(5, v.len());
        assert!(v.only_null());
    }

    // A non-nullable column without a default constraint yields no default vector.
    #[test]
    fn test_column_schema_no_default() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
        assert!(column_schema.create_default_vector(5).unwrap().is_none());
    }

    // Padding vectors are all-null for nullable columns and zero-filled for a
    // non-nullable Int32 column.
    #[test]
    fn test_create_default_vector_for_padding() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
        let vector = column_schema.create_default_vector_for_padding(4);
        assert!(vector.only_null());
        assert_eq!(4, vector.len());

        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
        let vector = column_schema.create_default_vector_for_padding(4);
        assert_eq!(4, vector.len());
        let expect: VectorRef = Arc::new(Int32Vector::from_slice([0, 0, 0, 0]));
        assert_eq!(expect, vector);
    }

    // Single-value counterpart of the null default test: both implicit and explicit null
    // defaults produce a null value.
    #[test]
    fn test_column_schema_single_create_default_null() {
        // Implicit default null.
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
        let v = column_schema.create_default().unwrap().unwrap();
        assert!(v.is_null());

        // Explicit default null.
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
            .unwrap();
        let v = column_schema.create_default().unwrap().unwrap();
        assert!(v.is_null());
    }

    // A concrete default constraint value is returned as the single default value.
    #[test]
    fn test_column_schema_single_create_default_not_null() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(6))))
            .unwrap();
        let v = column_schema.create_default().unwrap().unwrap();
        assert_eq!(v, Value::Int32(6));
    }

    // A non-nullable column without a default constraint yields no single default value.
    #[test]
    fn test_column_schema_single_no_default() {
        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
        assert!(column_schema.create_default().unwrap().is_none());
    }

    // The `Debug` output is the compact form `<name> <type> null|not null`.
    #[test]
    fn test_debug_for_column_schema() {
        let column_schema_int8 =
            ColumnSchema::new("test_column_1", ConcreteDataType::int8_datatype(), true);

        let column_schema_int32 =
            ColumnSchema::new("test_column_2", ConcreteDataType::int32_datatype(), false);

        let formatted_int8 = format!("{:?}", column_schema_int8);
        let formatted_int32 = format!("{:?}", column_schema_int32);
        assert_eq!(formatted_int8, "test_column_1 Int8 null");
        assert_eq!(formatted_int32, "test_column_2 Int32 not null");
    }

    // Arrow fields convert to column schemas; for Binary fields the `TYPE_KEY` metadata
    // selects the JSON or vector data type and is kept in the resulting metadata.
    #[test]
    fn test_from_field_to_column_schema() {
        let field = Field::new("test", ArrowDataType::Int32, true);
        let column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!("test", column_schema.name);
        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
        assert!(column_schema.is_nullable);
        assert!(!column_schema.is_time_index);
        assert!(column_schema.default_constraint.is_none());
        assert!(column_schema.metadata.is_empty());

        let field = Field::new("test", ArrowDataType::Binary, true);
        let field = field.with_metadata(Metadata::from([(
            TYPE_KEY.to_string(),
            ConcreteDataType::json_datatype().name(),
        )]));
        let column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!("test", column_schema.name);
        assert_eq!(ConcreteDataType::json_datatype(), column_schema.data_type);
        assert!(column_schema.is_nullable);
        assert!(!column_schema.is_time_index);
        assert!(column_schema.default_constraint.is_none());
        assert_eq!(
            column_schema.metadata.get(TYPE_KEY).unwrap(),
            &ConcreteDataType::json_datatype().name()
        );

        let field = Field::new("test", ArrowDataType::Binary, true);
        let field = field.with_metadata(Metadata::from([(
            TYPE_KEY.to_string(),
            ConcreteDataType::vector_datatype(3).name(),
        )]));
        let column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!("test", column_schema.name);
        assert_eq!(
            ConcreteDataType::vector_datatype(3),
            column_schema.data_type
        );
        assert!(column_schema.is_nullable);
        assert!(!column_schema.is_time_index);
        assert!(column_schema.default_constraint.is_none());
        assert_eq!(
            column_schema.metadata.get(TYPE_KEY).unwrap(),
            &ConcreteDataType::vector_datatype(3).name()
        );
    }

    // `TIME_INDEX_KEY` metadata marks a timestamp column as the time index and is kept;
    // on a non-timestamp (Int32) column the flag is not applied and the metadata ends up empty.
    #[test]
    fn test_column_schema_fix_time_index() {
        let field = Field::new(
            "test",
            ArrowDataType::Timestamp(TimeUnit::Second, None),
            false,
        );
        let field = field.with_metadata(Metadata::from([(
            TIME_INDEX_KEY.to_string(),
            "true".to_string(),
        )]));
        let column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!("test", column_schema.name);
        assert_eq!(
            ConcreteDataType::timestamp_second_datatype(),
            column_schema.data_type
        );
        assert!(!column_schema.is_nullable);
        assert!(column_schema.is_time_index);
        assert!(column_schema.default_constraint.is_none());
        assert_eq!(1, column_schema.metadata().len());

        let field = Field::new("test", ArrowDataType::Int32, false);
        let field = field.with_metadata(Metadata::from([(
            TIME_INDEX_KEY.to_string(),
            "true".to_string(),
        )]));
        let column_schema = ColumnSchema::try_from(&field).unwrap();
        assert_eq!("test", column_schema.name);
        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
        assert!(!column_schema.is_nullable);
        assert!(!column_schema.is_time_index);
        assert!(column_schema.default_constraint.is_none());
        assert!(column_schema.metadata.is_empty());
    }

    // Skipping index options round-trip through JSON with kebab-case keys; a stored value
    // of 10 ten-thousandths reads back as a rate of 0.001.
    #[test]
    fn test_skipping_index_options_deserialization() {
        let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
        let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
        assert_eq!(1024, options.granularity);
        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
        assert_eq!(0.001, options.false_positive_rate());

        let options_str = serde_json::to_string(&options).unwrap();
        assert_eq!(options_str, original_options);
    }

    // JSON without the false positive rate key (the shape the test name attributes to
    // v0.14) deserializes with the default rate, and re-serializing writes the key out.
    #[test]
    fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
        let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
        let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
        assert_eq!(10240, options.granularity);
        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());

        let options_str = serde_json::to_string(&options).unwrap();
        assert_eq!(
            options_str,
            "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}"
        );
    }

    // Fulltext options round-trip through JSON with every field present.
    #[test]
    fn test_fulltext_options_deserialization() {
        let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
        let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
        assert!(!options.case_sensitive);
        assert!(options.enable);
        assert_eq!(FulltextBackend::Bloom, options.backend);
        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
        assert_eq!(1024, options.granularity);
        assert_eq!(0.001, options.false_positive_rate());

        let options_str = serde_json::to_string(&options).unwrap();
        assert_eq!(options_str, original_options);
    }

    // Fulltext JSON lacking granularity and false positive rate deserializes with the
    // defaults, and re-serializing writes both keys out.
    #[test]
    fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
        // 0.14 to 0.15
        let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
        let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
        assert!(!options.case_sensitive);
        assert!(options.enable);
        assert_eq!(FulltextBackend::Bloom, options.backend);
        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
        assert_eq!(DEFAULT_GRANULARITY, options.granularity);
        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());

        let options_str = serde_json::to_string(&options).unwrap();
        assert_eq!(
            options_str,
            "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}"
        );
    }
}