mito2/row_converter/sparse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use datatypes::prelude::ConcreteDataType;
20use datatypes::value::{Value, ValueRef};
21use memcomparable::{Deserializer, Serializer};
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::consts::ReservedColumnId;
27use store_api::storage::ColumnId;
28
29use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
30use crate::memtable::key_values::KeyValue;
31use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
32use crate::row_converter::dense::SortField;
33use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
34
35/// A codec for sparse key of metrics.
36/// It requires the input primary key columns are sorted by the column name in lexicographical order.
37/// It encodes the column id of the physical region.
38#[derive(Clone, Debug)]
39pub struct SparsePrimaryKeyCodec {
40    inner: Arc<SparsePrimaryKeyCodecInner>,
41}
42
43#[derive(Debug)]
44struct SparsePrimaryKeyCodecInner {
45    // Internal fields
46    table_id_field: SortField,
47    // Internal fields
48    tsid_field: SortField,
49    // User defined label field
50    label_field: SortField,
51    // Columns in primary key
52    //
53    // None means all unknown columns is primary key(`Self::label_field`).
54    columns: Option<HashSet<ColumnId>>,
55}
56
57/// Sparse values representation.
58///
59/// A map of [`ColumnId`] to [`Value`].
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct SparseValues {
62    values: HashMap<ColumnId, Value>,
63}
64
65impl SparseValues {
66    /// Creates a new [`SparseValues`] instance.
67    pub fn new(values: HashMap<ColumnId, Value>) -> Self {
68        Self { values }
69    }
70
71    /// Returns the value of the given column, or [`Value::Null`] if the column is not present.
72    pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
73        self.values.get(&column_id).unwrap_or(&Value::Null)
74    }
75
76    /// Returns the value of the given column, or [`None`] if the column is not present.
77    pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
78        self.values.get(column_id)
79    }
80
81    /// Inserts a new value into the [`SparseValues`].
82    pub fn insert(&mut self, column_id: ColumnId, value: Value) {
83        self.values.insert(column_id, value);
84    }
85}
86
87/// The column id of the tsid.
88const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
89/// The column id of the table id.
90const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
91/// The size of the column id in the encoded sparse row.
92pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
93
94impl SparsePrimaryKeyCodec {
95    /// Creates a new [`SparsePrimaryKeyCodec`] instance.
96    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
97        Self {
98            inner: Arc::new(SparsePrimaryKeyCodecInner {
99                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
100                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
101                label_field: SortField::new(ConcreteDataType::string_datatype()),
102                columns: Some(
103                    region_metadata
104                        .primary_key_columns()
105                        .map(|c| c.column_id)
106                        .collect(),
107                ),
108            }),
109        }
110    }
111
112    /// Returns a new [`SparsePrimaryKeyCodec`] instance.
113    ///
114    /// It treats all unknown columns as primary key(label field).
115    pub fn schemaless() -> Self {
116        Self {
117            inner: Arc::new(SparsePrimaryKeyCodecInner {
118                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
119                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
120                label_field: SortField::new(ConcreteDataType::string_datatype()),
121                columns: None,
122            }),
123        }
124    }
125
126    pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
127        Self {
128            inner: Arc::new(SparsePrimaryKeyCodecInner {
129                columns: Some(fields.iter().map(|f| f.0).collect()),
130                table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
131                tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
132                label_field: SortField::new(ConcreteDataType::string_datatype()),
133            }),
134        }
135    }
136
137    /// Returns the field of the given column id.
138    fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
139        // if the `columns` is not specified, all unknown columns is primary key(label field).
140        if let Some(columns) = &self.inner.columns {
141            if !columns.contains(&column_id) {
142                return None;
143            }
144        }
145
146        match column_id {
147            RESERVED_COLUMN_ID_TABLE_ID => Some(&self.inner.table_id_field),
148            RESERVED_COLUMN_ID_TSID => Some(&self.inner.tsid_field),
149            _ => Some(&self.inner.label_field),
150        }
151    }
152
153    /// Encodes the given bytes into a [`SparseValues`].
154    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
155    where
156        I: Iterator<Item = (ColumnId, ValueRef<'a>)>,
157    {
158        let mut serializer = Serializer::new(buffer);
159        for (column_id, value) in row {
160            if value.is_null() {
161                continue;
162            }
163
164            if let Some(field) = self.get_field(column_id) {
165                column_id
166                    .serialize(&mut serializer)
167                    .context(SerializeFieldSnafu)?;
168                field.serialize(&mut serializer, &value)?;
169            } else {
170                // TODO(weny): handle the error.
171                common_telemetry::warn!("Column {} is not in primary key, skipping", column_id);
172            }
173        }
174        Ok(())
175    }
176
177    /// Decodes the given bytes into a [`SparseValues`].
178    fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
179        let mut deserializer = Deserializer::new(bytes);
180        let mut values = SparseValues::new(HashMap::new());
181
182        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
183        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
184        values.insert(column_id, value);
185
186        let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
187        let value = self.inner.tsid_field.deserialize(&mut deserializer)?;
188        values.insert(column_id, value);
189        while deserializer.has_remaining() {
190            let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
191            let value = self.inner.label_field.deserialize(&mut deserializer)?;
192            values.insert(column_id, value);
193        }
194
195        Ok(values)
196    }
197
198    /// Decodes the given bytes into a [`Value`].
199    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
200        let mut deserializer = Deserializer::new(bytes);
201        // Skip the column id.
202        deserializer.advance(COLUMN_ID_ENCODE_SIZE);
203        let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
204        Ok(Some(value))
205    }
206
207    /// Returns the offset of the given column id in the given primary key.
208    pub(crate) fn has_column(
209        &self,
210        pk: &[u8],
211        offsets_map: &mut HashMap<u32, usize>,
212        column_id: ColumnId,
213    ) -> Option<usize> {
214        if offsets_map.is_empty() {
215            let mut deserializer = Deserializer::new(pk);
216            let mut offset = 0;
217            while deserializer.has_remaining() {
218                let column_id = u32::deserialize(&mut deserializer).unwrap();
219                offset += 4;
220                offsets_map.insert(column_id, offset);
221                let Some(field) = self.get_field(column_id) else {
222                    break;
223                };
224
225                let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
226                offset += skip;
227            }
228
229            offsets_map.get(&column_id).copied()
230        } else {
231            offsets_map.get(&column_id).copied()
232        }
233    }
234
235    /// Decode value at `offset` in `pk`.
236    pub(crate) fn decode_value_at(
237        &self,
238        pk: &[u8],
239        offset: usize,
240        column_id: ColumnId,
241    ) -> Result<Value> {
242        let mut deserializer = Deserializer::new(pk);
243        deserializer.advance(offset);
244        // Safety: checked by `has_column`
245        let field = self.get_field(column_id).unwrap();
246        field.deserialize(&mut deserializer)
247    }
248}
249
250impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
251    fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
252        UnsupportedOperationSnafu {
253            err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
254        }
255        .fail()
256    }
257
258    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
259        self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
260    }
261
262    fn encode_value_refs(
263        &self,
264        values: &[(ColumnId, ValueRef)],
265        buffer: &mut Vec<u8>,
266    ) -> Result<()> {
267        self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
268    }
269
270    fn estimated_size(&self) -> Option<usize> {
271        None
272    }
273
274    fn num_fields(&self) -> Option<usize> {
275        None
276    }
277
278    fn encoding(&self) -> PrimaryKeyEncoding {
279        PrimaryKeyEncoding::Sparse
280    }
281
282    fn primary_key_filter(
283        &self,
284        metadata: &RegionMetadataRef,
285        filters: Arc<Vec<SimpleFilterEvaluator>>,
286    ) -> Box<dyn PrimaryKeyFilter> {
287        Box::new(SparsePrimaryKeyFilter::new(
288            metadata.clone(),
289            filters,
290            self.clone(),
291        ))
292    }
293
294    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
295        Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
296    }
297
298    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
299        self.decode_leftmost(bytes)
300    }
301}
302
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
    };
    use store_api::storage::{ColumnId, RegionId};

    use super::*;

    /// Builds a region with the reserved table id and tsid tag columns, five string
    /// tags (ids 1-5), a float field (id 6) and a nanosecond timestamp (id 7).
    /// The primary key is the table id, the tsid and tags 1-5.
    fn test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME,
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pod", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "namespace",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "container",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_name",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "pod_ip",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 5,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 6,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_nanosecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 7,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                2,
                3,
                4,
                5,
            ]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    #[test]
    fn test_sparse_value_new_and_get_or_null() {
        let mut values = HashMap::new();
        values.insert(1, Value::Int32(42));
        let sparse_value = SparseValues::new(values);

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
        assert_eq!(sparse_value.get_or_null(2), &Value::Null);
    }

    #[test]
    fn test_sparse_value_insert() {
        let mut sparse_value = SparseValues::new(HashMap::new());
        sparse_value.insert(1, Value::Int32(42));

        assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
    }

    /// A full row of the test region: the primary key columns first, then the
    /// non-primary-key columns (ids 6 and 7).
    fn test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
        vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (
                RESERVED_COLUMN_ID_TSID,
                ValueRef::UInt64(123843349035232323),
            ),
            // label: pod
            (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: namespace
            (2, ValueRef::String("greptime-cluster")),
            // label: container
            (3, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: pod_name
            (4, ValueRef::String("greptime-frontend-6989d9899-22222")),
            // label: pod_ip
            (5, ValueRef::String("10.10.10.10")),
            // field: greptime_value
            (6, ValueRef::Float64(OrderedFloat(1.0))),
            // time index: greptime_timestamp
            (
                7,
                ValueRef::Timestamp(Timestamp::new(1618876800000000000, TimeUnit::Nanosecond)),
            ),
        ]
    }

    #[test]
    fn test_encode_to_vec() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();

        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get_or_null(RESERVED_COLUMN_ID_TABLE_ID),
            &Value::UInt32(42)
        );
        assert_eq!(
            sparse_value.get_or_null(1),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(2),
            &Value::String("greptime-cluster".into())
        );
        assert_eq!(
            sparse_value.get_or_null(3),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(4),
            &Value::String("greptime-frontend-6989d9899-22222".into())
        );
        assert_eq!(
            sparse_value.get_or_null(5),
            &Value::String("10.10.10.10".into())
        );
    }

    #[test]
    fn test_encode_skips_null_and_non_primary_key_values() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let row = vec![
            (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(42)),
            (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(7)),
            // A null label must not be encoded.
            (1, ValueRef::Null),
            (2, ValueRef::String("greptime-cluster")),
            // `greptime_value` is not a primary key column.
            (6, ValueRef::Float64(OrderedFloat(1.0))),
        ];
        let mut buffer = Vec::new();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();

        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        assert_eq!(
            sparse_value.get(&RESERVED_COLUMN_ID_TABLE_ID),
            Some(&Value::UInt32(42))
        );
        assert_eq!(
            sparse_value.get(&RESERVED_COLUMN_ID_TSID),
            Some(&Value::UInt64(7))
        );
        assert_eq!(sparse_value.get(&1), None);
        assert_eq!(
            sparse_value.get(&2),
            Some(&Value::String("greptime-cluster".into()))
        );
        assert_eq!(sparse_value.get(&6), None);
    }

    #[test]
    fn test_schemaless_roundtrip() {
        let codec = SparsePrimaryKeyCodec::schemaless();
        // Only the primary key part of the row: the schemaless codec encodes every
        // non-reserved column with the string label field.
        let row: Vec<_> = test_row().into_iter().take(7).collect();
        let mut buffer = Vec::new();
        codec.encode_to_vec(row.iter().copied(), &mut buffer).unwrap();

        let sparse_value = codec.decode_sparse(&buffer).unwrap();
        for (column_id, expected) in row {
            assert_eq!(sparse_value.get_or_null(column_id).as_value_ref(), expected);
        }
    }

    #[test]
    fn test_decode_leftmost() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());
        let result = codec.decode_leftmost(&buffer).unwrap().unwrap();
        assert_eq!(result, Value::UInt32(42));
    }

    #[test]
    fn test_has_column() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec.has_column(&buffer, &mut offsets_map, column_id);
            assert!(offset.is_some());
        }

        let offset = codec.has_column(&buffer, &mut offsets_map, 6);
        assert!(offset.is_none());
    }

    #[test]
    fn test_decode_value_at() {
        let region_metadata = test_region_metadata();
        let codec = SparsePrimaryKeyCodec::new(&region_metadata);
        let mut buffer = Vec::new();
        let row = test_row();
        codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
        assert!(!buffer.is_empty());

        let row = test_row();
        let mut offsets_map = HashMap::new();
        for column_id in [
            RESERVED_COLUMN_ID_TABLE_ID,
            RESERVED_COLUMN_ID_TSID,
            1,
            2,
            3,
            4,
            5,
        ] {
            let offset = codec
                .has_column(&buffer, &mut offsets_map, column_id)
                .unwrap();
            let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
            let expected_value = row.iter().find(|(id, _)| *id == column_id).unwrap().1;
            assert_eq!(value.as_value_ref(), expected_value);
        }
    }
}