mito_codec/row_converter/
sparse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::BufMut;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::prelude::ConcreteDataType;
21use datatypes::value::{Value, ValueRef};
22use memcomparable::{Deserializer, Serializer};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28use store_api::storage::consts::ReservedColumnId;
29
30use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
31use crate::key_values::KeyValue;
32use crate::primary_key_filter::SparsePrimaryKeyFilter;
33use crate::row_converter::dense::SortField;
34use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
35
36/// A codec for sparse key of metrics.
37/// It requires the input primary key columns are sorted by the column name in lexicographical order.
38/// It encodes the column id of the physical region.
39#[derive(Clone, Debug)]
40pub struct SparsePrimaryKeyCodec {
41    inner: Arc<SparsePrimaryKeyCodecInner>,
42}
43
44#[derive(Debug)]
45struct SparsePrimaryKeyCodecInner {
46    // Internal fields
47    table_id_field: SortField,
48    // Internal fields
49    tsid_field: SortField,
50    // User defined label field
51    label_field: SortField,
52    // Columns in primary key
53    //
54    // None means all unknown columns is primary key(`Self::label_field`).
55    columns: Option<HashSet<ColumnId>>,
56}
57
58/// Sparse values representation.
59///
60/// A map of [`ColumnId`] to [`Value`].
61#[derive(Debug, Clone, PartialEq, Eq)]
62pub struct SparseValues {
63    values: HashMap<ColumnId, Value>,
64}
65
66impl SparseValues {
67    /// Creates a new [`SparseValues`] instance.
68    pub fn new(values: HashMap<ColumnId, Value>) -> Self {
69        Self { values }
70    }
71
72    /// Returns the value of the given column, or [`Value::Null`] if the column is not present.
73    pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
74        self.values.get(&column_id).unwrap_or(&Value::Null)
75    }
76
77    /// Returns the value of the given column, or [`None`] if the column is not present.
78    pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
79        self.values.get(column_id)
80    }
81
82    /// Inserts a new value into the [`SparseValues`].
83    pub fn insert(&mut self, column_id: ColumnId, value: Value) {
84        self.values.insert(column_id, value);
85    }
86}
87
88/// The column id of the tsid.
89pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
90/// The column id of the table id.
91pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
92/// The size of the column id in the encoded sparse row.
93pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
94
95impl SparsePrimaryKeyCodec {
96    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
97    pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
98        let columns = columns_ids.collect();
99        Self {
100            inner: Arc::new(SparsePrimaryKeyCodecInner {
101                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
102                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
103                label_field: SortField::new(ConcreteDataType::string_datatype()),
104                columns: Some(columns),
105            }),
106        }
107    }
108
109    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
110    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
111        Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
112    }
113
114    /// Returns a new [`SparsePrimaryKeyCodec`] instance.
115    ///
116    /// It treats all unknown columns as primary key(label field).
117    pub fn schemaless() -> Self {
118        Self {
119            inner: Arc::new(SparsePrimaryKeyCodecInner {
120                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
121                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
122                label_field: SortField::new(ConcreteDataType::string_datatype()),
123                columns: None,
124            }),
125        }
126    }
127
128    /// Creates a new [`SparsePrimaryKeyCodec`] instance with additional label `fields`.
129    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
130        Self {
131            inner: Arc::new(SparsePrimaryKeyCodecInner {
132                columns: Some(fields.iter().map(|f| f.0).collect()),
133                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
134                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
135                label_field: SortField::new(ConcreteDataType::string_datatype()),
136            }),
137        }
138    }
139
140    /// Returns the field of the given column id.
141    fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
142        // if the `columns` is not specified, all unknown columns is primary key(label field).
143        if let Some(columns) = &self.inner.columns
144            && !columns.contains(&column_id)
145        {
146            return None;
147        }
148
149        match column_id {
150            RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
151            RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
152            _ => Some(&self.inner.label_field),
153        }
154    }
155
156    /// Encodes the given bytes into a [`SparseValues`].
157    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
158    where
159        I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
160    {
161        let mut serializer = Serializer::new(buffer);
162        for (column_id, value) in row {
163            if value.is_null() {
164                continue;
165            }
166
167            if let Some(field) = self.get_field(column_id) {
168                column_id
169                    .serialize(&mut serializer)
170                    .context(SerializeFieldSnafu)?;
171                field.serialize(&mut serializer, &value)?;
172            } else {
173                // TODO(weny): handle the error.
174                common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
175            }
176        }
177        Ok(())
178    }
179
180    pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
181    where
182        I: Iterator<Item = (ColumnId, &'a [u8])>,
183    {
184        for (tag_column_id, tag_value) in row {
185            let value_len = tag_value.len();
186            buffer.reserve(6 + value_len / 8 * 9);
187            buffer.put_u32(tag_column_id);
188            buffer.put_u8(1);
189            buffer.put_u8(!tag_value.is_empty() as u8);
190
191            // Manual implementation of memcomparable::ser::Serializer::serialize_bytes
192            // to avoid byte-by-byte put.
193            let mut len = 0;
194            let num_chucks = value_len / 8;
195            let remainder = value_len % 8;
196
197            for idx in 0..num_chucks {
198                buffer.extend_from_slice(&tag_value[idx * 8..idx * 8 + 8]);
199                len += 8;
200                // append an extra byte that signals the number of significant bytes in this chunk
201                // 1-8: many bytes were significant and this group is the last group
202                // 9: all 8 bytes were significant and there is more data to come
203                let extra = if len == value_len { 8 } else { 9 };
204                buffer.put_u8(extra);
205            }
206
207            if remainder != 0 {
208                buffer.extend_from_slice(&tag_value[len..value_len]);
209                buffer.put_bytes(0, 8 - remainder);
210                buffer.put_u8(remainder as u8);
211            }
212        }
213        Ok(())
214    }
215
216    /// Encodes the given bytes into a [`SparseValues`].
217    pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
218        buffer.reserve_exact(22);
219        buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
220        buffer.put_u8(1);
221        buffer.put_u32(table_id);
222        buffer.put_u32(RESERVED_COLUMN_ID_TSID);
223        buffer.put_u8(1);
224        buffer.put_u64(tsid);
225        Ok(())
226    }
227
228    /// Decodes the given bytes into a [`SparseValues`].
229    fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
230        let mut deserializer = Deserializer::new(bytes);
231        let mut values = SparseValues::new(HashMap::new());
232
233        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
234        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
235        values.insert(column_id, value);
236
237        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
238        let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
239        values.insert(column_id, value);
240        while deserializer.has_remaining() {
241            let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
242            let value = self.inner.label_field.deserialize(&mut deserializer)?;
243            values.insert(column_id, value);
244        }
245
246        Ok(values)
247    }
248
249    /// Decodes the given bytes into a [`Value`].
250    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
251        let mut deserializer = Deserializer::new(bytes);
252        // Skip the column id.
253        deserializer.advance(COLUMN_ID_ENCODE_SIZE);
254        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
255        Ok(Some(value))
256    }
257
258    /// Returns the offset of the given column id in the given primary key.
259    pub fn has_column(
260        &self,
261        pk: &[u8],
262        offsets_map: &mut HashMap<u32, usize>,
263        column_id: ColumnId,
264    ) -> Option<usize> {
265        if offsets_map.is_empty() {
266            let mut deserializer = Deserializer::new(pk);
267            let mut offset = 0;
268            while deserializer.has_remaining() {
269                let column_id = u32::deserialize(&mut deserializer).unwrap();
270                offset += 4;
271                offsets_map.insert(column_id, offset);
272                let Some(field) = self.get_field(column_id) else {
273                    break;
274                };
275
276                let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
277                offset += skip;
278            }
279
280            offsets_map.get(&column_id).copied()
281        } else {
282            offsets_map.get(&column_id).copied()
283        }
284    }
285
286    /// Decode value at `offset` in `pk`.
287    pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
288        let mut deserializer = Deserializer::new(pk);
289        deserializer.advance(offset);
290        // Safety: checked by `has_column`
291        let field = self.get_field(column_id).unwrap();
292        field.deserialize(&mut deserializer)
293    }
294}
295
296impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
297    fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
298        UnsupportedOperationSnafu {
299            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
300        }
301        .fail()
302    }
303
304    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
305        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
306    }
307
308    fn encode_value_refs(
309        &self,
310        values: &[(ColumnId, ValueRef)],
311        buffer: &mut Vec<u8>,
312    ) -> Result<()> {
313        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.clone())), buffer)
314    }
315
316    fn estimated_size(&self) -> Option<usize> {
317        None
318    }
319
320    fn num_fields(&self) -> Option<usize> {
321        None
322    }
323
324    fn encoding(&self) -> PrimaryKeyEncoding {
325        PrimaryKeyEncoding::Sparse
326    }
327
328    fn primary_key_filter(
329        &self,
330        metadata: &RegionMetadataRef,
331        filters: Arc<Vec<SimpleFilterEvaluator>>,
332    ) -> Box<dyn PrimaryKeyFilter> {
333        Box::new(SparsePrimaryKeyFilter::new(
334            metadata.clone(),
335            filters,
336            self.clone(),
337        ))
338    }
339
340    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
341        Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
342    }
343
344    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
345        self.decode_leftmost(bytes)
346    }
347}
348
349/// Field with column id.
350pub struct FieldWithId {
351    pub field: SortField,
352    pub column_id: ColumnId,
353}
354
355/// A special encoder for memtable.
356pub struct SparseEncoder {
357    fields: Vec<FieldWithId>,
358}
359
360impl SparseEncoder {
361    pub fn new(fields: Vec<FieldWithId>) -> Self {
362        Self { fields }
363    }
364
365    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
366    where
367        I: Iterator<Item = ValueRef<'a>>,
368    {
369        let mut serializer = Serializer::new(buffer);
370        for (value, field) in row.zip(self.fields.iter()) {
371            if !value.is_null() {
372                field
373                    .column_id
374                    .serialize(&mut serializer)
375                    .context(SerializeFieldSnafu)?;
376                field.field.serialize(&mut serializer, &value)?;
377            }
378        }
379        Ok(())
380    }
381}
382
383#[cfg(test)]
384mod tests {
385    use std::sync::Arc;
386
387    use api::v1::SemanticType;
388    use common_query::prelude::{greptime_timestamp, greptime_value};
389    use common_time::Timestamp;
390    use common_time::timestamp::TimeUnit;
391    use datatypes::schema::ColumnSchema;
392    use datatypes::value::{OrderedFloat, Value};
393    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
394    use store_api::metric_engine_consts::{
395        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
396    };
397    use store_api::storage::{ColumnId, RegionId};
398
399    use super::*;
400
401    fn test_region_metadata() -> RegionMetadataRef {
402        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
403        builder
404            .push_column_metadata(ColumnMetadata {
405                column_schema: ColumnSchema::new(
406                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
407                    ConcreteDataType::uint32_datatype(),
408                    false,
409                ),
410                semantic_type: SemanticType::Tag,
411                column_id: ReservedColumnId::table_id(),
412            })
413            .push_column_metadata(ColumnMetadata {
414                column_schema: ColumnSchema::new(
415                    DATA_SCHEMA_TSID_COLUMN_NAME,
416                    ConcreteDataType::uint64_datatype(),
417                    false,
418                ),
419                semantic_type: SemanticType::Tag,
420                column_id: ReservedColumnId::tsid(),
421            })
422            .push_column_metadata(ColumnMetadata {
423                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
424                semantic_type: SemanticType::Tag,
425                column_id: 1,
426            })
427            .push_column_metadata(ColumnMetadata {
428                column_schema: ColumnSchema::new(
429                    "namespace",
430                    ConcreteDataType::string_datatype(),
431                    true,
432                ),
433                semantic_type: SemanticType::Tag,
434                column_id: 2,
435            })
436            .push_column_metadata(ColumnMetadata {
437                column_schema: ColumnSchema::new(
438                    "container",
439                    ConcreteDataType::string_datatype(),
440                    true,
441                ),
442                semantic_type: SemanticType::Tag,
443                column_id: 3,
444            })
445            .push_column_metadata(ColumnMetadata {
446                column_schema: ColumnSchema::new(
447                    "pod_name",
448                    ConcreteDataType::string_datatype(),
449                    true,
450                ),
451                semantic_type: SemanticType::Tag,
452                column_id: 4,
453            })
454            .push_column_metadata(ColumnMetadata {
455                column_schema: ColumnSchema::new(
456                    "pod_ip",
457                    ConcreteDataType::string_datatype(),
458                    true,
459                ),
460                semantic_type: SemanticType::Tag,
461                column_id: 5,
462            })
463            .push_column_metadata(ColumnMetadata {
464                column_schema: ColumnSchema::new(
465                    greptime_value(),
466                    ConcreteDataType::float64_datatype(),
467                    false,
468                ),
469                semantic_type: SemanticType::Field,
470                column_id: 6,
471            })
472            .push_column_metadata(ColumnMetadata {
473                column_schema: ColumnSchema::new(
474                    greptime_timestamp(),
475                    ConcreteDataType::timestamp_nanosecond_datatype(),
476                    false,
477                ),
478                semantic_type: SemanticType::Timestamp,
479                column_id: 7,
480            })
481            .primary_key(vec![
482                ReservedColumnId::table_id(),
483                ReservedColumnId::tsid(),
484                1,
485                2,
486                3,
487                4,
488                5,
489            ]);
490        let metadata = builder.build().unwrap();
491        Arc::new(metadata)
492    }
493
494    #[test]
495    fn test_sparse_value_new_and_get_or_null() {
496        let mut values = HashMap::new();
497        values.insert(1, Value::Int32(42));
498        let sparse_value = SparseValues::new(values);
499
500        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
501        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
502    }
503
504    #[test]
505    fn test_sparse_value_insert() {
506        let mut sparse_value = SparseValues::new(HashMap::new());
507        sparse_value.insert(1, Value::Int32(42));
508
509        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
510    }
511
512    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
513        vec![
514            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
515            (
516                RESERVED_COLUMN_ID_TSID,
517                ValueRef::UInt64(123843349035232323),
518            ),
519            // label: pod
520            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
521            // label: namespace
522            (2, ValueRef::String("greptime-cluster")),
523            // label: container
524            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
525            // label: pod_name
526            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
527            // label: pod_ip
528            (5, ValueRef::String("10.10.10.10")),
529            // field: greptime_value
530            (6, ValueRef::Float64(OrderedFloat(1.0))),
531            // field: greptime_timestamp
532            (
533                7,
534                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
535            ),
536        ]
537    }
538
539    #[test]
540    fn test_encode_by_short_cuts() {
541        let region_metadata = test_region_metadata();
542        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
543        let mut buffer = Vec::new();
544        let internal_columns = [
545            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)),
546            (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)),
547        ];
548        let tags = [
549            (1, "greptime-frontend-6989d9899-22222"),
550            (2, "greptime-cluster"),
551            (3, "greptime-frontend-6989d9899-22222"),
552            (4, "greptime-frontend-6989d9899-22222"),
553            (5, "10.10.10.10"),
554        ];
555        codec
556            .encode_to_vec(internal_columns.into_iter(), &mut buffer)
557            .unwrap();
558        codec
559            .encode_to_vec(
560                tags.iter()
561                    .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value))),
562                &mut buffer,
563            )
564            .unwrap();
565
566        let mut buffer_by_raw_encoding = Vec::new();
567        codec
568            .encode_internal(1024, 42, &mut buffer_by_raw_encoding)
569            .unwrap();
570        let tags: Vec<_> = tags
571            .into_iter()
572            .map(|(col_id, tag_value)| (col_id, tag_value.as_bytes()))
573            .collect();
574        codec
575            .encode_raw_tag_value(
576                tags.iter().map(|(c, b)| (*c, *b)),
577                &mut buffer_by_raw_encoding,
578            )
579            .unwrap();
580        assert_eq!(buffer, buffer_by_raw_encoding);
581    }
582
583    #[test]
584    fn test_encode_to_vec() {
585        let region_metadata = test_region_metadata();
586        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
587        let mut buffer = Vec::new();
588
589        let row = test_row();
590        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
591        assert!(!buffer.is_empty());
592        let sparse_value = codec.decode_sparse(&buffer).unwrap();
593        assert_eq!(
594            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
595            &Value::UInt32(42)
596        );
597        assert_eq!(
598            sparse_value.get_or_null(1),
599            &Value::String("greptime-frontend-6989d9899-22222".into())
600        );
601        assert_eq!(
602            sparse_value.get_or_null(2),
603            &Value::String("greptime-cluster".into())
604        );
605        assert_eq!(
606            sparse_value.get_or_null(3),
607            &Value::String("greptime-frontend-6989d9899-22222".into())
608        );
609        assert_eq!(
610            sparse_value.get_or_null(4),
611            &Value::String("greptime-frontend-6989d9899-22222".into())
612        );
613        assert_eq!(
614            sparse_value.get_or_null(5),
615            &Value::String("10.10.10.10".into())
616        );
617    }
618
619    #[test]
620    fn test_decode_leftmost() {
621        let region_metadata = test_region_metadata();
622        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
623        let mut buffer = Vec::new();
624        let row = test_row();
625        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
626        assert!(!buffer.is_empty());
627        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
628        assert_eq!(result, Value::UInt32(42));
629    }
630
631    #[test]
632    fn test_has_column() {
633        let region_metadata = test_region_metadata();
634        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
635        let mut buffer = Vec::new();
636        let row = test_row();
637        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
638        assert!(!buffer.is_empty());
639
640        let mut offsets_map = HashMap::new();
641        for column_id in [
642            RESERVED_COLUMN_ID_TABLE_ID,
643            RESERVED_COLUMN_ID_TSID,
644            1,
645            2,
646            3,
647            4,
648            5,
649        ] {
650            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
651            assert!(offset.is_some());
652        }
653
654        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
655        assert!(offset.is_none());
656    }
657
658    #[test]
659    fn test_decode_value_at() {
660        let region_metadata = test_region_metadata();
661        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
662        let mut buffer = Vec::new();
663        let row = test_row();
664        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
665        assert!(!buffer.is_empty());
666
667        let row = test_row();
668        let mut offsets_map = HashMap::new();
669        for column_id in [
670            RESERVED_COLUMN_ID_TABLE_ID,
671            RESERVED_COLUMN_ID_TSID,
672            1,
673            2,
674            3,
675            4,
676            5,
677        ] {
678            let offset = codec
679                .has_column(&buffer, &mut offsets_map, column_id)
680                .unwrap();
681            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
682            let expected_value = row
683                .iter()
684                .find(|(id, _)| *id == column_id)
685                .unwrap()
686                .1
687                .clone();
688            assert_eq!(value.as_value_ref(), expected_value);
689        }
690    }
691}