Skip to main content

mito_codec/row_converter/
sparse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::BufMut;
19use common_recordbatch::filter::SimpleFilterEvaluator;
20use datatypes::prelude::ConcreteDataType;
21use datatypes::value::{Value, ValueRef};
22use memcomparable::{Deserializer, Serializer};
23use serde::{Deserialize, Serialize};
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28use store_api::storage::consts::ReservedColumnId;
29
30use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
31use crate::key_values::KeyValue;
32use crate::primary_key_filter::SparsePrimaryKeyFilter;
33use crate::row_converter::dense::SortField;
34use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
35
36/// A codec for sparse key of metrics.
37///
38/// ## Encoding format
39/// Each primary key is encoded as a sequence of `(column_id, value)` pairs:
40/// - The first two fields are always the reserved `table_id` (uint32) and `tsid` (uint64).
41/// - User-defined labels follow, sorted by **column name** in lexicographical order.
42/// - Null values are omitted (not encoded).
43///
44/// The `column_id` is encoded as a 4-byte big-endian integer, and the value is encoded
45/// using memcomparable serialization.
46///
47/// `decode_leftmost` always decodes the first value from the encoded bytes (i.e., the
48/// `table_id` field).
49///
50/// ## Requirements
51/// It requires the input primary key columns are sorted by the column name in lexicographical order.
52/// It encodes the column id of the physical region.
53#[derive(Clone, Debug)]
54pub struct SparsePrimaryKeyCodec {
55    inner: Arc<SparsePrimaryKeyCodecInner>,
56}
57
58#[derive(Debug)]
59struct SparsePrimaryKeyCodecInner {
60    // Internal fields
61    table_id_field: SortField,
62    // Internal fields
63    tsid_field: SortField,
64    // User defined label field
65    label_field: SortField,
66    // Columns in primary key
67    //
68    // None means all unknown columns is primary key(`Self::label_field`).
69    columns: Option<HashSet<ColumnId>>,
70}
71
72/// Sparse values representation.
73///
74/// Callers must not insert a column id that is already present; otherwise
75/// the existing entry will shadow the newly inserted value on lookup.
76#[derive(Debug, Clone, PartialEq, Eq, Default)]
77pub struct SparseValues {
78    values: Vec<(ColumnId, Value)>,
79}
80
81impl SparseValues {
82    /// Creates an empty [`SparseValues`].
83    pub fn new() -> Self {
84        Self { values: Vec::new() }
85    }
86
87    /// Creates an empty [`SparseValues`] with space reserved for `cap` entries.
88    pub fn with_capacity(cap: usize) -> Self {
89        Self {
90            values: Vec::with_capacity(cap),
91        }
92    }
93
94    /// Returns the value of the given column, or [`Value::Null`] if the column is not present.
95    pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
96        for (id, value) in &self.values {
97            if *id == column_id {
98                return value;
99            }
100        }
101        &Value::Null
102    }
103
104    /// Returns the value of the given column, or [`None`] if the column is not present.
105    pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
106        for (id, value) in &self.values {
107            if id == column_id {
108                return Some(value);
109            }
110        }
111        None
112    }
113
114    /// Appends a new `(column_id, value)` pair.
115    ///
116    /// Append-only: the caller must ensure `column_id` is not already present.
117    pub fn insert(&mut self, column_id: ColumnId, value: Value) {
118        self.values.push((column_id, value));
119    }
120
121    /// Returns an iterator over all stored column id/value pairs.
122    pub fn iter(&self) -> impl Iterator<Item = (&ColumnId, &Value)> {
123        self.values.iter().map(|(id, value)| (id, value))
124    }
125}
126
127/// The column id of the tsid.
128pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
129/// The column id of the table id.
130pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
/// The size of the column id in the encoded sparse row.
pub const COLUMN_ID_ENCODE_SIZE: usize = 4;

/// Encoded size of the `table_id` value: 1-byte marker + 4-byte u32.
const TABLE_ID_VALUE_ENCODE_SIZE: usize = 1 + 4;
/// Encoded size of the `tsid` value: 1-byte marker + 8-byte u64.
const TSID_VALUE_ENCODE_SIZE: usize = 1 + 8;

// Fixed byte offsets of the reserved columns in the sparse encoding.
// Layout: [table_id_col_id: 4B][marker: 1B][table_id: 4B][tsid_col_id: 4B][marker: 1B][tsid: 8B]
// Each offset points at a value's marker byte, right after its column id.

/// Byte offset of the `table_id` value (just past its column id).
const TABLE_ID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE;
/// Byte offset of the `tsid` value (past the whole `table_id` entry and the tsid column id).
const TSID_VALUE_OFFSET: usize =
    TABLE_ID_VALUE_OFFSET + TABLE_ID_VALUE_ENCODE_SIZE + COLUMN_ID_ENCODE_SIZE;
/// Byte offset of the first tag column (past both reserved entries).
const TAGS_START_OFFSET: usize = TSID_VALUE_OFFSET + TSID_VALUE_ENCODE_SIZE;

/// Number of tag offsets [`SparseOffsetsCache`] keeps in its linear-scan vector.
///
/// Sparse primary keys usually carry only a few tags, and scanning a short `Vec`
/// is cheaper than a `HashMap` lookup at that size. Any further tags are stored
/// in the overflow `HashMap`.
const SPARSE_OFFSETS_INLINE_CAP: usize = 32;
149
150/// A lazily populated cache of tag column offsets inside a sparse primary key.
151#[derive(Debug, Clone)]
152pub struct SparseOffsetsCache {
153    /// Small-vec fast path. Reserves [`SPARSE_OFFSETS_INLINE_CAP`] slots on
154    /// the first insert.
155    inline: Vec<(ColumnId, usize)>,
156    /// Overflow for columns beyond the inline capacity. Lazily allocated.
157    overflow: HashMap<ColumnId, usize>,
158    /// Next byte position in the pk to resume parsing from.
159    cursor: usize,
160    /// True once the decoder has walked past the last tag column (or stopped
161    /// on an unknown column id); no further offsets can be discovered.
162    finished: bool,
163}
164
165impl Default for SparseOffsetsCache {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171impl SparseOffsetsCache {
172    pub fn new() -> Self {
173        Self {
174            inline: Vec::new(),
175            overflow: HashMap::new(),
176            cursor: TAGS_START_OFFSET,
177            finished: false,
178        }
179    }
180
181    pub fn clear(&mut self) {
182        self.inline.clear();
183        self.overflow.clear();
184        self.cursor = TAGS_START_OFFSET;
185        self.finished = false;
186    }
187
188    /// Returns the cached offset for `column_id`, if any.
189    fn get(&self, column_id: ColumnId) -> Option<usize> {
190        for entry in &self.inline {
191            if entry.0 == column_id {
192                return Some(entry.1);
193            }
194        }
195        self.overflow.get(&column_id).copied()
196    }
197
198    /// Records a new `(column_id, offset)` entry.
199    fn insert(&mut self, column_id: ColumnId, offset: usize) {
200        if self.inline.len() < SPARSE_OFFSETS_INLINE_CAP {
201            if self.inline.capacity() == 0 {
202                self.inline.reserve_exact(SPARSE_OFFSETS_INLINE_CAP);
203            }
204            self.inline.push((column_id, offset));
205        } else {
206            self.overflow.insert(column_id, offset);
207        }
208    }
209
210    #[cfg(test)]
211    fn contains(&self, column_id: ColumnId) -> bool {
212        self.get(column_id).is_some()
213    }
214}
215
216impl SparsePrimaryKeyCodec {
217    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
218    pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
219        let columns = columns_ids.collect();
220        Self {
221            inner: Arc::new(SparsePrimaryKeyCodecInner {
222                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
223                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
224                label_field: SortField::new(ConcreteDataType::string_datatype()),
225                columns: Some(columns),
226            }),
227        }
228    }
229
230    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
231    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
232        Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
233    }
234
235    /// Returns a new [`SparsePrimaryKeyCodec`] instance.
236    ///
237    /// It treats all unknown columns as primary key(label field).
238    pub fn schemaless() -> Self {
239        Self {
240            inner: Arc::new(SparsePrimaryKeyCodecInner {
241                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
242                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
243                label_field: SortField::new(ConcreteDataType::string_datatype()),
244                columns: None,
245            }),
246        }
247    }
248
249    /// Creates a new [`SparsePrimaryKeyCodec`] instance with additional label `fields`.
250    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
251        Self {
252            inner: Arc::new(SparsePrimaryKeyCodecInner {
253                columns: Some(fields.iter().map(|f| f.0).collect()),
254                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
255                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
256                label_field: SortField::new(ConcreteDataType::string_datatype()),
257            }),
258        }
259    }
260
261    /// Returns the field of the given column id.
262    fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
263        // if the `columns` is not specified, all unknown columns is primary key(label field).
264        if let Some(columns) = &self.inner.columns
265            && !columns.contains(&column_id)
266        {
267            return None;
268        }
269
270        match column_id {
271            RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
272            RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
273            _ => Some(&self.inner.label_field),
274        }
275    }
276
277    /// Encodes the given bytes into a [`SparseValues`].
278    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
279    where
280        I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
281    {
282        let mut serializer = Serializer::new(buffer);
283        for (column_id, value) in row {
284            if value.is_null() {
285                continue;
286            }
287
288            if let Some(field) = self.get_field(column_id) {
289                column_id
290                    .serialize(&mut serializer)
291                    .context(SerializeFieldSnafu)?;
292                field.serialize(&mut serializer, &value)?;
293            } else {
294                // TODO(weny): handle the error.
295                common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
296            }
297        }
298        Ok(())
299    }
300
301    pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
302    where
303        I: Iterator<Item = (ColumnId, &'a [u8])>,
304    {
305        for (tag_column_id, tag_value) in row {
306            let value_len = tag_value.len();
307            buffer.reserve(6 + value_len / 8 * 9);
308            buffer.put_u32(tag_column_id);
309            buffer.put_u8(1);
310            buffer.put_u8(!tag_value.is_empty() as u8);
311
312            // Manual implementation of memcomparable::ser::Serializer::serialize_bytes
313            // to avoid byte-by-byte put.
314            let mut len = 0;
315            let num_chucks = value_len / 8;
316            let remainder = value_len % 8;
317
318            for idx in 0..num_chucks {
319                buffer.extend_from_slice(&tag_value[idx * 8..idx * 8 + 8]);
320                len += 8;
321                // append an extra byte that signals the number of significant bytes in this chunk
322                // 1-8: many bytes were significant and this group is the last group
323                // 9: all 8 bytes were significant and there is more data to come
324                let extra = if len == value_len { 8 } else { 9 };
325                buffer.put_u8(extra);
326            }
327
328            if remainder != 0 {
329                buffer.extend_from_slice(&tag_value[len..value_len]);
330                buffer.put_bytes(0, 8 - remainder);
331                buffer.put_u8(remainder as u8);
332            }
333        }
334        Ok(())
335    }
336
337    /// Encodes the given bytes into a [`SparseValues`].
338    pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
339        buffer.reserve_exact(22);
340        buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
341        buffer.put_u8(1);
342        buffer.put_u32(table_id);
343        buffer.put_u32(RESERVED_COLUMN_ID_TSID);
344        buffer.put_u8(1);
345        buffer.put_u64(tsid);
346        Ok(())
347    }
348
349    /// Decodes the given bytes into a [`SparseValues`].
350    fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
351        let mut deserializer = Deserializer::new(bytes);
352        let mut values = SparseValues::with_capacity(16);
353
354        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
355        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
356        values.insert(column_id, value);
357
358        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
359        let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
360        values.insert(column_id, value);
361        while deserializer.has_remaining() {
362            let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
363            let value = self.inner.label_field.deserialize(&mut deserializer)?;
364            values.insert(column_id, value);
365        }
366
367        Ok(values)
368    }
369
370    /// Decodes the given bytes into a [`Value`].
371    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
372        let mut deserializer = Deserializer::new(bytes);
373        // Skip the column id.
374        deserializer.advance(COLUMN_ID_ENCODE_SIZE);
375        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
376        Ok(Some(value))
377    }
378
379    /// Returns the offset of the given column id in the given primary key.
380    ///
381    /// The pk must start with the table_id + tsid prefix written by
382    /// `encode_internal`.
383    ///
384    /// # Panics
385    ///
386    /// Panics if `pk` is not a well-formed sparse primary key produced by
387    /// this codec (e.g. truncated or otherwise malformed bytes).
388    pub fn has_column(
389        &self,
390        pk: &[u8],
391        cache: &mut SparseOffsetsCache,
392        column_id: ColumnId,
393    ) -> Option<usize> {
394        // Decoding is lazy: on each call we only advance the cache's cursor as
395        // far as needed to answer the query. A column that has already been
396        // seen returns immediately; a column we haven't reached yet causes the
397        // parser to resume from `cache.cursor` and stop as soon as the column
398        // is located. Once the cursor walks off the end (or hits an unknown
399        // column id) the cache is marked finished, so subsequent misses are
400        // O(1).
401        // table_id and tsid are at fixed offsets.
402        match column_id {
403            RESERVED_COLUMN_ID_TABLE_ID => return Some(TABLE_ID_VALUE_OFFSET),
404            RESERVED_COLUMN_ID_TSID => return Some(TSID_VALUE_OFFSET),
405            _ => {}
406        }
407
408        if let Some(offset) = cache.get(column_id) {
409            return Some(offset);
410        }
411        if cache.finished {
412            return None;
413        }
414
415        let mut deserializer = Deserializer::new(pk);
416        deserializer.advance(cache.cursor);
417        let mut offset = cache.cursor;
418        while deserializer.has_remaining() {
419            let col = u32::deserialize(&mut deserializer).unwrap();
420            offset += COLUMN_ID_ENCODE_SIZE;
421            let value_offset = offset;
422            cache.insert(col, value_offset);
423            let Some(field) = self.get_field(col) else {
424                cache.finished = true;
425                cache.cursor = offset;
426                return None;
427            };
428
429            let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
430            offset += skip;
431            cache.cursor = offset;
432            if col == column_id {
433                return Some(value_offset);
434            }
435        }
436
437        cache.finished = true;
438        None
439    }
440
441    /// Decode value at `offset` in `pk`.
442    pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
443        let mut deserializer = Deserializer::new(pk);
444        deserializer.advance(offset);
445        // Safety: checked by `has_column`
446        let field = self.get_field(column_id).unwrap();
447        field.deserialize(&mut deserializer)
448    }
449
450    /// Returns the encoded bytes of the given `column_id` in `pk`.
451    ///
452    /// Returns `Ok(None)` if the `column_id` is missing in `pk`.
453    pub fn encoded_value_for_column<'a>(
454        &self,
455        pk: &'a [u8],
456        cache: &mut SparseOffsetsCache,
457        column_id: ColumnId,
458    ) -> Result<Option<&'a [u8]>> {
459        let Some(offset) = self.has_column(pk, cache, column_id) else {
460            return Ok(None);
461        };
462
463        let Some(field) = self.get_field(column_id) else {
464            return Ok(None);
465        };
466
467        let mut deserializer = Deserializer::new(pk);
468        deserializer.advance(offset);
469        let len = field.skip_deserialize(pk, &mut deserializer)?;
470        Ok(Some(&pk[offset..offset + len]))
471    }
472}
473
474impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
475    fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
476        UnsupportedOperationSnafu {
477            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
478        }
479        .fail()
480    }
481
482    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
483        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
484    }
485
486    fn encode_value_refs(
487        &self,
488        values: &[(ColumnId, ValueRef)],
489        buffer: &mut Vec<u8>,
490    ) -> Result<()> {
491        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.clone())), buffer)
492    }
493
494    fn estimated_size(&self) -> Option<usize> {
495        None
496    }
497
498    fn num_fields(&self) -> Option<usize> {
499        None
500    }
501
502    fn encoding(&self) -> PrimaryKeyEncoding {
503        PrimaryKeyEncoding::Sparse
504    }
505
506    fn primary_key_filter(
507        &self,
508        metadata: &RegionMetadataRef,
509        filters: Arc<Vec<SimpleFilterEvaluator>>,
510        skip_partition_column: bool,
511    ) -> Box<dyn PrimaryKeyFilter> {
512        Box::new(SparsePrimaryKeyFilter::new(
513            metadata.clone(),
514            filters,
515            self.clone(),
516            skip_partition_column,
517        ))
518    }
519
520    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
521        Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
522    }
523
524    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
525        self.decode_leftmost(bytes)
526    }
527}
528
529/// Field with column id.
530pub struct FieldWithId {
531    pub field: SortField,
532    pub column_id: ColumnId,
533}
534
535/// A special encoder for memtable.
536pub struct SparseEncoder {
537    fields: Vec<FieldWithId>,
538}
539
540impl SparseEncoder {
541    pub fn new(fields: Vec<FieldWithId>) -> Self {
542        Self { fields }
543    }
544
545    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
546    where
547        I: Iterator<Item = ValueRef<'a>>,
548    {
549        let mut serializer = Serializer::new(buffer);
550        for (value, field) in row.zip(self.fields.iter()) {
551            if !value.is_null() {
552                field
553                    .column_id
554                    .serialize(&mut serializer)
555                    .context(SerializeFieldSnafu)?;
556                field.field.serialize(&mut serializer, &value)?;
557            }
558        }
559        Ok(())
560    }
561}
562
563#[cfg(test)]
564mod tests {
565    use std::sync::Arc;
566
567    use api::v1::SemanticType;
568    use common_query::prelude::{greptime_timestamp, greptime_value};
569    use common_time::Timestamp;
570    use common_time::timestamp::TimeUnit;
571    use datatypes::schema::ColumnSchema;
572    use datatypes::value::{OrderedFloat, Value};
573    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
574    use store_api::metric_engine_consts::{
575        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
576    };
577    use store_api::storage::{ColumnId, RegionId};
578
579    use super::*;
580
581    fn test_region_metadata() -> RegionMetadataRef {
582        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
583        builder
584            .push_column_metadata(ColumnMetadata {
585                column_schema: ColumnSchema::new(
586                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
587                    ConcreteDataType::uint32_datatype(),
588                    false,
589                ),
590                semantic_type: SemanticType::Tag,
591                column_id: ReservedColumnId::table_id(),
592            })
593            .push_column_metadata(ColumnMetadata {
594                column_schema: ColumnSchema::new(
595                    DATA_SCHEMA_TSID_COLUMN_NAME,
596                    ConcreteDataType::uint64_datatype(),
597                    false,
598                ),
599                semantic_type: SemanticType::Tag,
600                column_id: ReservedColumnId::tsid(),
601            })
602            .push_column_metadata(ColumnMetadata {
603                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
604                semantic_type: SemanticType::Tag,
605                column_id: 1,
606            })
607            .push_column_metadata(ColumnMetadata {
608                column_schema: ColumnSchema::new(
609                    "namespace",
610                    ConcreteDataType::string_datatype(),
611                    true,
612                ),
613                semantic_type: SemanticType::Tag,
614                column_id: 2,
615            })
616            .push_column_metadata(ColumnMetadata {
617                column_schema: ColumnSchema::new(
618                    "container",
619                    ConcreteDataType::string_datatype(),
620                    true,
621                ),
622                semantic_type: SemanticType::Tag,
623                column_id: 3,
624            })
625            .push_column_metadata(ColumnMetadata {
626                column_schema: ColumnSchema::new(
627                    "pod_name",
628                    ConcreteDataType::string_datatype(),
629                    true,
630                ),
631                semantic_type: SemanticType::Tag,
632                column_id: 4,
633            })
634            .push_column_metadata(ColumnMetadata {
635                column_schema: ColumnSchema::new(
636                    "pod_ip",
637                    ConcreteDataType::string_datatype(),
638                    true,
639                ),
640                semantic_type: SemanticType::Tag,
641                column_id: 5,
642            })
643            .push_column_metadata(ColumnMetadata {
644                column_schema: ColumnSchema::new(
645                    greptime_value(),
646                    ConcreteDataType::float64_datatype(),
647                    false,
648                ),
649                semantic_type: SemanticType::Field,
650                column_id: 6,
651            })
652            .push_column_metadata(ColumnMetadata {
653                column_schema: ColumnSchema::new(
654                    greptime_timestamp(),
655                    ConcreteDataType::timestamp_nanosecond_datatype(),
656                    false,
657                ),
658                semantic_type: SemanticType::Timestamp,
659                column_id: 7,
660            })
661            .primary_key(vec![
662                ReservedColumnId::table_id(),
663                ReservedColumnId::tsid(),
664                1,
665                2,
666                3,
667                4,
668                5,
669            ]);
670        let metadata = builder.build().unwrap();
671        Arc::new(metadata)
672    }
673
674    #[test]
675    fn test_sparse_value_new_and_get_or_null() {
676        let mut sparse_value = SparseValues::new();
677        sparse_value.insert(1, Value::Int32(42));
678
679        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
680        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
681    }
682
683    #[test]
684    fn test_sparse_value_insert() {
685        let mut sparse_value = SparseValues::new();
686        sparse_value.insert(1, Value::Int32(42));
687
688        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
689    }
690
691    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
692        vec![
693            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
694            (
695                RESERVED_COLUMN_ID_TSID,
696                ValueRef::UInt64(123843349035232323),
697            ),
698            // label: pod
699            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
700            // label: namespace
701            (2, ValueRef::String("greptime-cluster")),
702            // label: container
703            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
704            // label: pod_name
705            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
706            // label: pod_ip
707            (5, ValueRef::String("10.10.10.10")),
708            // field: greptime_value
709            (6, ValueRef::Float64(OrderedFloat(1.0))),
710            // field: greptime_timestamp
711            (
712                7,
713                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
714            ),
715        ]
716    }
717
718    #[test]
719    fn test_encode_by_short_cuts() {
720        let region_metadata = test_region_metadata();
721        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
722        let mut buffer = Vec::new();
723        let internal_columns = [
724            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)),
725            (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)),
726        ];
727        let tags = [
728            (1, "greptime-frontend-6989d9899-22222"),
729            (2, "greptime-cluster"),
730            (3, "greptime-frontend-6989d9899-22222"),
731            (4, "greptime-frontend-6989d9899-22222"),
732            (5, "10.10.10.10"),
733        ];
734        codec
735            .encode_to_vec(internal_columns.into_iter(), &mut buffer)
736            .unwrap();
737        codec
738            .encode_to_vec(
739                tags.iter()
740                    .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value))),
741                &mut buffer,
742            )
743            .unwrap();
744
745        let mut buffer_by_raw_encoding = Vec::new();
746        codec
747            .encode_internal(1024, 42, &mut buffer_by_raw_encoding)
748            .unwrap();
749        let tags: Vec<_> = tags
750            .into_iter()
751            .map(|(col_id, tag_value)| (col_id, tag_value.as_bytes()))
752            .collect();
753        codec
754            .encode_raw_tag_value(
755                tags.iter().map(|(c, b)| (*c, *b)),
756                &mut buffer_by_raw_encoding,
757            )
758            .unwrap();
759        assert_eq!(buffer, buffer_by_raw_encoding);
760    }
761
762    #[test]
763    fn test_encode_to_vec() {
764        let region_metadata = test_region_metadata();
765        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
766        let mut buffer = Vec::new();
767
768        let row = test_row();
769        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
770        assert!(!buffer.is_empty());
771        let sparse_value = codec.decode_sparse(&buffer).unwrap();
772        assert_eq!(
773            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
774            &Value::UInt32(42)
775        );
776        assert_eq!(
777            sparse_value.get_or_null(1),
778            &Value::String("greptime-frontend-6989d9899-22222".into())
779        );
780        assert_eq!(
781            sparse_value.get_or_null(2),
782            &Value::String("greptime-cluster".into())
783        );
784        assert_eq!(
785            sparse_value.get_or_null(3),
786            &Value::String("greptime-frontend-6989d9899-22222".into())
787        );
788        assert_eq!(
789            sparse_value.get_or_null(4),
790            &Value::String("greptime-frontend-6989d9899-22222".into())
791        );
792        assert_eq!(
793            sparse_value.get_or_null(5),
794            &Value::String("10.10.10.10".into())
795        );
796    }
797
798    #[test]
799    fn test_decode_leftmost() {
800        let region_metadata = test_region_metadata();
801        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
802        let mut buffer = Vec::new();
803        let row = test_row();
804        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
805        assert!(!buffer.is_empty());
806        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
807        assert_eq!(result, Value::UInt32(42));
808    }
809
810    #[test]
811    fn test_has_column() {
812        let region_metadata = test_region_metadata();
813        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
814        let mut buffer = Vec::new();
815        let row = test_row();
816        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
817        assert!(!buffer.is_empty());
818
819        let mut offsets_map = SparseOffsetsCache::new();
820        for column_id in [
821            RESERVED_COLUMN_ID_TABLE_ID,
822            RESERVED_COLUMN_ID_TSID,
823            1,
824            2,
825            3,
826            4,
827            5,
828        ] {
829            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
830            assert!(offset.is_some());
831        }
832
833        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
834        assert!(offset.is_none());
835    }
836
837    #[test]
838    fn test_has_column_lazy_resume() {
839        let region_metadata = test_region_metadata();
840        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
841        let mut buffer = Vec::new();
842        codec
843            .encode_to_vec(test_row().into_iter(), &mut buffer)
844            .unwrap();
845
846        let mut cache = SparseOffsetsCache::new();
847        // Look up an early column: only a prefix of tags is decoded.
848        assert!(codec.has_column(&buffer, &mut cache, 1).is_some());
849        assert!(!cache.finished);
850        assert!(cache.contains(1));
851        assert!(!cache.contains(5));
852
853        // A later column resumes from the cursor.
854        assert!(codec.has_column(&buffer, &mut cache, 5).is_some());
855        assert!(cache.contains(5));
856
857        // An earlier column that was already cached still resolves.
858        assert!(codec.has_column(&buffer, &mut cache, 2).is_some());
859
860        // A non-existent column walks off the end and marks the cache finished.
861        assert!(codec.has_column(&buffer, &mut cache, 999).is_none());
862        assert!(cache.finished);
863        // Further misses are O(1).
864        assert!(codec.has_column(&buffer, &mut cache, 998).is_none());
865    }
866
867    #[test]
868    fn test_decode_value_at() {
869        let region_metadata = test_region_metadata();
870        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
871        let mut buffer = Vec::new();
872        let row = test_row();
873        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
874        assert!(!buffer.is_empty());
875
876        let row = test_row();
877        let mut offsets_map = SparseOffsetsCache::new();
878        for column_id in [
879            RESERVED_COLUMN_ID_TABLE_ID,
880            RESERVED_COLUMN_ID_TSID,
881            1,
882            2,
883            3,
884            4,
885            5,
886        ] {
887            let offset = codec
888                .has_column(&buffer, &mut offsets_map, column_id)
889                .unwrap();
890            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
891            let expected_value = row
892                .iter()
893                .find(|(id, _)| *id == column_id)
894                .unwrap()
895                .1
896                .clone();
897            assert_eq!(value.as_value_ref(), expected_value);
898        }
899    }
900
901    #[test]
902    fn test_encoded_value_for_column() {
903        let region_metadata = test_region_metadata();
904        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
905        let mut buffer = Vec::new();
906        let row = test_row();
907        codec
908            .encode_to_vec(row.clone().into_iter(), &mut buffer)
909            .unwrap();
910        assert!(!buffer.is_empty());
911
912        let mut offsets_map = SparseOffsetsCache::new();
913        for column_id in [
914            RESERVED_COLUMN_ID_TABLE_ID,
915            RESERVED_COLUMN_ID_TSID,
916            1,
917            2,
918            3,
919            4,
920            5,
921        ] {
922            let encoded_value = codec
923                .encoded_value_for_column(&buffer, &mut offsets_map, column_id)
924                .unwrap()
925                .unwrap();
926            let expected_value = row
927                .iter()
928                .find(|(id, _)| *id == column_id)
929                .unwrap()
930                .1
931                .clone();
932            let data_type = match column_id {
933                RESERVED_COLUMN_ID_TABLE_ID => ConcreteDataType::uint32_datatype(),
934                RESERVED_COLUMN_ID_TSID => ConcreteDataType::uint64_datatype(),
935                _ => ConcreteDataType::string_datatype(),
936            };
937            let field = SortField::new(data_type);
938            let mut expected_encoded = Vec::new();
939            let mut serializer = Serializer::new(&mut expected_encoded);
940            field.serialize(&mut serializer, &expected_value).unwrap();
941            assert_eq!(encoded_value, expected_encoded.as_slice());
942        }
943
944        for column_id in [6_u32, 7_u32, 999_u32] {
945            let encoded_value = codec
946                .encoded_value_for_column(&buffer, &mut offsets_map, column_id)
947                .unwrap();
948            assert!(encoded_value.is_none());
949        }
950    }
951}