mito_codec/row_converter/sparse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::prelude::ConcreteDataType;
20use datatypes::value::{Value, ValueRef};
21use memcomparable::{Deserializer, Serializer};
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::consts::ReservedColumnId;
27use store_api::storage::ColumnId;
28
29use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
30use crate::key_values::KeyValue;
31use crate::primary_key_filter::SparsePrimaryKeyFilter;
32use crate::row_converter::dense::SortField;
33use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
34
35/// A codec for sparse key of metrics.
36/// It requires the input primary key columns are sorted by the column name in lexicographical order.
37/// It encodes the column id of the physical region.
38#[derive(Clone, Debug)]
39pub struct SparsePrimaryKeyCodec {
40    inner: Arc<SparsePrimaryKeyCodecInner>,
41}
42
43#[derive(Debug)]
44struct SparsePrimaryKeyCodecInner {
45    // Internal fields
46    table_id_field: SortField,
47    // Internal fields
48    tsid_field: SortField,
49    // User defined label field
50    label_field: SortField,
51    // Columns in primary key
52    //
53    // None means all unknown columns is primary key(`Self::label_field`).
54    columns: Option<HashSet<ColumnId>>,
55}
56
57/// Sparse values representation.
58///
59/// A map of [`ColumnId`] to [`Value`].
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct SparseValues {
62    values: HashMap<ColumnId, Value>,
63}
64
65impl SparseValues {
66    /// Creates a new [`SparseValues`] instance.
67    pub fn new(values: HashMap<ColumnId, Value>) -> Self {
68        Self { values }
69    }
70
71    /// Returns the value of the given column, or [`Value::Null`] if the column is not present.
72    pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
73        self.values.get(&column_id).unwrap_or(&Value::Null)
74    }
75
76    /// Returns the value of the given column, or [`None`] if the column is not present.
77    pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
78        self.values.get(column_id)
79    }
80
81    /// Inserts a new value into the [`SparseValues`].
82    pub fn insert(&mut self, column_id: ColumnId, value: Value) {
83        self.values.insert(column_id, value);
84    }
85}
86
87/// The column id of the tsid.
88const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
89/// The column id of the table id.
90const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
91/// The size of the column id in the encoded sparse row.
92pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
93
94impl SparsePrimaryKeyCodec {
95    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
96    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
97        Self {
98            inner: Arc::new(SparsePrimaryKeyCodecInner {
99                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
100                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
101                label_field: SortField::new(ConcreteDataType::string_datatype()),
102                columns: Some(
103                    region_metadata
104                        .primary_key_columns()
105                        .map(|c| c.column_id)
106                        .collect(),
107                ),
108            }),
109        }
110    }
111
112    /// Returns a new [`SparsePrimaryKeyCodec`] instance.
113    ///
114    /// It treats all unknown columns as primary key(label field).
115    pub fn schemaless() -> Self {
116        Self {
117            inner: Arc::new(SparsePrimaryKeyCodecInner {
118                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
119                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
120                label_field: SortField::new(ConcreteDataType::string_datatype()),
121                columns: None,
122            }),
123        }
124    }
125
126    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
127        Self {
128            inner: Arc::new(SparsePrimaryKeyCodecInner {
129                columns: Some(fields.iter().map(|f| f.0).collect()),
130                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
131                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
132                label_field: SortField::new(ConcreteDataType::string_datatype()),
133            }),
134        }
135    }
136
137    /// Returns the field of the given column id.
138    fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
139        // if the `columns` is not specified, all unknown columns is primary key(label field).
140        if let Some(columns) = &self.inner.columns {
141            if !columns.contains(&column_id) {
142                return None;
143            }
144        }
145
146        match column_id {
147            RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
148            RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
149            _ => Some(&self.inner.label_field),
150        }
151    }
152
153    /// Encodes the given bytes into a [`SparseValues`].
154    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
155    where
156        I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
157    {
158        let mut serializer = Serializer::new(buffer);
159        for (column_id, value) in row {
160            if value.is_null() {
161                continue;
162            }
163
164            if let Some(field) = self.get_field(column_id) {
165                column_id
166                    .serialize(&mut serializer)
167                    .context(SerializeFieldSnafu)?;
168                field.serialize(&mut serializer, &value)?;
169            } else {
170                // TODO(weny): handle the error.
171                common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
172            }
173        }
174        Ok(())
175    }
176
177    /// Decodes the given bytes into a [`SparseValues`].
178    fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
179        let mut deserializer = Deserializer::new(bytes);
180        let mut values = SparseValues::new(HashMap::new());
181
182        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
183        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
184        values.insert(column_id, value);
185
186        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
187        let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
188        values.insert(column_id, value);
189        while deserializer.has_remaining() {
190            let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
191            let value = self.inner.label_field.deserialize(&mut deserializer)?;
192            values.insert(column_id, value);
193        }
194
195        Ok(values)
196    }
197
198    /// Decodes the given bytes into a [`Value`].
199    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
200        let mut deserializer = Deserializer::new(bytes);
201        // Skip the column id.
202        deserializer.advance(COLUMN_ID_ENCODE_SIZE);
203        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
204        Ok(Some(value))
205    }
206
207    /// Returns the offset of the given column id in the given primary key.
208    pub fn has_column(
209        &self,
210        pk: &[u8],
211        offsets_map: &mut HashMap<u32, usize>,
212        column_id: ColumnId,
213    ) -> Option<usize> {
214        if offsets_map.is_empty() {
215            let mut deserializer = Deserializer::new(pk);
216            let mut offset = 0;
217            while deserializer.has_remaining() {
218                let column_id = u32::deserialize(&mut deserializer).unwrap();
219                offset += 4;
220                offsets_map.insert(column_id, offset);
221                let Some(field) = self.get_field(column_id) else {
222                    break;
223                };
224
225                let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
226                offset += skip;
227            }
228
229            offsets_map.get(&column_id).copied()
230        } else {
231            offsets_map.get(&column_id).copied()
232        }
233    }
234
235    /// Decode value at `offset` in `pk`.
236    pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
237        let mut deserializer = Deserializer::new(pk);
238        deserializer.advance(offset);
239        // Safety: checked by `has_column`
240        let field = self.get_field(column_id).unwrap();
241        field.deserialize(&mut deserializer)
242    }
243}
244
245impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
246    fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
247        UnsupportedOperationSnafu {
248            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
249        }
250        .fail()
251    }
252
253    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
254        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
255    }
256
257    fn encode_value_refs(
258        &self,
259        values: &[(ColumnId, ValueRef)],
260        buffer: &mut Vec<u8>,
261    ) -> Result<()> {
262        self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
263    }
264
265    fn estimated_size(&self) -> Option<usize> {
266        None
267    }
268
269    fn num_fields(&self) -> Option<usize> {
270        None
271    }
272
273    fn encoding(&self) -> PrimaryKeyEncoding {
274        PrimaryKeyEncoding::Sparse
275    }
276
277    fn primary_key_filter(
278        &self,
279        metadata: &RegionMetadataRef,
280        filters: Arc<Vec<SimpleFilterEvaluator>>,
281    ) -> Box<dyn PrimaryKeyFilter> {
282        Box::new(SparsePrimaryKeyFilter::new(
283            metadata.clone(),
284            filters,
285            self.clone(),
286        ))
287    }
288
289    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
290        Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
291    }
292
293    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
294        self.decode_leftmost(bytes)
295    }
296}
297
298/// Field with column id.
299pub struct FieldWithId {
300    pub field: SortField,
301    pub column_id: ColumnId,
302}
303
304/// A special encoder for memtable.
305pub struct SparseEncoder {
306    fields: Vec<FieldWithId>,
307}
308
309impl SparseEncoder {
310    pub fn new(fields: Vec<FieldWithId>) -> Self {
311        Self { fields }
312    }
313
314    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
315    where
316        I: Iterator<Item = ValueRef<'a>>,
317    {
318        let mut serializer = Serializer::new(buffer);
319        for (value, field) in row.zip(self.fields.iter()) {
320            if !value.is_null() {
321                field
322                    .column_id
323                    .serialize(&mut serializer)
324                    .context(SerializeFieldSnafu)?;
325                field.field.serialize(&mut serializer, &value)?;
326            }
327        }
328        Ok(())
329    }
330}
331
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
    };
    use store_api::storage::{ColumnId, RegionId};

    use super::*;

    /// Ids of the primary key columns of the test region, in key order.
    const PRIMARY_KEY_COLUMN_IDS: [ColumnId; 7] = [
        RESERVED_COLUMN_ID_TABLE_ID,
        RESERVED_COLUMN_ID_TSID,
        1,
        2,
        3,
        4,
        5,
    ];

    /// Builds the metadata of one column of the test region.
    fn column_metadata(
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id,
        }
    }

    /// Builds a metric-style region keyed by
    /// `[table_id, tsid, pod, namespace, container, pod_name, pod_ip]`.
    fn test_region_metadata() -> RegionMetadataRef {
        let label = |name: &str, column_id: ColumnId| {
            column_metadata(
                name,
                ConcreteDataType::string_datatype(),
                true,
                SemanticType::Tag,
                column_id,
            )
        };
        let columns = [
            column_metadata(
                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                ConcreteDataType::uint32_datatype(),
                false,
                SemanticType::Tag,
                ReservedColumnId::table_id(),
            ),
            column_metadata(
                DATA_SCHEMA_TSID_COLUMN_NAME,
                ConcreteDataType::uint64_datatype(),
                false,
                SemanticType::Tag,
                ReservedColumnId::tsid(),
            ),
            label("pod", 1),
            label("namespace", 2),
            label("container", 3),
            label("pod_name", 4),
            label("pod_ip", 5),
            column_metadata(
                "greptime_value",
                ConcreteDataType::float64_datatype(),
                false,
                SemanticType::Field,
                6,
            ),
            column_metadata(
                "greptime_timestamp",
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
                SemanticType::Timestamp,
                7,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for column in columns {
            builder.push_column_metadata(column);
        }
        builder.primary_key(PRIMARY_KEY_COLUMN_IDS.to_vec());
        Arc::new(builder.build().unwrap())
    }

    #[test]
    fn test_sparse_value_new_and_get_or_null() {
        let sparse_value = SparseValues::new(HashMap::from([(1, Value::Int32(42))]));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_sparse_value_insert() {
        let mut sparse_value = SparseValues::new(HashMap::new());
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    }

    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (
                RESERVED_COLUMN_ID_TSID,
                ValueRef::UInt64(123843349035232323),
            ),
            // label: pod
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: namespace
            (2, ValueRef::String("greptime-cluster")),
            // label: container
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: pod_name
            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: pod_ip
            (5, ValueRef::String("10.10.10.10")),
            // field: greptime_value
            (6, ValueRef::Float64(OrderedFloat(1.0))),
            // field: greptime_timestamp
            (
                7,
                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
            ),
        ]
    }

    /// Encodes [`test_row`] with a codec built from [`test_region_metadata`].
    fn encoded_test_row() -> (SparsePrimaryKeyCodec, Vec<u8>) {
        let codec = SparsePrimaryKeyCodec::new(&test_region_metadata());
        let mut buffer = Vec::new();
        codec
            .encode_to_vec(test_row().into_iter(), &mut buffer)
            .unwrap();
        assert!(!buffer.is_empty());
        (codec, buffer)
    }

    #[test]
    fn test_encode_to_vec() {
        let (codec, buffer) = encoded_test_row();
        let decoded = codec.decode_sparse(&buffer).unwrap();

        let expected = [
            (RESERVED_COLUMN_ID_TABLE_ID, Value::UInt32(42)),
            (1, Value::String("greptime-frontend-6989d9899-22222".into())),
            (2, Value::String("greptime-cluster".into())),
            (3, Value::String("greptime-frontend-6989d9899-22222".into())),
            (4, Value::String("greptime-frontend-6989d9899-22222".into())),
            (5, Value::String("10.10.10.10".into())),
        ];
        for (column_id, value) in &expected {
            assert_eq!(decoded.get_or_null(*column_id), value);
        }
    }

    #[test]
    fn test_decode_leftmost() {
        let (codec, buffer) = encoded_test_row();
        let leftmost = codec.decode_leftmost(&buffer).unwrap();
        assert_eq!(leftmost, Some(Value::UInt32(42)));
    }

    #[test]
    fn test_has_column() {
        let (codec, buffer) = encoded_test_row();
        let mut offsets = HashMap::new();

        for column_id in PRIMARY_KEY_COLUMN_IDS {
            assert!(
                codec.has_column(&buffer, &mut offsets, column_id).is_some(),
                "column {column_id} should be present"
            );
        }

        // `greptime_value` is a field column and never part of the key.
        assert!(codec.has_column(&buffer, &mut offsets, 6).is_none());
    }

    #[test]
    fn test_decode_value_at() {
        let (codec, buffer) = encoded_test_row();
        let row = test_row();
        let mut offsets = HashMap::new();

        for column_id in PRIMARY_KEY_COLUMN_IDS {
            let offset = codec
                .has_column(&buffer, &mut offsets, column_id)
                .unwrap();
            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
            let (_, expected) = row.iter().find(|(id, _)| *id == column_id).unwrap();
            assert_eq!(value.as_value_ref(), *expected);
        }
    }
}