1use std::collections::HashMap;
18use std::sync::Arc;
19
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::ValueRef;
22use memcomparable::Serializer;
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::ColumnMetadata;
26use store_api::storage::ColumnId;
27
28use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
29use crate::row_converter::{build_primary_key_codec_with_fields, PrimaryKeyCodec, SortField};
30
31pub struct IndexValueCodec;
33
34impl IndexValueCodec {
35 pub fn encode_nonnull_value(
46 value: ValueRef,
47 field: &SortField,
48 buffer: &mut Vec<u8>,
49 ) -> Result<()> {
50 ensure!(!value.is_null(), IndexEncodeNullSnafu);
51
52 if matches!(field.data_type(), ConcreteDataType::String(_)) {
53 let value = value
54 .as_string()
55 .context(FieldTypeMismatchSnafu)?
56 .context(IndexEncodeNullSnafu)?;
57 buffer.extend_from_slice(value.as_bytes());
58 Ok(())
59 } else {
60 buffer.reserve(field.estimated_size());
61 let mut serializer = Serializer::new(buffer);
62 field.serialize(&mut serializer, &value)
63 }
64 }
65}
66
67pub struct PkColInfo {
68 pub idx: usize,
69 pub field: SortField,
70}
71
72impl PkColInfo {
73 pub fn new(idx: usize, field: SortField) -> Self {
74 Self { idx, field }
75 }
76}
77
78pub struct IndexValuesCodec {
80 columns_mapping: HashMap<ColumnId, PkColInfo>,
82 decoder: Arc<dyn PrimaryKeyCodec>,
84}
85
86impl IndexValuesCodec {
87 pub fn from_tag_columns<'a>(
89 primary_key_encoding: PrimaryKeyEncoding,
90 tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
91 ) -> Self {
92 let (columns_mapping, fields): (HashMap<ColumnId, PkColInfo>, Vec<(ColumnId, SortField)>) =
93 tag_columns
94 .enumerate()
95 .map(|(idx, column)| {
96 let col_id = column.column_id;
97 let field = SortField::new(column.column_schema.data_type.clone());
98 let pk_col_info = PkColInfo::new(idx, field.clone());
99 ((col_id, pk_col_info), (col_id, field))
100 })
101 .unzip();
102
103 let decoder = build_primary_key_codec_with_fields(primary_key_encoding, fields.into_iter());
104
105 Self {
106 columns_mapping,
107 decoder,
108 }
109 }
110
111 pub fn pk_col_info(&self, column_id: ColumnId) -> Option<&PkColInfo> {
112 self.columns_mapping.get(&column_id)
113 }
114
115 pub fn decoder(&self) -> &dyn PrimaryKeyCodec {
116 self.decoder.as_ref()
117 }
118}
119
#[cfg(test)]
mod tests {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use store_api::metadata::ColumnMetadata;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};

    /// A string value on a string field is appended to the buffer as its raw bytes.
    #[test]
    fn test_encode_value_basic() {
        let value = ValueRef::from("hello");
        let field = SortField::new(ConcreteDataType::string_datatype());

        let mut buffer = Vec::new();
        IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer).unwrap();
        // Pin the exact bytes rather than just non-emptiness, so a change to the string
        // encoding is caught.
        assert_eq!(buffer, b"hello");
    }

    /// A string value on an int64 field is rejected with `FieldTypeMismatch`.
    #[test]
    fn test_encode_value_type_mismatch() {
        let value = ValueRef::from("hello");
        let field = SortField::new(ConcreteDataType::int64_datatype());

        let mut buffer = Vec::new();
        let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
        assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
    }

    /// A null value is rejected with `IndexEncodeNull` before anything is written.
    #[test]
    fn test_encode_null_value() {
        let value = ValueRef::Null;
        let field = SortField::new(ConcreteDataType::string_datatype());

        let mut buffer = Vec::new();
        let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
        assert!(matches!(res, Err(Error::IndexEncodeNull { .. })));
        // The null check runs first, so the buffer must be left untouched.
        assert!(buffer.is_empty());
    }

    /// A dense-encoded primary key round-trips through the codec built from tag columns.
    #[test]
    fn test_decode_primary_key_basic() {
        let tag_columns = vec![
            ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: api::v1::SemanticType::Tag,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), false),
                semantic_type: api::v1::SemanticType::Tag,
                column_id: 2,
            },
        ];

        let primary_key = DensePrimaryKeyCodec::with_fields(vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int64_datatype())),
        ])
        .encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
        .unwrap();

        let codec =
            IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
        let values = codec.decoder().decode(&primary_key).unwrap().into_dense();

        assert_eq!(values.len(), 2);
        assert_eq!(values[0], Value::Null);
        assert_eq!(values[1], Value::Int64(10));
    }
}