datatypes/schema/
column_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18
19use arrow::datatypes::Field;
20use serde::{Deserialize, Serialize};
21use snafu::{ensure, ResultExt};
22use sqlparser_derive::{Visit, VisitMut};
23
24use crate::data_type::{ConcreteDataType, DataType};
25use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result};
26use crate::schema::constraint::ColumnDefaultConstraint;
27use crate::schema::TYPE_KEY;
28use crate::value::Value;
29use crate::vectors::VectorRef;
30
/// Key-value annotations attached to a column (see [`ColumnSchema`]).
pub type Metadata = HashMap<String, String>;

/// Key used to store whether the column is time index in arrow field's metadata.
pub const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Key used to store the column comment in the column's metadata.
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";
/// Key used to store whether the column has inverted index in arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skipping index options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";

/// Fulltext option key: whether the fulltext index is enabled (`true` / `false`).
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
/// Fulltext option key: the analyzer to use.
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
/// Fulltext option key: whether matching is case-sensitive (`true` / `false`).
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
/// Fulltext option key: the backend to use.
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
/// Fulltext option key: the granularity (only read for the bloom backend).
pub const COLUMN_FULLTEXT_OPT_KEY_GRANULARITY: &str = "granularity";
/// Fulltext option key: the false positive rate (only read for the bloom backend).
pub const COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";

/// Skipping index option key: the granularity.
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
/// Skipping index option key: the false positive rate.
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
/// Skipping index option key: the index type.
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE: &str = "type";

/// Granularity used by fulltext and skipping index options when none is specified.
pub const DEFAULT_GRANULARITY: u32 = 10240;

/// False positive rate (1%) used by fulltext and skipping index options when none is specified.
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
61
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
    /// Name of the column.
    pub name: String,
    /// Data type of the column.
    pub data_type: ConcreteDataType,
    /// Whether the column accepts null values.
    is_nullable: bool,
    /// Whether the column is the time index.
    is_time_index: bool,
    /// Optional default constraint; `with_default_constraint` validates it against
    /// the data type and nullability before storing it.
    default_constraint: Option<ColumnDefaultConstraint>,
    /// Extra annotations such as the comment, index options and the time index marker.
    metadata: Metadata,
}
72
73impl fmt::Debug for ColumnSchema {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        write!(
76            f,
77            "{} {} {}",
78            self.name,
79            self.data_type,
80            if self.is_nullable { "null" } else { "not null" },
81        )?;
82
83        if self.is_time_index {
84            write!(f, " time_index")?;
85        }
86
87        // Add default constraint if present
88        if let Some(default_constraint) = &self.default_constraint {
89            write!(f, " default={:?}", default_constraint)?;
90        }
91
92        // Add metadata if present
93        if !self.metadata.is_empty() {
94            write!(f, " metadata={:?}", self.metadata)?;
95        }
96
97        Ok(())
98    }
99}
100
101impl ColumnSchema {
102    pub fn new<T: Into<String>>(
103        name: T,
104        data_type: ConcreteDataType,
105        is_nullable: bool,
106    ) -> ColumnSchema {
107        ColumnSchema {
108            name: name.into(),
109            data_type,
110            is_nullable,
111            is_time_index: false,
112            default_constraint: None,
113            metadata: Metadata::new(),
114        }
115    }
116
117    #[inline]
118    pub fn is_time_index(&self) -> bool {
119        self.is_time_index
120    }
121
122    #[inline]
123    pub fn is_nullable(&self) -> bool {
124        self.is_nullable
125    }
126
127    #[inline]
128    pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
129        self.default_constraint.as_ref()
130    }
131
132    /// Check if the default constraint is a impure function.
133    pub fn is_default_impure(&self) -> bool {
134        self.default_constraint
135            .as_ref()
136            .map(|c| c.is_function())
137            .unwrap_or(false)
138    }
139
140    #[inline]
141    pub fn metadata(&self) -> &Metadata {
142        &self.metadata
143    }
144
145    #[inline]
146    pub fn mut_metadata(&mut self) -> &mut Metadata {
147        &mut self.metadata
148    }
149
150    /// Retrieve the column comment
151    pub fn column_comment(&self) -> Option<&String> {
152        self.metadata.get(COMMENT_KEY)
153    }
154
155    pub fn with_time_index(mut self, is_time_index: bool) -> Self {
156        self.is_time_index = is_time_index;
157        if is_time_index {
158            let _ = self
159                .metadata
160                .insert(TIME_INDEX_KEY.to_string(), "true".to_string());
161        } else {
162            let _ = self.metadata.remove(TIME_INDEX_KEY);
163        }
164        self
165    }
166
167    /// Set the inverted index for the column.
168    /// Similar to [with_inverted_index] but don't take the ownership.
169    ///
170    /// [with_inverted_index]: Self::with_inverted_index
171    pub fn set_inverted_index(&mut self, value: bool) {
172        match value {
173            true => {
174                self.metadata
175                    .insert(INVERTED_INDEX_KEY.to_string(), value.to_string());
176            }
177            false => {
178                self.metadata.remove(INVERTED_INDEX_KEY);
179            }
180        }
181    }
182
183    /// Set the inverted index for the column.
184    /// Similar to [set_inverted_index] but take the ownership and return a owned value.
185    ///
186    /// [set_inverted_index]: Self::set_inverted_index
187    pub fn with_inverted_index(mut self, value: bool) -> Self {
188        self.set_inverted_index(value);
189        self
190    }
191
192    pub fn is_inverted_indexed(&self) -> bool {
193        self.metadata
194            .get(INVERTED_INDEX_KEY)
195            .map(|v| v.eq_ignore_ascii_case("true"))
196            .unwrap_or(false)
197    }
198
199    pub fn is_fulltext_indexed(&self) -> bool {
200        self.fulltext_options()
201            .unwrap_or_default()
202            .map(|option| option.enable)
203            .unwrap_or_default()
204    }
205
206    pub fn is_skipping_indexed(&self) -> bool {
207        self.skipping_index_options().unwrap_or_default().is_some()
208    }
209
210    pub fn has_inverted_index_key(&self) -> bool {
211        self.metadata.contains_key(INVERTED_INDEX_KEY)
212    }
213
214    /// Set default constraint.
215    ///
216    /// If a default constraint exists for the column, this method will
217    /// validate it against the column's data type and nullability.
218    pub fn with_default_constraint(
219        mut self,
220        default_constraint: Option<ColumnDefaultConstraint>,
221    ) -> Result<Self> {
222        if let Some(constraint) = &default_constraint {
223            constraint.validate(&self.data_type, self.is_nullable)?;
224        }
225
226        self.default_constraint = default_constraint;
227        Ok(self)
228    }
229
230    /// Set the nullablity to `true` of the column.
231    /// Similar to [set_nullable] but take the ownership and return a owned value.
232    ///
233    /// [set_nullable]: Self::set_nullable
234    pub fn with_nullable_set(mut self) -> Self {
235        self.is_nullable = true;
236        self
237    }
238
239    /// Set the nullability to `true` of the column.
240    /// Similar to [with_nullable_set] but don't take the ownership
241    ///
242    /// [with_nullable_set]: Self::with_nullable_set
243    pub fn set_nullable(&mut self) {
244        self.is_nullable = true;
245    }
246
247    /// Set the `is_time_index` to `true` of the column.
248    /// Similar to [with_time_index] but don't take the ownership.
249    ///
250    /// [with_time_index]: Self::with_time_index
251    pub fn set_time_index(&mut self) {
252        self.is_time_index = true;
253    }
254
255    /// Creates a new [`ColumnSchema`] with given metadata.
256    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
257        self.metadata = metadata;
258        self
259    }
260
261    /// Creates a vector with default value for this column.
262    ///
263    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
264    pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
265        match &self.default_constraint {
266            Some(c) => c
267                .create_default_vector(&self.data_type, self.is_nullable, num_rows)
268                .map(Some),
269            None => {
270                if self.is_nullable {
271                    // No default constraint, use null as default value.
272                    // TODO(yingwen): Use NullVector once it supports setting logical type.
273                    ColumnDefaultConstraint::null_value()
274                        .create_default_vector(&self.data_type, self.is_nullable, num_rows)
275                        .map(Some)
276                } else {
277                    Ok(None)
278                }
279            }
280        }
281    }
282
283    /// Creates a vector for padding.
284    ///
285    /// This method always returns a vector since it uses [DataType::default_value]
286    /// to fill the vector. Callers should only use the created vector for padding
287    /// and never read its content.
288    pub fn create_default_vector_for_padding(&self, num_rows: usize) -> VectorRef {
289        let padding_value = if self.is_nullable {
290            Value::Null
291        } else {
292            // If the column is not null, use the data type's default value as it is
293            // more efficient to acquire.
294            self.data_type.default_value()
295        };
296        let value_ref = padding_value.as_value_ref();
297        let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
298        for _ in 0..num_rows {
299            mutable_vector.push_value_ref(value_ref);
300        }
301        mutable_vector.to_vector()
302    }
303
304    /// Creates a default value for this column.
305    ///
306    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
307    pub fn create_default(&self) -> Result<Option<Value>> {
308        match &self.default_constraint {
309            Some(c) => c
310                .create_default(&self.data_type, self.is_nullable)
311                .map(Some),
312            None => {
313                if self.is_nullable {
314                    // No default constraint, use null as default value.
315                    ColumnDefaultConstraint::null_value()
316                        .create_default(&self.data_type, self.is_nullable)
317                        .map(Some)
318                } else {
319                    Ok(None)
320                }
321            }
322        }
323    }
324
325    /// Creates an impure default value for this column, only if it have a impure default constraint.
326    /// Otherwise, returns `Ok(None)`.
327    pub fn create_impure_default(&self) -> Result<Option<Value>> {
328        match &self.default_constraint {
329            Some(c) => c.create_impure_default(&self.data_type),
330            None => Ok(None),
331        }
332    }
333
334    /// Retrieves the fulltext options for the column.
335    pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
336        match self.metadata.get(FULLTEXT_KEY) {
337            None => Ok(None),
338            Some(json) => {
339                let options =
340                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
341                Ok(Some(options))
342            }
343        }
344    }
345
346    pub fn with_fulltext_options(mut self, options: FulltextOptions) -> Result<Self> {
347        self.metadata.insert(
348            FULLTEXT_KEY.to_string(),
349            serde_json::to_string(&options).context(error::SerializeSnafu)?,
350        );
351        Ok(self)
352    }
353
354    pub fn set_fulltext_options(&mut self, options: &FulltextOptions) -> Result<()> {
355        self.metadata.insert(
356            FULLTEXT_KEY.to_string(),
357            serde_json::to_string(options).context(error::SerializeSnafu)?,
358        );
359        Ok(())
360    }
361
362    /// Retrieves the skipping index options for the column.
363    pub fn skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
364        match self.metadata.get(SKIPPING_INDEX_KEY) {
365            None => Ok(None),
366            Some(json) => {
367                let options =
368                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
369                Ok(Some(options))
370            }
371        }
372    }
373
374    pub fn with_skipping_options(mut self, options: SkippingIndexOptions) -> Result<Self> {
375        self.metadata.insert(
376            SKIPPING_INDEX_KEY.to_string(),
377            serde_json::to_string(&options).context(error::SerializeSnafu)?,
378        );
379        Ok(self)
380    }
381
382    pub fn set_skipping_options(&mut self, options: &SkippingIndexOptions) -> Result<()> {
383        self.metadata.insert(
384            SKIPPING_INDEX_KEY.to_string(),
385            serde_json::to_string(options).context(error::SerializeSnafu)?,
386        );
387        Ok(())
388    }
389
390    pub fn unset_skipping_options(&mut self) -> Result<()> {
391        self.metadata.remove(SKIPPING_INDEX_KEY);
392        Ok(())
393    }
394}
395
/// Column extended type set in column schema's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnExtType {
    /// Json type.
    Json,

    /// Vector type with dimension.
    Vector(u32),
}

impl fmt::Display for ColumnExtType {
    /// Renders `Json` or `Vector(<dimension>)`, the same text that `from_str` accepts.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Json => f.write_str("Json"),
            Self::Vector(dimension) => write!(f, "Vector({dimension})"),
        }
    }
}

impl FromStr for ColumnExtType {
    type Err = String;

    /// Parses `Json` or `Vector(<dimension>)`.
    ///
    /// Text wrapped in `Vector(` .. `)` whose inner part is not a valid `u32` yields
    /// `"Invalid dimension for Vector"`; anything else yields `"Unknown variant"`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        if s == "Json" {
            return Ok(Self::Json);
        }

        // Peel off the `Vector(` .. `)` wrapper; what is left must be the dimension.
        let wrapped_dimension = s
            .strip_prefix("Vector(")
            .and_then(|rest| rest.strip_suffix(')'));

        match wrapped_dimension {
            Some(digits) => digits
                .parse::<u32>()
                .map(Self::Vector)
                .map_err(|_| "Invalid dimension for Vector".to_string()),
            None => Err("Unknown variant".to_string()),
        }
    }
}
429
430impl TryFrom<&Field> for ColumnSchema {
431    type Error = Error;
432
433    fn try_from(field: &Field) -> Result<ColumnSchema> {
434        let mut data_type = ConcreteDataType::try_from(field.data_type())?;
435        // Override the data type if it is specified in the metadata.
436        if let Some(s) = field.metadata().get(TYPE_KEY) {
437            let extype = ColumnExtType::from_str(s)
438                .map_err(|_| ParseExtendedTypeSnafu { value: s }.build())?;
439            match extype {
440                ColumnExtType::Json => {
441                    data_type = ConcreteDataType::json_datatype();
442                }
443                ColumnExtType::Vector(dim) => {
444                    data_type = ConcreteDataType::vector_datatype(dim);
445                }
446            }
447        }
448        let mut metadata = field.metadata().clone();
449        let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
450            Some(json) => {
451                Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
452            }
453            None => None,
454        };
455        let mut is_time_index = metadata.contains_key(TIME_INDEX_KEY);
456        if is_time_index && !data_type.is_timestamp() {
457            // If the column is time index but the data type is not timestamp, it is invalid.
458            // We set the time index to false and remove the metadata.
459            // This is possible if we cast the time index column to another type. DataFusion will
460            // keep the metadata:
461            // https://github.com/apache/datafusion/pull/12951
462            is_time_index = false;
463            metadata.remove(TIME_INDEX_KEY);
464            common_telemetry::debug!(
465                "Column {} is not timestamp ({:?}) but has time index metadata",
466                data_type,
467                field.name(),
468            );
469        }
470
471        Ok(ColumnSchema {
472            name: field.name().clone(),
473            data_type,
474            is_nullable: field.is_nullable(),
475            is_time_index,
476            default_constraint,
477            metadata,
478        })
479    }
480}
481
482impl TryFrom<&ColumnSchema> for Field {
483    type Error = Error;
484
485    fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
486        let mut metadata = column_schema.metadata.clone();
487        if let Some(value) = &column_schema.default_constraint {
488            // Adds an additional metadata to store the default constraint.
489            let old = metadata.insert(
490                DEFAULT_CONSTRAINT_KEY.to_string(),
491                serde_json::to_string(&value).context(error::SerializeSnafu)?,
492            );
493
494            ensure!(
495                old.is_none(),
496                error::DuplicateMetaSnafu {
497                    key: DEFAULT_CONSTRAINT_KEY,
498                }
499            );
500        }
501
502        Ok(Field::new(
503            &column_schema.name,
504            column_schema.data_type.as_arrow_type(),
505            column_schema.is_nullable(),
506        )
507        .with_metadata(metadata))
508    }
509}
510
/// Fulltext options for a column.
///
/// [`ColumnSchema`] stores these options as JSON in its metadata; field names are
/// serialized in kebab-case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct FulltextOptions {
    /// Whether the fulltext index is enabled.
    pub enable: bool,
    /// The fulltext analyzer to use. Defaults to the analyzer's default when missing
    /// from the serialized form.
    #[serde(default)]
    pub analyzer: FulltextAnalyzer,
    /// Whether the fulltext index is case-sensitive. Defaults to `false` when missing
    /// from the serialized form.
    #[serde(default)]
    pub case_sensitive: bool,
    /// The fulltext backend to use. Defaults to the backend's default when missing
    /// from the serialized form.
    #[serde(default)]
    pub backend: FulltextBackend,
    /// The granularity of the fulltext index (for bloom backend only)
    #[serde(default = "fulltext_options_default_granularity")]
    pub granularity: u32,
    /// The false positive rate of the fulltext index (for bloom backend only),
    /// stored in ten-thousandths (e.g. 100 = 1%).
    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
    pub false_positive_rate_in_10000: u32,
}
533
/// Serde default for `FulltextOptions::granularity`: returns [`DEFAULT_GRANULARITY`].
fn fulltext_options_default_granularity() -> u32 {
    DEFAULT_GRANULARITY
}
537
538fn index_options_default_false_positive_rate_in_10000() -> u32 {
539    (DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
540}
541
542impl FulltextOptions {
543    /// Creates a new fulltext options.
544    pub fn new(
545        enable: bool,
546        analyzer: FulltextAnalyzer,
547        case_sensitive: bool,
548        backend: FulltextBackend,
549        granularity: u32,
550        false_positive_rate: f64,
551    ) -> Result<Self> {
552        ensure!(
553            0.0 < false_positive_rate && false_positive_rate <= 1.0,
554            error::InvalidFulltextOptionSnafu {
555                msg: format!(
556                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
557                ),
558            }
559        );
560        ensure!(
561            granularity > 0,
562            error::InvalidFulltextOptionSnafu {
563                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
564            }
565        );
566        Ok(Self::new_unchecked(
567            enable,
568            analyzer,
569            case_sensitive,
570            backend,
571            granularity,
572            false_positive_rate,
573        ))
574    }
575
576    /// Creates a new fulltext options without checking `false_positive_rate` and `granularity`.
577    pub fn new_unchecked(
578        enable: bool,
579        analyzer: FulltextAnalyzer,
580        case_sensitive: bool,
581        backend: FulltextBackend,
582        granularity: u32,
583        false_positive_rate: f64,
584    ) -> Self {
585        Self {
586            enable,
587            analyzer,
588            case_sensitive,
589            backend,
590            granularity,
591            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
592        }
593    }
594
595    /// Gets the false positive rate.
596    pub fn false_positive_rate(&self) -> f64 {
597        self.false_positive_rate_in_10000 as f64 / 10000.0
598    }
599}
600
601impl Default for FulltextOptions {
602    fn default() -> Self {
603        Self::new_unchecked(
604            false,
605            FulltextAnalyzer::default(),
606            false,
607            FulltextBackend::default(),
608            DEFAULT_GRANULARITY,
609            DEFAULT_FALSE_POSITIVE_RATE,
610        )
611    }
612}
613
614impl fmt::Display for FulltextOptions {
615    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616        write!(f, "enable={}", self.enable)?;
617        if self.enable {
618            write!(f, ", analyzer={}", self.analyzer)?;
619            write!(f, ", case_sensitive={}", self.case_sensitive)?;
620            write!(f, ", backend={}", self.backend)?;
621            if self.backend == FulltextBackend::Bloom {
622                write!(f, ", granularity={}", self.granularity)?;
623                write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
624            }
625        }
626        Ok(())
627    }
628}
629
630/// The backend of the fulltext index.
631#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
632#[serde(rename_all = "kebab-case")]
633pub enum FulltextBackend {
634    #[default]
635    Bloom,
636    Tantivy,
637}
638
639impl fmt::Display for FulltextBackend {
640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641        match self {
642            FulltextBackend::Tantivy => write!(f, "tantivy"),
643            FulltextBackend::Bloom => write!(f, "bloom"),
644        }
645    }
646}
647
648impl TryFrom<HashMap<String, String>> for FulltextOptions {
649    type Error = Error;
650
651    fn try_from(options: HashMap<String, String>) -> Result<Self> {
652        let mut fulltext_options = FulltextOptions {
653            enable: true,
654            ..Default::default()
655        };
656
657        if let Some(enable) = options.get(COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE) {
658            match enable.to_ascii_lowercase().as_str() {
659                "true" => fulltext_options.enable = true,
660                "false" => fulltext_options.enable = false,
661                _ => {
662                    return InvalidFulltextOptionSnafu {
663                        msg: format!("{enable}, expected: 'true' | 'false'"),
664                    }
665                    .fail();
666                }
667            }
668        };
669
670        if let Some(analyzer) = options.get(COLUMN_FULLTEXT_OPT_KEY_ANALYZER) {
671            match analyzer.to_ascii_lowercase().as_str() {
672                "english" => fulltext_options.analyzer = FulltextAnalyzer::English,
673                "chinese" => fulltext_options.analyzer = FulltextAnalyzer::Chinese,
674                _ => {
675                    return InvalidFulltextOptionSnafu {
676                        msg: format!("{analyzer}, expected: 'English' | 'Chinese'"),
677                    }
678                    .fail();
679                }
680            }
681        };
682
683        if let Some(case_sensitive) = options.get(COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE) {
684            match case_sensitive.to_ascii_lowercase().as_str() {
685                "true" => fulltext_options.case_sensitive = true,
686                "false" => fulltext_options.case_sensitive = false,
687                _ => {
688                    return InvalidFulltextOptionSnafu {
689                        msg: format!("{case_sensitive}, expected: 'true' | 'false'"),
690                    }
691                    .fail();
692                }
693            }
694        }
695
696        if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
697            match backend.to_ascii_lowercase().as_str() {
698                "bloom" => fulltext_options.backend = FulltextBackend::Bloom,
699                "tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
700                _ => {
701                    return InvalidFulltextOptionSnafu {
702                        msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
703                    }
704                    .fail();
705                }
706            }
707        }
708
709        if fulltext_options.backend == FulltextBackend::Bloom {
710            // Parse granularity with default value 10240
711            let granularity = match options.get(COLUMN_FULLTEXT_OPT_KEY_GRANULARITY) {
712                Some(value) => value
713                    .parse::<u32>()
714                    .ok()
715                    .filter(|&v| v > 0)
716                    .ok_or_else(|| {
717                        error::InvalidFulltextOptionSnafu {
718                            msg: format!(
719                                "Invalid granularity: {value}, expected: positive integer"
720                            ),
721                        }
722                        .build()
723                    })?,
724                None => DEFAULT_GRANULARITY,
725            };
726            fulltext_options.granularity = granularity;
727
728            // Parse false positive rate with default value 0.01
729            let false_positive_rate = match options.get(COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE)
730            {
731                Some(value) => value
732                    .parse::<f64>()
733                    .ok()
734                    .filter(|&v| v > 0.0 && v <= 1.0)
735                    .ok_or_else(|| {
736                        error::InvalidFulltextOptionSnafu {
737                            msg: format!(
738                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
739                            ),
740                        }
741                        .build()
742                    })?,
743                None => DEFAULT_FALSE_POSITIVE_RATE,
744            };
745            fulltext_options.false_positive_rate_in_10000 = (false_positive_rate * 10000.0) as u32;
746        }
747
748        Ok(fulltext_options)
749    }
750}
751
752/// Fulltext analyzer.
753#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
754pub enum FulltextAnalyzer {
755    #[default]
756    English,
757    Chinese,
758}
759
760impl fmt::Display for FulltextAnalyzer {
761    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
762        match self {
763            FulltextAnalyzer::English => write!(f, "English"),
764            FulltextAnalyzer::Chinese => write!(f, "Chinese"),
765        }
766    }
767}
768
/// Skipping options for a column.
///
/// [`ColumnSchema`] stores these options as JSON in its metadata; field names are
/// serialized in kebab-case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct SkippingIndexOptions {
    /// The granularity of the skip index. Has no serde default, so it must be present
    /// in the serialized form.
    pub granularity: u32,
    /// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
    pub false_positive_rate_in_10000: u32,
    /// The type of the skip index. Defaults to the index type's default when missing
    /// from the serialized form.
    #[serde(default)]
    pub index_type: SkippingIndexType,
}
782
783impl SkippingIndexOptions {
784    /// Creates a new skipping index options without checking `false_positive_rate` and `granularity`.
785    pub fn new_unchecked(
786        granularity: u32,
787        false_positive_rate: f64,
788        index_type: SkippingIndexType,
789    ) -> Self {
790        Self {
791            granularity,
792            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
793            index_type,
794        }
795    }
796
797    /// Creates a new skipping index options.
798    pub fn new(
799        granularity: u32,
800        false_positive_rate: f64,
801        index_type: SkippingIndexType,
802    ) -> Result<Self> {
803        ensure!(
804            0.0 < false_positive_rate && false_positive_rate <= 1.0,
805            error::InvalidSkippingIndexOptionSnafu {
806                msg: format!("Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"),
807            }
808        );
809        ensure!(
810            granularity > 0,
811            error::InvalidSkippingIndexOptionSnafu {
812                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
813            }
814        );
815        Ok(Self::new_unchecked(
816            granularity,
817            false_positive_rate,
818            index_type,
819        ))
820    }
821
822    /// Gets the false positive rate.
823    pub fn false_positive_rate(&self) -> f64 {
824        self.false_positive_rate_in_10000 as f64 / 10000.0
825    }
826}
827
828impl Default for SkippingIndexOptions {
829    fn default() -> Self {
830        Self::new_unchecked(
831            DEFAULT_GRANULARITY,
832            DEFAULT_FALSE_POSITIVE_RATE,
833            SkippingIndexType::default(),
834        )
835    }
836}
837
838impl fmt::Display for SkippingIndexOptions {
839    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840        write!(f, "granularity={}", self.granularity)?;
841        write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
842        write!(f, ", index_type={}", self.index_type)?;
843        Ok(())
844    }
845}
846
847/// Skip index types.
848#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
849pub enum SkippingIndexType {
850    #[default]
851    BloomFilter,
852}
853
854impl fmt::Display for SkippingIndexType {
855    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
856        match self {
857            SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
858        }
859    }
860}
861
862impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
863    type Error = Error;
864
865    fn try_from(options: HashMap<String, String>) -> Result<Self> {
866        // Parse granularity with default value 1
867        let granularity = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY) {
868            Some(value) => value
869                .parse::<u32>()
870                .ok()
871                .filter(|&v| v > 0)
872                .ok_or_else(|| {
873                    error::InvalidSkippingIndexOptionSnafu {
874                        msg: format!("Invalid granularity: {value}, expected: positive integer"),
875                    }
876                    .build()
877                })?,
878            None => DEFAULT_GRANULARITY,
879        };
880
881        // Parse false positive rate with default value 100
882        let false_positive_rate =
883            match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE) {
884                Some(value) => value
885                    .parse::<f64>()
886                    .ok()
887                    .filter(|&v| v > 0.0 && v <= 1.0)
888                    .ok_or_else(|| {
889                        error::InvalidSkippingIndexOptionSnafu {
890                            msg: format!(
891                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
892                            ),
893                        }
894                        .build()
895                    })?,
896                None => DEFAULT_FALSE_POSITIVE_RATE,
897            };
898
899        // Parse index type with default value BloomFilter
900        let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
901            Some(typ) => match typ.to_ascii_uppercase().as_str() {
902                "BLOOM" => SkippingIndexType::BloomFilter,
903                _ => {
904                    return error::InvalidSkippingIndexOptionSnafu {
905                        msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
906                    }
907                    .fail();
908                }
909            },
910            None => SkippingIndexType::default(),
911        };
912
913        Ok(SkippingIndexOptions::new_unchecked(
914            granularity,
915            false_positive_rate,
916            index_type,
917        ))
918    }
919}
920
921#[cfg(test)]
922mod tests {
923    use std::sync::Arc;
924
925    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
926
927    use super::*;
928    use crate::value::Value;
929    use crate::vectors::Int32Vector;
930
931    #[test]
932    fn test_column_schema() {
933        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
934        let field = Field::try_from(&column_schema).unwrap();
935        assert_eq!("test", field.name());
936        assert_eq!(ArrowDataType::Int32, *field.data_type());
937        assert!(field.is_nullable());
938
939        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
940        assert_eq!(column_schema, new_column_schema);
941    }
942
943    #[test]
944    fn test_column_schema_with_default_constraint() {
945        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
946            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
947            .unwrap();
948        assert!(column_schema
949            .metadata()
950            .get(DEFAULT_CONSTRAINT_KEY)
951            .is_none());
952
953        let field = Field::try_from(&column_schema).unwrap();
954        assert_eq!("test", field.name());
955        assert_eq!(ArrowDataType::Int32, *field.data_type());
956        assert!(field.is_nullable());
957        assert_eq!(
958            "{\"Value\":{\"Int32\":99}}",
959            field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
960        );
961
962        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
963        assert_eq!(column_schema, new_column_schema);
964    }
965
966    #[test]
967    fn test_column_schema_with_metadata() {
968        let metadata = Metadata::from([
969            ("k1".to_string(), "v1".to_string()),
970            (COMMENT_KEY.to_string(), "test comment".to_string()),
971        ]);
972        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
973            .with_metadata(metadata)
974            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
975            .unwrap();
976        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
977        assert_eq!("test comment", column_schema.column_comment().unwrap());
978        assert!(column_schema
979            .metadata()
980            .get(DEFAULT_CONSTRAINT_KEY)
981            .is_none());
982
983        let field = Field::try_from(&column_schema).unwrap();
984        assert_eq!("v1", field.metadata().get("k1").unwrap());
985        let _ = field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap();
986
987        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
988        assert_eq!(column_schema, new_column_schema);
989    }
990
991    #[test]
992    fn test_column_schema_with_duplicate_metadata() {
993        let metadata = Metadata::from([(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string())]);
994        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
995            .with_metadata(metadata)
996            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
997            .unwrap();
998        assert!(Field::try_from(&column_schema).is_err());
999    }
1000
1001    #[test]
1002    fn test_column_schema_invalid_default_constraint() {
1003        assert!(
1004            ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
1005                .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1006                .is_err()
1007        );
1008    }
1009
1010    #[test]
1011    fn test_column_default_constraint_try_into_from() {
1012        let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));
1013
1014        let bytes: Vec<u8> = default_constraint.clone().try_into().unwrap();
1015        let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap();
1016
1017        assert_eq!(default_constraint, from_value);
1018    }
1019
1020    #[test]
1021    fn test_column_schema_create_default_null() {
1022        // Implicit default null.
1023        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1024        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1025        assert_eq!(5, v.len());
1026        assert!(v.only_null());
1027
1028        // Explicit default null.
1029        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1030            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1031            .unwrap();
1032        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1033        assert_eq!(5, v.len());
1034        assert!(v.only_null());
1035    }
1036
1037    #[test]
1038    fn test_column_schema_no_default() {
1039        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1040        assert!(column_schema.create_default_vector(5).unwrap().is_none());
1041    }
1042
1043    #[test]
1044    fn test_create_default_vector_for_padding() {
1045        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1046        let vector = column_schema.create_default_vector_for_padding(4);
1047        assert!(vector.only_null());
1048        assert_eq!(4, vector.len());
1049
1050        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1051        let vector = column_schema.create_default_vector_for_padding(4);
1052        assert_eq!(4, vector.len());
1053        let expect: VectorRef = Arc::new(Int32Vector::from_slice([0, 0, 0, 0]));
1054        assert_eq!(expect, vector);
1055    }
1056
1057    #[test]
1058    fn test_column_schema_single_create_default_null() {
1059        // Implicit default null.
1060        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1061        let v = column_schema.create_default().unwrap().unwrap();
1062        assert!(v.is_null());
1063
1064        // Explicit default null.
1065        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1066            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1067            .unwrap();
1068        let v = column_schema.create_default().unwrap().unwrap();
1069        assert!(v.is_null());
1070    }
1071
1072    #[test]
1073    fn test_column_schema_single_create_default_not_null() {
1074        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1075            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(6))))
1076            .unwrap();
1077        let v = column_schema.create_default().unwrap().unwrap();
1078        assert_eq!(v, Value::Int32(6));
1079    }
1080
1081    #[test]
1082    fn test_column_schema_single_no_default() {
1083        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1084        assert!(column_schema.create_default().unwrap().is_none());
1085    }
1086
1087    #[test]
1088    fn test_debug_for_column_schema() {
1089        let column_schema_int8 =
1090            ColumnSchema::new("test_column_1", ConcreteDataType::int8_datatype(), true);
1091
1092        let column_schema_int32 =
1093            ColumnSchema::new("test_column_2", ConcreteDataType::int32_datatype(), false);
1094
1095        let formatted_int8 = format!("{:?}", column_schema_int8);
1096        let formatted_int32 = format!("{:?}", column_schema_int32);
1097        assert_eq!(formatted_int8, "test_column_1 Int8 null");
1098        assert_eq!(formatted_int32, "test_column_2 Int32 not null");
1099    }
1100
1101    #[test]
1102    fn test_from_field_to_column_schema() {
1103        let field = Field::new("test", ArrowDataType::Int32, true);
1104        let column_schema = ColumnSchema::try_from(&field).unwrap();
1105        assert_eq!("test", column_schema.name);
1106        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1107        assert!(column_schema.is_nullable);
1108        assert!(!column_schema.is_time_index);
1109        assert!(column_schema.default_constraint.is_none());
1110        assert!(column_schema.metadata.is_empty());
1111
1112        let field = Field::new("test", ArrowDataType::Binary, true);
1113        let field = field.with_metadata(Metadata::from([(
1114            TYPE_KEY.to_string(),
1115            ConcreteDataType::json_datatype().name(),
1116        )]));
1117        let column_schema = ColumnSchema::try_from(&field).unwrap();
1118        assert_eq!("test", column_schema.name);
1119        assert_eq!(ConcreteDataType::json_datatype(), column_schema.data_type);
1120        assert!(column_schema.is_nullable);
1121        assert!(!column_schema.is_time_index);
1122        assert!(column_schema.default_constraint.is_none());
1123        assert_eq!(
1124            column_schema.metadata.get(TYPE_KEY).unwrap(),
1125            &ConcreteDataType::json_datatype().name()
1126        );
1127
1128        let field = Field::new("test", ArrowDataType::Binary, true);
1129        let field = field.with_metadata(Metadata::from([(
1130            TYPE_KEY.to_string(),
1131            ConcreteDataType::vector_datatype(3).name(),
1132        )]));
1133        let column_schema = ColumnSchema::try_from(&field).unwrap();
1134        assert_eq!("test", column_schema.name);
1135        assert_eq!(
1136            ConcreteDataType::vector_datatype(3),
1137            column_schema.data_type
1138        );
1139        assert!(column_schema.is_nullable);
1140        assert!(!column_schema.is_time_index);
1141        assert!(column_schema.default_constraint.is_none());
1142        assert_eq!(
1143            column_schema.metadata.get(TYPE_KEY).unwrap(),
1144            &ConcreteDataType::vector_datatype(3).name()
1145        );
1146    }
1147
1148    #[test]
1149    fn test_column_schema_fix_time_index() {
1150        let field = Field::new(
1151            "test",
1152            ArrowDataType::Timestamp(TimeUnit::Second, None),
1153            false,
1154        );
1155        let field = field.with_metadata(Metadata::from([(
1156            TIME_INDEX_KEY.to_string(),
1157            "true".to_string(),
1158        )]));
1159        let column_schema = ColumnSchema::try_from(&field).unwrap();
1160        assert_eq!("test", column_schema.name);
1161        assert_eq!(
1162            ConcreteDataType::timestamp_second_datatype(),
1163            column_schema.data_type
1164        );
1165        assert!(!column_schema.is_nullable);
1166        assert!(column_schema.is_time_index);
1167        assert!(column_schema.default_constraint.is_none());
1168        assert_eq!(1, column_schema.metadata().len());
1169
1170        let field = Field::new("test", ArrowDataType::Int32, false);
1171        let field = field.with_metadata(Metadata::from([(
1172            TIME_INDEX_KEY.to_string(),
1173            "true".to_string(),
1174        )]));
1175        let column_schema = ColumnSchema::try_from(&field).unwrap();
1176        assert_eq!("test", column_schema.name);
1177        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1178        assert!(!column_schema.is_nullable);
1179        assert!(!column_schema.is_time_index);
1180        assert!(column_schema.default_constraint.is_none());
1181        assert!(column_schema.metadata.is_empty());
1182    }
1183
1184    #[test]
1185    fn test_skipping_index_options_deserialization() {
1186        let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
1187        let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
1188        assert_eq!(1024, options.granularity);
1189        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1190        assert_eq!(0.001, options.false_positive_rate());
1191
1192        let options_str = serde_json::to_string(&options).unwrap();
1193        assert_eq!(options_str, original_options);
1194    }
1195
1196    #[test]
1197    fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
1198        let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
1199        let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
1200        assert_eq!(10240, options.granularity);
1201        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1202        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1203
1204        let options_str = serde_json::to_string(&options).unwrap();
1205        assert_eq!(options_str, "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}");
1206    }
1207
1208    #[test]
1209    fn test_fulltext_options_deserialization() {
1210        let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
1211        let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
1212        assert!(!options.case_sensitive);
1213        assert!(options.enable);
1214        assert_eq!(FulltextBackend::Bloom, options.backend);
1215        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1216        assert_eq!(1024, options.granularity);
1217        assert_eq!(0.001, options.false_positive_rate());
1218
1219        let options_str = serde_json::to_string(&options).unwrap();
1220        assert_eq!(options_str, original_options);
1221    }
1222
1223    #[test]
1224    fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
1225        // 0.14 to 0.15
1226        let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
1227        let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
1228        assert!(!options.case_sensitive);
1229        assert!(options.enable);
1230        assert_eq!(FulltextBackend::Bloom, options.backend);
1231        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1232        assert_eq!(DEFAULT_GRANULARITY, options.granularity);
1233        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1234
1235        let options_str = serde_json::to_string(&options).unwrap();
1236        assert_eq!(options_str, "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}");
1237    }
1238}