datatypes/schema/
column_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18
19use arrow::datatypes::Field;
20use serde::{Deserialize, Serialize};
21use snafu::{ResultExt, ensure};
22use sqlparser_derive::{Visit, VisitMut};
23
24use crate::data_type::{ConcreteDataType, DataType};
25use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result};
26use crate::schema::TYPE_KEY;
27use crate::schema::constraint::ColumnDefaultConstraint;
28use crate::value::Value;
29use crate::vectors::VectorRef;
30
/// Key/value metadata attached to a column schema.
pub type Metadata = HashMap<String, String>;

/// Metadata key that marks a column as the time index in the arrow field's metadata.
pub const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Metadata key under which the column comment is stored.
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Metadata key under which the serialized default constraint is stored in the
/// arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Metadata key under which the serialized fulltext options are stored.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";
/// Metadata key that records whether the column has an inverted index.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Metadata key under which the serialized skipping index options are stored.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";

// Option names accepted when building fulltext options from a string map.
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
pub const COLUMN_FULLTEXT_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";

// Option names accepted when building skipping index options from a string map.
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE: &str = "type";

/// Granularity used when an index option does not specify one.
pub const DEFAULT_GRANULARITY: u32 = 10 * 1024;

/// False positive rate used when an index option does not specify one.
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
61
62/// Schema of a column, used as an immutable struct.
63#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct ColumnSchema {
65    pub name: String,
66    pub data_type: ConcreteDataType,
67    is_nullable: bool,
68    is_time_index: bool,
69    default_constraint: Option<ColumnDefaultConstraint>,
70    metadata: Metadata,
71}
72
73impl fmt::Debug for ColumnSchema {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        write!(
76            f,
77            "{} {} {}",
78            self.name,
79            self.data_type,
80            if self.is_nullable { "null" } else { "not null" },
81        )?;
82
83        if self.is_time_index {
84            write!(f, " time_index")?;
85        }
86
87        // Add default constraint if present
88        if let Some(default_constraint) = &self.default_constraint {
89            write!(f, " default={:?}", default_constraint)?;
90        }
91
92        // Add metadata if present
93        if !self.metadata.is_empty() {
94            write!(f, " metadata={:?}", self.metadata)?;
95        }
96
97        Ok(())
98    }
99}
100
101impl ColumnSchema {
102    pub fn new<T: Into<String>>(
103        name: T,
104        data_type: ConcreteDataType,
105        is_nullable: bool,
106    ) -> ColumnSchema {
107        ColumnSchema {
108            name: name.into(),
109            data_type,
110            is_nullable,
111            is_time_index: false,
112            default_constraint: None,
113            metadata: Metadata::new(),
114        }
115    }
116
117    #[inline]
118    pub fn is_time_index(&self) -> bool {
119        self.is_time_index
120    }
121
122    #[inline]
123    pub fn is_nullable(&self) -> bool {
124        self.is_nullable
125    }
126
127    #[inline]
128    pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
129        self.default_constraint.as_ref()
130    }
131
132    /// Check if the default constraint is a impure function.
133    pub fn is_default_impure(&self) -> bool {
134        self.default_constraint
135            .as_ref()
136            .map(|c| c.is_function())
137            .unwrap_or(false)
138    }
139
140    #[inline]
141    pub fn metadata(&self) -> &Metadata {
142        &self.metadata
143    }
144
145    #[inline]
146    pub fn mut_metadata(&mut self) -> &mut Metadata {
147        &mut self.metadata
148    }
149
150    /// Retrieve the column comment
151    pub fn column_comment(&self) -> Option<&String> {
152        self.metadata.get(COMMENT_KEY)
153    }
154
155    pub fn with_time_index(mut self, is_time_index: bool) -> Self {
156        self.is_time_index = is_time_index;
157        if is_time_index {
158            let _ = self
159                .metadata
160                .insert(TIME_INDEX_KEY.to_string(), "true".to_string());
161        } else {
162            let _ = self.metadata.remove(TIME_INDEX_KEY);
163        }
164        self
165    }
166
167    /// Set the inverted index for the column.
168    /// Similar to [with_inverted_index] but don't take the ownership.
169    ///
170    /// [with_inverted_index]: Self::with_inverted_index
171    pub fn set_inverted_index(&mut self, value: bool) {
172        match value {
173            true => {
174                self.metadata
175                    .insert(INVERTED_INDEX_KEY.to_string(), value.to_string());
176            }
177            false => {
178                self.metadata.remove(INVERTED_INDEX_KEY);
179            }
180        }
181    }
182
183    /// Set the inverted index for the column.
184    /// Similar to [set_inverted_index] but take the ownership and return a owned value.
185    ///
186    /// [set_inverted_index]: Self::set_inverted_index
187    pub fn with_inverted_index(mut self, value: bool) -> Self {
188        self.set_inverted_index(value);
189        self
190    }
191
192    pub fn is_inverted_indexed(&self) -> bool {
193        self.metadata
194            .get(INVERTED_INDEX_KEY)
195            .map(|v| v.eq_ignore_ascii_case("true"))
196            .unwrap_or(false)
197    }
198
199    pub fn is_fulltext_indexed(&self) -> bool {
200        self.fulltext_options()
201            .unwrap_or_default()
202            .map(|option| option.enable)
203            .unwrap_or_default()
204    }
205
206    pub fn is_skipping_indexed(&self) -> bool {
207        self.skipping_index_options().unwrap_or_default().is_some()
208    }
209
210    pub fn has_inverted_index_key(&self) -> bool {
211        self.metadata.contains_key(INVERTED_INDEX_KEY)
212    }
213
214    /// Set default constraint.
215    ///
216    /// If a default constraint exists for the column, this method will
217    /// validate it against the column's data type and nullability.
218    pub fn with_default_constraint(
219        mut self,
220        default_constraint: Option<ColumnDefaultConstraint>,
221    ) -> Result<Self> {
222        if let Some(constraint) = &default_constraint {
223            constraint.validate(&self.data_type, self.is_nullable)?;
224        }
225
226        self.default_constraint = default_constraint;
227        Ok(self)
228    }
229
230    /// Set the nullablity to `true` of the column.
231    /// Similar to [set_nullable] but take the ownership and return a owned value.
232    ///
233    /// [set_nullable]: Self::set_nullable
234    pub fn with_nullable_set(mut self) -> Self {
235        self.is_nullable = true;
236        self
237    }
238
239    /// Set the nullability to `true` of the column.
240    /// Similar to [with_nullable_set] but don't take the ownership
241    ///
242    /// [with_nullable_set]: Self::with_nullable_set
243    pub fn set_nullable(&mut self) {
244        self.is_nullable = true;
245    }
246
247    /// Set the `is_time_index` to `true` of the column.
248    /// Similar to [with_time_index] but don't take the ownership.
249    ///
250    /// [with_time_index]: Self::with_time_index
251    pub fn set_time_index(&mut self) {
252        self.is_time_index = true;
253    }
254
255    /// Creates a new [`ColumnSchema`] with given metadata.
256    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
257        self.metadata = metadata;
258        self
259    }
260
261    /// Creates a vector with default value for this column.
262    ///
263    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
264    pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
265        match &self.default_constraint {
266            Some(c) => c
267                .create_default_vector(&self.data_type, self.is_nullable, num_rows)
268                .map(Some),
269            None => {
270                if self.is_nullable {
271                    // No default constraint, use null as default value.
272                    // TODO(yingwen): Use NullVector once it supports setting logical type.
273                    ColumnDefaultConstraint::null_value()
274                        .create_default_vector(&self.data_type, self.is_nullable, num_rows)
275                        .map(Some)
276                } else {
277                    Ok(None)
278                }
279            }
280        }
281    }
282
283    /// Creates a vector for padding.
284    ///
285    /// This method always returns a vector since it uses [DataType::default_value]
286    /// to fill the vector. Callers should only use the created vector for padding
287    /// and never read its content.
288    pub fn create_default_vector_for_padding(&self, num_rows: usize) -> VectorRef {
289        let padding_value = if self.is_nullable {
290            Value::Null
291        } else {
292            // If the column is not null, use the data type's default value as it is
293            // more efficient to acquire.
294            self.data_type.default_value()
295        };
296        let value_ref = padding_value.as_value_ref();
297        let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
298        for _ in 0..num_rows {
299            mutable_vector.push_value_ref(value_ref);
300        }
301        mutable_vector.to_vector()
302    }
303
304    /// Creates a default value for this column.
305    ///
306    /// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
307    pub fn create_default(&self) -> Result<Option<Value>> {
308        match &self.default_constraint {
309            Some(c) => c
310                .create_default(&self.data_type, self.is_nullable)
311                .map(Some),
312            None => {
313                if self.is_nullable {
314                    // No default constraint, use null as default value.
315                    ColumnDefaultConstraint::null_value()
316                        .create_default(&self.data_type, self.is_nullable)
317                        .map(Some)
318                } else {
319                    Ok(None)
320                }
321            }
322        }
323    }
324
325    /// Creates an impure default value for this column, only if it have a impure default constraint.
326    /// Otherwise, returns `Ok(None)`.
327    pub fn create_impure_default(&self) -> Result<Option<Value>> {
328        match &self.default_constraint {
329            Some(c) => c.create_impure_default(&self.data_type),
330            None => Ok(None),
331        }
332    }
333
334    /// Retrieves the fulltext options for the column.
335    pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
336        match self.metadata.get(FULLTEXT_KEY) {
337            None => Ok(None),
338            Some(json) => {
339                let options =
340                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
341                Ok(Some(options))
342            }
343        }
344    }
345
346    pub fn with_fulltext_options(mut self, options: FulltextOptions) -> Result<Self> {
347        self.metadata.insert(
348            FULLTEXT_KEY.to_string(),
349            serde_json::to_string(&options).context(error::SerializeSnafu)?,
350        );
351        Ok(self)
352    }
353
354    pub fn set_fulltext_options(&mut self, options: &FulltextOptions) -> Result<()> {
355        self.metadata.insert(
356            FULLTEXT_KEY.to_string(),
357            serde_json::to_string(options).context(error::SerializeSnafu)?,
358        );
359        Ok(())
360    }
361
362    /// Retrieves the skipping index options for the column.
363    pub fn skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
364        match self.metadata.get(SKIPPING_INDEX_KEY) {
365            None => Ok(None),
366            Some(json) => {
367                let options =
368                    serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
369                Ok(Some(options))
370            }
371        }
372    }
373
374    pub fn with_skipping_options(mut self, options: SkippingIndexOptions) -> Result<Self> {
375        self.metadata.insert(
376            SKIPPING_INDEX_KEY.to_string(),
377            serde_json::to_string(&options).context(error::SerializeSnafu)?,
378        );
379        Ok(self)
380    }
381
382    pub fn set_skipping_options(&mut self, options: &SkippingIndexOptions) -> Result<()> {
383        self.metadata.insert(
384            SKIPPING_INDEX_KEY.to_string(),
385            serde_json::to_string(options).context(error::SerializeSnafu)?,
386        );
387        Ok(())
388    }
389
390    pub fn unset_skipping_options(&mut self) -> Result<()> {
391        self.metadata.remove(SKIPPING_INDEX_KEY);
392        Ok(())
393    }
394}
395
/// Extended column type recorded in a column schema's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnExtType {
    /// JSON type.
    Json,

    /// Vector type; the payload is its dimension.
    Vector(u32),
}

impl fmt::Display for ColumnExtType {
    /// Renders `Json` or `Vector(<dim>)`, the textual form accepted by the
    /// type's `FromStr` implementation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Json => f.write_str("Json"),
            Self::Vector(dimension) => write!(f, "Vector({dimension})"),
        }
    }
}
414
415impl FromStr for ColumnExtType {
416    type Err = String;
417
418    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
419        match s {
420            "Json" => Ok(ColumnExtType::Json),
421            _ if s.starts_with("Vector(") && s.ends_with(')') => s[7..s.len() - 1]
422                .parse::<u32>()
423                .map(ColumnExtType::Vector)
424                .map_err(|_| "Invalid dimension for Vector".to_string()),
425            _ => Err("Unknown variant".to_string()),
426        }
427    }
428}
429
430impl TryFrom<&Field> for ColumnSchema {
431    type Error = Error;
432
433    fn try_from(field: &Field) -> Result<ColumnSchema> {
434        let mut data_type = ConcreteDataType::try_from(field.data_type())?;
435        // Override the data type if it is specified in the metadata.
436        if let Some(s) = field.metadata().get(TYPE_KEY) {
437            let extype = ColumnExtType::from_str(s)
438                .map_err(|_| ParseExtendedTypeSnafu { value: s }.build())?;
439            match extype {
440                ColumnExtType::Json => {
441                    data_type = ConcreteDataType::json_datatype();
442                }
443                ColumnExtType::Vector(dim) => {
444                    data_type = ConcreteDataType::vector_datatype(dim);
445                }
446            }
447        }
448        let mut metadata = field.metadata().clone();
449        let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
450            Some(json) => {
451                Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
452            }
453            None => None,
454        };
455        let mut is_time_index = metadata.contains_key(TIME_INDEX_KEY);
456        if is_time_index && !data_type.is_timestamp() {
457            // If the column is time index but the data type is not timestamp, it is invalid.
458            // We set the time index to false and remove the metadata.
459            // This is possible if we cast the time index column to another type. DataFusion will
460            // keep the metadata:
461            // https://github.com/apache/datafusion/pull/12951
462            is_time_index = false;
463            metadata.remove(TIME_INDEX_KEY);
464            common_telemetry::debug!(
465                "Column {} is not timestamp ({:?}) but has time index metadata",
466                data_type,
467                field.name(),
468            );
469        }
470
471        Ok(ColumnSchema {
472            name: field.name().clone(),
473            data_type,
474            is_nullable: field.is_nullable(),
475            is_time_index,
476            default_constraint,
477            metadata,
478        })
479    }
480}
481
482impl TryFrom<&ColumnSchema> for Field {
483    type Error = Error;
484
485    fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
486        let mut metadata = column_schema.metadata.clone();
487        if let Some(value) = &column_schema.default_constraint {
488            // Adds an additional metadata to store the default constraint.
489            let old = metadata.insert(
490                DEFAULT_CONSTRAINT_KEY.to_string(),
491                serde_json::to_string(&value).context(error::SerializeSnafu)?,
492            );
493
494            ensure!(
495                old.is_none(),
496                error::DuplicateMetaSnafu {
497                    key: DEFAULT_CONSTRAINT_KEY,
498                }
499            );
500        }
501
502        Ok(Field::new(
503            &column_schema.name,
504            column_schema.data_type.as_arrow_type(),
505            column_schema.is_nullable(),
506        )
507        .with_metadata(metadata))
508    }
509}
510
511/// Fulltext options for a column.
512#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
513#[serde(rename_all = "kebab-case")]
514pub struct FulltextOptions {
515    /// Whether the fulltext index is enabled.
516    pub enable: bool,
517    /// The fulltext analyzer to use.
518    #[serde(default)]
519    pub analyzer: FulltextAnalyzer,
520    /// Whether the fulltext index is case-sensitive.
521    #[serde(default)]
522    pub case_sensitive: bool,
523    /// The fulltext backend to use.
524    #[serde(default)]
525    pub backend: FulltextBackend,
526    /// The granularity of the fulltext index (for bloom backend only)
527    #[serde(default = "fulltext_options_default_granularity")]
528    pub granularity: u32,
529    /// The false positive rate of the fulltext index (for bloom backend only)
530    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
531    pub false_positive_rate_in_10000: u32,
532}
533
534fn fulltext_options_default_granularity() -> u32 {
535    DEFAULT_GRANULARITY
536}
537
538fn index_options_default_false_positive_rate_in_10000() -> u32 {
539    (DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
540}
541
542impl FulltextOptions {
543    /// Creates a new fulltext options.
544    pub fn new(
545        enable: bool,
546        analyzer: FulltextAnalyzer,
547        case_sensitive: bool,
548        backend: FulltextBackend,
549        granularity: u32,
550        false_positive_rate: f64,
551    ) -> Result<Self> {
552        ensure!(
553            0.0 < false_positive_rate && false_positive_rate <= 1.0,
554            error::InvalidFulltextOptionSnafu {
555                msg: format!(
556                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
557                ),
558            }
559        );
560        ensure!(
561            granularity > 0,
562            error::InvalidFulltextOptionSnafu {
563                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
564            }
565        );
566        Ok(Self::new_unchecked(
567            enable,
568            analyzer,
569            case_sensitive,
570            backend,
571            granularity,
572            false_positive_rate,
573        ))
574    }
575
576    /// Creates a new fulltext options without checking `false_positive_rate` and `granularity`.
577    pub fn new_unchecked(
578        enable: bool,
579        analyzer: FulltextAnalyzer,
580        case_sensitive: bool,
581        backend: FulltextBackend,
582        granularity: u32,
583        false_positive_rate: f64,
584    ) -> Self {
585        Self {
586            enable,
587            analyzer,
588            case_sensitive,
589            backend,
590            granularity,
591            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
592        }
593    }
594
595    /// Gets the false positive rate.
596    pub fn false_positive_rate(&self) -> f64 {
597        self.false_positive_rate_in_10000 as f64 / 10000.0
598    }
599}
600
601impl Default for FulltextOptions {
602    fn default() -> Self {
603        Self::new_unchecked(
604            false,
605            FulltextAnalyzer::default(),
606            false,
607            FulltextBackend::default(),
608            DEFAULT_GRANULARITY,
609            DEFAULT_FALSE_POSITIVE_RATE,
610        )
611    }
612}
613
614impl fmt::Display for FulltextOptions {
615    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616        write!(f, "enable={}", self.enable)?;
617        if self.enable {
618            write!(f, ", analyzer={}", self.analyzer)?;
619            write!(f, ", case_sensitive={}", self.case_sensitive)?;
620            write!(f, ", backend={}", self.backend)?;
621            if self.backend == FulltextBackend::Bloom {
622                write!(f, ", granularity={}", self.granularity)?;
623                write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
624            }
625        }
626        Ok(())
627    }
628}
629
630/// The backend of the fulltext index.
631#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
632#[serde(rename_all = "kebab-case")]
633pub enum FulltextBackend {
634    #[default]
635    Bloom,
636    Tantivy,
637}
638
639impl fmt::Display for FulltextBackend {
640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641        match self {
642            FulltextBackend::Tantivy => write!(f, "tantivy"),
643            FulltextBackend::Bloom => write!(f, "bloom"),
644        }
645    }
646}
647
648impl TryFrom<HashMap<String, String>> for FulltextOptions {
649    type Error = Error;
650
651    fn try_from(options: HashMap<String, String>) -> Result<Self> {
652        let mut fulltext_options = FulltextOptions {
653            enable: true,
654            ..Default::default()
655        };
656
657        if let Some(enable) = options.get(COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE) {
658            match enable.to_ascii_lowercase().as_str() {
659                "true" => fulltext_options.enable = true,
660                "false" => fulltext_options.enable = false,
661                _ => {
662                    return InvalidFulltextOptionSnafu {
663                        msg: format!("{enable}, expected: 'true' | 'false'"),
664                    }
665                    .fail();
666                }
667            }
668        };
669
670        if let Some(analyzer) = options.get(COLUMN_FULLTEXT_OPT_KEY_ANALYZER) {
671            match analyzer.to_ascii_lowercase().as_str() {
672                "english" => fulltext_options.analyzer = FulltextAnalyzer::English,
673                "chinese" => fulltext_options.analyzer = FulltextAnalyzer::Chinese,
674                _ => {
675                    return InvalidFulltextOptionSnafu {
676                        msg: format!("{analyzer}, expected: 'English' | 'Chinese'"),
677                    }
678                    .fail();
679                }
680            }
681        };
682
683        if let Some(case_sensitive) = options.get(COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE) {
684            match case_sensitive.to_ascii_lowercase().as_str() {
685                "true" => fulltext_options.case_sensitive = true,
686                "false" => fulltext_options.case_sensitive = false,
687                _ => {
688                    return InvalidFulltextOptionSnafu {
689                        msg: format!("{case_sensitive}, expected: 'true' | 'false'"),
690                    }
691                    .fail();
692                }
693            }
694        }
695
696        if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
697            match backend.to_ascii_lowercase().as_str() {
698                "bloom" => fulltext_options.backend = FulltextBackend::Bloom,
699                "tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
700                _ => {
701                    return InvalidFulltextOptionSnafu {
702                        msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
703                    }
704                    .fail();
705                }
706            }
707        }
708
709        if fulltext_options.backend == FulltextBackend::Bloom {
710            // Parse granularity with default value 10240
711            let granularity = match options.get(COLUMN_FULLTEXT_OPT_KEY_GRANULARITY) {
712                Some(value) => value
713                    .parse::<u32>()
714                    .ok()
715                    .filter(|&v| v > 0)
716                    .ok_or_else(|| {
717                        error::InvalidFulltextOptionSnafu {
718                            msg: format!(
719                                "Invalid granularity: {value}, expected: positive integer"
720                            ),
721                        }
722                        .build()
723                    })?,
724                None => DEFAULT_GRANULARITY,
725            };
726            fulltext_options.granularity = granularity;
727
728            // Parse false positive rate with default value 0.01
729            let false_positive_rate = match options.get(COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE)
730            {
731                Some(value) => value
732                    .parse::<f64>()
733                    .ok()
734                    .filter(|&v| v > 0.0 && v <= 1.0)
735                    .ok_or_else(|| {
736                        error::InvalidFulltextOptionSnafu {
737                            msg: format!(
738                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
739                            ),
740                        }
741                        .build()
742                    })?,
743                None => DEFAULT_FALSE_POSITIVE_RATE,
744            };
745            fulltext_options.false_positive_rate_in_10000 = (false_positive_rate * 10000.0) as u32;
746        }
747
748        Ok(fulltext_options)
749    }
750}
751
752/// Fulltext analyzer.
753#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
754pub enum FulltextAnalyzer {
755    #[default]
756    English,
757    Chinese,
758}
759
760impl fmt::Display for FulltextAnalyzer {
761    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
762        match self {
763            FulltextAnalyzer::English => write!(f, "English"),
764            FulltextAnalyzer::Chinese => write!(f, "Chinese"),
765        }
766    }
767}
768
769/// Skipping options for a column.
770#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
771#[serde(rename_all = "kebab-case")]
772pub struct SkippingIndexOptions {
773    /// The granularity of the skip index.
774    pub granularity: u32,
775    /// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
776    #[serde(default = "index_options_default_false_positive_rate_in_10000")]
777    pub false_positive_rate_in_10000: u32,
778    /// The type of the skip index.
779    #[serde(default)]
780    pub index_type: SkippingIndexType,
781}
782
783impl SkippingIndexOptions {
784    /// Creates a new skipping index options without checking `false_positive_rate` and `granularity`.
785    pub fn new_unchecked(
786        granularity: u32,
787        false_positive_rate: f64,
788        index_type: SkippingIndexType,
789    ) -> Self {
790        Self {
791            granularity,
792            false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
793            index_type,
794        }
795    }
796
797    /// Creates a new skipping index options.
798    pub fn new(
799        granularity: u32,
800        false_positive_rate: f64,
801        index_type: SkippingIndexType,
802    ) -> Result<Self> {
803        ensure!(
804            0.0 < false_positive_rate && false_positive_rate <= 1.0,
805            error::InvalidSkippingIndexOptionSnafu {
806                msg: format!(
807                    "Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
808                ),
809            }
810        );
811        ensure!(
812            granularity > 0,
813            error::InvalidSkippingIndexOptionSnafu {
814                msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
815            }
816        );
817        Ok(Self::new_unchecked(
818            granularity,
819            false_positive_rate,
820            index_type,
821        ))
822    }
823
824    /// Gets the false positive rate.
825    pub fn false_positive_rate(&self) -> f64 {
826        self.false_positive_rate_in_10000 as f64 / 10000.0
827    }
828}
829
830impl Default for SkippingIndexOptions {
831    fn default() -> Self {
832        Self::new_unchecked(
833            DEFAULT_GRANULARITY,
834            DEFAULT_FALSE_POSITIVE_RATE,
835            SkippingIndexType::default(),
836        )
837    }
838}
839
840impl fmt::Display for SkippingIndexOptions {
841    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
842        write!(f, "granularity={}", self.granularity)?;
843        write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
844        write!(f, ", index_type={}", self.index_type)?;
845        Ok(())
846    }
847}
848
849/// Skip index types.
850#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
851pub enum SkippingIndexType {
852    #[default]
853    BloomFilter,
854}
855
856impl fmt::Display for SkippingIndexType {
857    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858        match self {
859            SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
860        }
861    }
862}
863
864impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
865    type Error = Error;
866
867    fn try_from(options: HashMap<String, String>) -> Result<Self> {
868        // Parse granularity with default value 1
869        let granularity = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY) {
870            Some(value) => value
871                .parse::<u32>()
872                .ok()
873                .filter(|&v| v > 0)
874                .ok_or_else(|| {
875                    error::InvalidSkippingIndexOptionSnafu {
876                        msg: format!("Invalid granularity: {value}, expected: positive integer"),
877                    }
878                    .build()
879                })?,
880            None => DEFAULT_GRANULARITY,
881        };
882
883        // Parse false positive rate with default value 100
884        let false_positive_rate =
885            match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE) {
886                Some(value) => value
887                    .parse::<f64>()
888                    .ok()
889                    .filter(|&v| v > 0.0 && v <= 1.0)
890                    .ok_or_else(|| {
891                        error::InvalidSkippingIndexOptionSnafu {
892                            msg: format!(
893                                "Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
894                            ),
895                        }
896                        .build()
897                    })?,
898                None => DEFAULT_FALSE_POSITIVE_RATE,
899            };
900
901        // Parse index type with default value BloomFilter
902        let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
903            Some(typ) => match typ.to_ascii_uppercase().as_str() {
904                "BLOOM" => SkippingIndexType::BloomFilter,
905                _ => {
906                    return error::InvalidSkippingIndexOptionSnafu {
907                        msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
908                    }
909                    .fail();
910                }
911            },
912            None => SkippingIndexType::default(),
913        };
914
915        Ok(SkippingIndexOptions::new_unchecked(
916            granularity,
917            false_positive_rate,
918            index_type,
919        ))
920    }
921}
922
923#[cfg(test)]
924mod tests {
925    use std::sync::Arc;
926
927    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
928
929    use super::*;
930    use crate::value::Value;
931    use crate::vectors::Int32Vector;
932
933    #[test]
934    fn test_column_schema() {
935        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
936        let field = Field::try_from(&column_schema).unwrap();
937        assert_eq!("test", field.name());
938        assert_eq!(ArrowDataType::Int32, *field.data_type());
939        assert!(field.is_nullable());
940
941        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
942        assert_eq!(column_schema, new_column_schema);
943    }
944
945    #[test]
946    fn test_column_schema_with_default_constraint() {
947        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
948            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
949            .unwrap();
950        assert!(
951            column_schema
952                .metadata()
953                .get(DEFAULT_CONSTRAINT_KEY)
954                .is_none()
955        );
956
957        let field = Field::try_from(&column_schema).unwrap();
958        assert_eq!("test", field.name());
959        assert_eq!(ArrowDataType::Int32, *field.data_type());
960        assert!(field.is_nullable());
961        assert_eq!(
962            "{\"Value\":{\"Int32\":99}}",
963            field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
964        );
965
966        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
967        assert_eq!(column_schema, new_column_schema);
968    }
969
970    #[test]
971    fn test_column_schema_with_metadata() {
972        let metadata = Metadata::from([
973            ("k1".to_string(), "v1".to_string()),
974            (COMMENT_KEY.to_string(), "test comment".to_string()),
975        ]);
976        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
977            .with_metadata(metadata)
978            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
979            .unwrap();
980        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
981        assert_eq!("test comment", column_schema.column_comment().unwrap());
982        assert!(
983            column_schema
984                .metadata()
985                .get(DEFAULT_CONSTRAINT_KEY)
986                .is_none()
987        );
988
989        let field = Field::try_from(&column_schema).unwrap();
990        assert_eq!("v1", field.metadata().get("k1").unwrap());
991        let _ = field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap();
992
993        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
994        assert_eq!(column_schema, new_column_schema);
995    }
996
997    #[test]
998    fn test_column_schema_with_duplicate_metadata() {
999        let metadata = Metadata::from([(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string())]);
1000        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1001            .with_metadata(metadata)
1002            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1003            .unwrap();
1004        assert!(Field::try_from(&column_schema).is_err());
1005    }
1006
1007    #[test]
1008    fn test_column_schema_invalid_default_constraint() {
1009        assert!(
1010            ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
1011                .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1012                .is_err()
1013        );
1014    }
1015
1016    #[test]
1017    fn test_column_default_constraint_try_into_from() {
1018        let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));
1019
1020        let bytes: Vec<u8> = default_constraint.clone().try_into().unwrap();
1021        let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap();
1022
1023        assert_eq!(default_constraint, from_value);
1024    }
1025
1026    #[test]
1027    fn test_column_schema_create_default_null() {
1028        // Implicit default null.
1029        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1030        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1031        assert_eq!(5, v.len());
1032        assert!(v.only_null());
1033
1034        // Explicit default null.
1035        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1036            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1037            .unwrap();
1038        let v = column_schema.create_default_vector(5).unwrap().unwrap();
1039        assert_eq!(5, v.len());
1040        assert!(v.only_null());
1041    }
1042
1043    #[test]
1044    fn test_column_schema_no_default() {
1045        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1046        assert!(column_schema.create_default_vector(5).unwrap().is_none());
1047    }
1048
1049    #[test]
1050    fn test_create_default_vector_for_padding() {
1051        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1052        let vector = column_schema.create_default_vector_for_padding(4);
1053        assert!(vector.only_null());
1054        assert_eq!(4, vector.len());
1055
1056        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1057        let vector = column_schema.create_default_vector_for_padding(4);
1058        assert_eq!(4, vector.len());
1059        let expect: VectorRef = Arc::new(Int32Vector::from_slice([0, 0, 0, 0]));
1060        assert_eq!(expect, vector);
1061    }
1062
1063    #[test]
1064    fn test_column_schema_single_create_default_null() {
1065        // Implicit default null.
1066        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
1067        let v = column_schema.create_default().unwrap().unwrap();
1068        assert!(v.is_null());
1069
1070        // Explicit default null.
1071        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1072            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
1073            .unwrap();
1074        let v = column_schema.create_default().unwrap().unwrap();
1075        assert!(v.is_null());
1076    }
1077
1078    #[test]
1079    fn test_column_schema_single_create_default_not_null() {
1080        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
1081            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(6))))
1082            .unwrap();
1083        let v = column_schema.create_default().unwrap().unwrap();
1084        assert_eq!(v, Value::Int32(6));
1085    }
1086
1087    #[test]
1088    fn test_column_schema_single_no_default() {
1089        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
1090        assert!(column_schema.create_default().unwrap().is_none());
1091    }
1092
1093    #[test]
1094    fn test_debug_for_column_schema() {
1095        let column_schema_int8 =
1096            ColumnSchema::new("test_column_1", ConcreteDataType::int8_datatype(), true);
1097
1098        let column_schema_int32 =
1099            ColumnSchema::new("test_column_2", ConcreteDataType::int32_datatype(), false);
1100
1101        let formatted_int8 = format!("{:?}", column_schema_int8);
1102        let formatted_int32 = format!("{:?}", column_schema_int32);
1103        assert_eq!(formatted_int8, "test_column_1 Int8 null");
1104        assert_eq!(formatted_int32, "test_column_2 Int32 not null");
1105    }
1106
1107    #[test]
1108    fn test_from_field_to_column_schema() {
1109        let field = Field::new("test", ArrowDataType::Int32, true);
1110        let column_schema = ColumnSchema::try_from(&field).unwrap();
1111        assert_eq!("test", column_schema.name);
1112        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1113        assert!(column_schema.is_nullable);
1114        assert!(!column_schema.is_time_index);
1115        assert!(column_schema.default_constraint.is_none());
1116        assert!(column_schema.metadata.is_empty());
1117
1118        let field = Field::new("test", ArrowDataType::Binary, true);
1119        let field = field.with_metadata(Metadata::from([(
1120            TYPE_KEY.to_string(),
1121            ConcreteDataType::json_datatype().name(),
1122        )]));
1123        let column_schema = ColumnSchema::try_from(&field).unwrap();
1124        assert_eq!("test", column_schema.name);
1125        assert_eq!(ConcreteDataType::json_datatype(), column_schema.data_type);
1126        assert!(column_schema.is_nullable);
1127        assert!(!column_schema.is_time_index);
1128        assert!(column_schema.default_constraint.is_none());
1129        assert_eq!(
1130            column_schema.metadata.get(TYPE_KEY).unwrap(),
1131            &ConcreteDataType::json_datatype().name()
1132        );
1133
1134        let field = Field::new("test", ArrowDataType::Binary, true);
1135        let field = field.with_metadata(Metadata::from([(
1136            TYPE_KEY.to_string(),
1137            ConcreteDataType::vector_datatype(3).name(),
1138        )]));
1139        let column_schema = ColumnSchema::try_from(&field).unwrap();
1140        assert_eq!("test", column_schema.name);
1141        assert_eq!(
1142            ConcreteDataType::vector_datatype(3),
1143            column_schema.data_type
1144        );
1145        assert!(column_schema.is_nullable);
1146        assert!(!column_schema.is_time_index);
1147        assert!(column_schema.default_constraint.is_none());
1148        assert_eq!(
1149            column_schema.metadata.get(TYPE_KEY).unwrap(),
1150            &ConcreteDataType::vector_datatype(3).name()
1151        );
1152    }
1153
1154    #[test]
1155    fn test_column_schema_fix_time_index() {
1156        let field = Field::new(
1157            "test",
1158            ArrowDataType::Timestamp(TimeUnit::Second, None),
1159            false,
1160        );
1161        let field = field.with_metadata(Metadata::from([(
1162            TIME_INDEX_KEY.to_string(),
1163            "true".to_string(),
1164        )]));
1165        let column_schema = ColumnSchema::try_from(&field).unwrap();
1166        assert_eq!("test", column_schema.name);
1167        assert_eq!(
1168            ConcreteDataType::timestamp_second_datatype(),
1169            column_schema.data_type
1170        );
1171        assert!(!column_schema.is_nullable);
1172        assert!(column_schema.is_time_index);
1173        assert!(column_schema.default_constraint.is_none());
1174        assert_eq!(1, column_schema.metadata().len());
1175
1176        let field = Field::new("test", ArrowDataType::Int32, false);
1177        let field = field.with_metadata(Metadata::from([(
1178            TIME_INDEX_KEY.to_string(),
1179            "true".to_string(),
1180        )]));
1181        let column_schema = ColumnSchema::try_from(&field).unwrap();
1182        assert_eq!("test", column_schema.name);
1183        assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
1184        assert!(!column_schema.is_nullable);
1185        assert!(!column_schema.is_time_index);
1186        assert!(column_schema.default_constraint.is_none());
1187        assert!(column_schema.metadata.is_empty());
1188    }
1189
1190    #[test]
1191    fn test_skipping_index_options_deserialization() {
1192        let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
1193        let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
1194        assert_eq!(1024, options.granularity);
1195        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1196        assert_eq!(0.001, options.false_positive_rate());
1197
1198        let options_str = serde_json::to_string(&options).unwrap();
1199        assert_eq!(options_str, original_options);
1200    }
1201
1202    #[test]
1203    fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
1204        let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
1205        let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
1206        assert_eq!(10240, options.granularity);
1207        assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
1208        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1209
1210        let options_str = serde_json::to_string(&options).unwrap();
1211        assert_eq!(
1212            options_str,
1213            "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}"
1214        );
1215    }
1216
1217    #[test]
1218    fn test_fulltext_options_deserialization() {
1219        let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
1220        let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
1221        assert!(!options.case_sensitive);
1222        assert!(options.enable);
1223        assert_eq!(FulltextBackend::Bloom, options.backend);
1224        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1225        assert_eq!(1024, options.granularity);
1226        assert_eq!(0.001, options.false_positive_rate());
1227
1228        let options_str = serde_json::to_string(&options).unwrap();
1229        assert_eq!(options_str, original_options);
1230    }
1231
1232    #[test]
1233    fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
1234        // 0.14 to 0.15
1235        let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
1236        let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
1237        assert!(!options.case_sensitive);
1238        assert!(options.enable);
1239        assert_eq!(FulltextBackend::Bloom, options.backend);
1240        assert_eq!(FulltextAnalyzer::default(), options.analyzer);
1241        assert_eq!(DEFAULT_GRANULARITY, options.granularity);
1242        assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
1243
1244        let options_str = serde_json::to_string(&options).unwrap();
1245        assert_eq!(
1246            options_str,
1247            "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}"
1248        );
1249    }
1250}