1use std::collections::HashMap;
18use std::sync::Arc;
19
20use datatypes::value::ValueRef;
21use memcomparable::Serializer;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::metadata::ColumnMetadata;
25use store_api::storage::ColumnId;
26
27use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
28use crate::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
29
30pub struct IndexValueCodec;
32
33impl IndexValueCodec {
34    pub fn encode_nonnull_value(
45        value: ValueRef,
46        field: &SortField,
47        buffer: &mut Vec<u8>,
48    ) -> Result<()> {
49        ensure!(!value.is_null(), IndexEncodeNullSnafu);
50
51        if field.data_type().is_string() {
52            let value = value
53                .try_into_string()
54                .context(FieldTypeMismatchSnafu)?
55                .context(IndexEncodeNullSnafu)?;
56            buffer.extend_from_slice(value.as_bytes());
57            Ok(())
58        } else {
59            buffer.reserve(field.estimated_size());
60            let mut serializer = Serializer::new(buffer);
61            field.serialize(&mut serializer, &value)
62        }
63    }
64}
65
66pub struct PkColInfo {
67    pub idx: usize,
68    pub field: SortField,
69}
70
71impl PkColInfo {
72    pub fn new(idx: usize, field: SortField) -> Self {
73        Self { idx, field }
74    }
75}
76
77pub struct IndexValuesCodec {
79    columns_mapping: HashMap<ColumnId, PkColInfo>,
81    decoder: Arc<dyn PrimaryKeyCodec>,
83}
84
85impl IndexValuesCodec {
86    pub fn from_tag_columns<'a>(
88        primary_key_encoding: PrimaryKeyEncoding,
89        tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
90    ) -> Self {
91        let (columns_mapping, fields): (HashMap<ColumnId, PkColInfo>, Vec<(ColumnId, SortField)>) =
92            tag_columns
93                .enumerate()
94                .map(|(idx, column)| {
95                    let col_id = column.column_id;
96                    let field = SortField::new(column.column_schema.data_type.clone());
97                    let pk_col_info = PkColInfo::new(idx, field.clone());
98                    ((col_id, pk_col_info), (col_id, field))
99                })
100                .unzip();
101
102        let decoder = build_primary_key_codec_with_fields(primary_key_encoding, fields.into_iter());
103
104        Self {
105            columns_mapping,
106            decoder,
107        }
108    }
109
110    pub fn pk_col_info(&self, column_id: ColumnId) -> Option<&PkColInfo> {
111        self.columns_mapping.get(&column_id)
112    }
113
114    pub fn decoder(&self) -> &dyn PrimaryKeyCodec {
115        self.decoder.as_ref()
116    }
117}
118
#[cfg(test)]
mod tests {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use store_api::metadata::ColumnMetadata;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};

    /// Two tag columns shared by the codec tests: a nullable string column with id 1 and a
    /// non-nullable int64 column with id 2.
    fn sample_tag_columns() -> [ColumnMetadata; 2] {
        [
            ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: api::v1::SemanticType::Tag,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
                semantic_type: api::v1::SemanticType::Tag,
                column_id: 2,
            },
        ]
    }

    #[test]
    fn test_encode_value_basic() {
        let value = ValueRef::from("hello");
        let field = SortField::new(ConcreteDataType::string_datatype());

        let mut buffer = Vec::new();
        IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer).unwrap();
        // String fields are appended as their raw bytes, so the output is exactly the input.
        assert_eq!(buffer, b"hello");
    }

    #[test]
    fn test_encode_value_type_mismatch() {
        let value = ValueRef::from("hello");
        let field = SortField::new(ConcreteDataType::int64_datatype());

        let mut buffer = Vec::new();
        let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
        assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
    }

    #[test]
    fn test_encode_null_value() {
        let value = ValueRef::Null;
        let field = SortField::new(ConcreteDataType::string_datatype());

        let mut buffer = Vec::new();
        let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
        assert!(matches!(res, Err(Error::IndexEncodeNull { .. })));
        // The null check runs before anything is written.
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_decode_primary_key_basic() {
        let tag_columns = sample_tag_columns();

        // Column ids match the ones declared in `sample_tag_columns`.
        let primary_key = DensePrimaryKeyCodec::with_fields(vec![
            (1, SortField::new(ConcreteDataType::string_datatype())),
            (2, SortField::new(ConcreteDataType::int64_datatype())),
        ])
        .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
        .unwrap();

        let codec =
            IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
        let values = codec.decoder().decode(&primary_key).unwrap().into_dense();

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], Value::Null);
        assert_eq!(values[1], Value::Int64(10));
    }

    #[test]
    fn test_pk_col_info_lookup() {
        let tag_columns = sample_tag_columns();
        let codec =
            IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());

        // Each tag column is registered under its column id with its iteration position.
        let tag0 = codec.pk_col_info(1).unwrap();
        assert_eq!(tag0.idx, 0);
        assert!(tag0.field.data_type().is_string());

        let tag1 = codec.pk_col_info(2).unwrap();
        assert_eq!(tag1.idx, 1);
        assert!(!tag1.field.data_type().is_string());

        // Ids that were never supplied are not present.
        assert!(codec.pk_col_info(3).is_none());
    }
}