1use std::collections::HashMap;
16use std::sync::Arc;
17
18use datatypes::data_type::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Serializer;
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::codec::PrimaryKeyEncoding;
23use store_api::metadata::ColumnMetadata;
24use store_api::storage::ColumnId;
25
26use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
27use crate::row_converter::{build_primary_key_codec_with_fields, PrimaryKeyCodec, SortField};
28
29pub struct IndexValueCodec;
31
32impl IndexValueCodec {
33 pub fn encode_nonnull_value(
44 value: ValueRef,
45 field: &SortField,
46 buffer: &mut Vec<u8>,
47 ) -> Result<()> {
48 ensure!(!value.is_null(), IndexEncodeNullSnafu);
49
50 if matches!(field.data_type, ConcreteDataType::String(_)) {
51 let value = value
52 .as_string()
53 .context(FieldTypeMismatchSnafu)?
54 .context(IndexEncodeNullSnafu)?;
55 buffer.extend_from_slice(value.as_bytes());
56 Ok(())
57 } else {
58 buffer.reserve(field.estimated_size());
59 let mut serializer = Serializer::new(buffer);
60 field.serialize(&mut serializer, &value)
61 }
62 }
63}
64
65pub struct PkColInfo {
66 pub idx: usize,
67 pub field: SortField,
68}
69
70impl PkColInfo {
71 pub fn new(idx: usize, field: SortField) -> Self {
72 Self { idx, field }
73 }
74}
75
76pub struct IndexValuesCodec {
78 columns_mapping: HashMap<ColumnId, PkColInfo>,
80 decoder: Arc<dyn PrimaryKeyCodec>,
82}
83
84impl IndexValuesCodec {
85 pub fn from_tag_columns<'a>(
87 primary_key_encoding: PrimaryKeyEncoding,
88 tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
89 ) -> Self {
90 let (columns_mapping, fields): (HashMap<ColumnId, PkColInfo>, Vec<(ColumnId, SortField)>) =
91 tag_columns
92 .enumerate()
93 .map(|(idx, column)| {
94 let col_id = column.column_id;
95 let field = SortField::new(column.column_schema.data_type.clone());
96 let pk_col_info = PkColInfo::new(idx, field.clone());
97 ((col_id, pk_col_info), (col_id, field))
98 })
99 .unzip();
100
101 let decoder = build_primary_key_codec_with_fields(primary_key_encoding, fields.into_iter());
102
103 Self {
104 columns_mapping,
105 decoder,
106 }
107 }
108
109 pub fn pk_col_info(&self, column_id: ColumnId) -> Option<&PkColInfo> {
110 self.columns_mapping.get(&column_id)
111 }
112
113 pub fn decoder(&self) -> &dyn PrimaryKeyCodec {
114 self.decoder.as_ref()
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use datatypes::data_type::ConcreteDataType;
121 use datatypes::schema::ColumnSchema;
122 use datatypes::value::Value;
123 use store_api::metadata::ColumnMetadata;
124
125 use super::*;
126 use crate::error::Error;
127 use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
128
129 #[test]
130 fn test_encode_value_basic() {
131 let value = ValueRef::from("hello");
132 let field = SortField::new(ConcreteDataType::string_datatype());
133
134 let mut buffer = Vec::new();
135 IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer).unwrap();
136 assert!(!buffer.is_empty());
137 }
138
139 #[test]
140 fn test_encode_value_type_mismatch() {
141 let value = ValueRef::from("hello");
142 let field = SortField::new(ConcreteDataType::int64_datatype());
143
144 let mut buffer = Vec::new();
145 let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
146 assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
147 }
148
149 #[test]
150 fn test_encode_null_value() {
151 let value = ValueRef::Null;
152 let field = SortField::new(ConcreteDataType::string_datatype());
153
154 let mut buffer = Vec::new();
155 let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
156 assert!(matches!(res, Err(Error::IndexEncodeNull { .. })));
157 }
158
159 #[test]
160 fn test_decode_primary_key_basic() {
161 let tag_columns = vec![
162 ColumnMetadata {
163 column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
164 semantic_type: api::v1::SemanticType::Tag,
165 column_id: 1,
166 },
167 ColumnMetadata {
168 column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
169 semantic_type: api::v1::SemanticType::Tag,
170 column_id: 2,
171 },
172 ];
173
174 let primary_key = DensePrimaryKeyCodec::with_fields(vec![
175 (0, SortField::new(ConcreteDataType::string_datatype())),
176 (1, SortField::new(ConcreteDataType::int64_datatype())),
177 ])
178 .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
179 .unwrap();
180
181 let codec =
182 IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
183 let values = codec.decoder().decode(&primary_key).unwrap().into_dense();
184
185 assert_eq!(values.len(), 2);
186 assert_eq!(values[0], Value::Null);
187 assert_eq!(values[1], Value::Int64(10));
188 }
189}