metric_engine/
row_modifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::hash::Hash;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use mito_codec::row_converter::SparsePrimaryKeyCodec;
22use smallvec::SmallVec;
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{
26    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
27};
28use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
29use store_api::storage::{ColumnId, TableId};
30
31use crate::error::{EncodePrimaryKeySnafu, Result};
32
/// Seed for the hasher used by [`TsidGenerator`].
///
/// The value is arbitrary, but changing it changes every generated TSID.
const TSID_HASH_SEED: u32 = 846_793_005;
35
36/// A row modifier modifies [`Rows`].
37///
38/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
39///   it replaces the primary key columns with the encoded primary key column(`__primary_key`).
40///
41/// - For [`PrimaryKeyEncoding::Dense`] encoding,
42///   it adds two columns(`__table_id`, `__tsid`) to the row.
43pub struct RowModifier {
44    codec: SparsePrimaryKeyCodec,
45}
46
47impl Default for RowModifier {
48    fn default() -> Self {
49        Self {
50            codec: SparsePrimaryKeyCodec::schemaless(),
51        }
52    }
53}
54
55impl RowModifier {
56    /// Modify rows with the given primary key encoding.
57    pub(crate) fn modify_rows(
58        &self,
59        iter: RowsIter,
60        table_id: TableId,
61        encoding: PrimaryKeyEncoding,
62    ) -> Result<Rows> {
63        match encoding {
64            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
65            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
66        }
67    }
68
69    /// Modifies rows with sparse primary key encoding.
70    /// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
71    fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
72        let num_column = iter.rows.schema.len();
73        let num_primary_key_column = iter.index.num_primary_key_column;
74        // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column)
75        let num_output_column = num_column - num_primary_key_column + 1;
76
77        let mut buffer = vec![];
78        for mut iter in iter.iter_mut() {
79            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
80            let mut values = Vec::with_capacity(num_output_column);
81            buffer.clear();
82            let internal_columns = [
83                (
84                    ReservedColumnId::table_id(),
85                    api::helper::pb_value_to_value_ref(&table_id, &None),
86                ),
87                (
88                    ReservedColumnId::tsid(),
89                    api::helper::pb_value_to_value_ref(&tsid, &None),
90                ),
91            ];
92            self.codec
93                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
94                .context(EncodePrimaryKeySnafu)?;
95            self.codec
96                .encode_to_vec(iter.primary_keys(), &mut buffer)
97                .context(EncodePrimaryKeySnafu)?;
98
99            values.push(ValueData::BinaryValue(buffer.clone()).into());
100            values.extend(iter.remaining());
101            // Replace the row with the encoded row
102            *iter.row = Row { values };
103        }
104
105        // Update the schema
106        let mut schema = Vec::with_capacity(num_output_column);
107        schema.push(ColumnSchema {
108            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
109            datatype: ColumnDataType::Binary as i32,
110            semantic_type: SemanticType::Tag as _,
111            datatype_extension: None,
112            options: None,
113        });
114        schema.extend(iter.remaining_columns());
115        iter.rows.schema = schema;
116
117        Ok(iter.rows)
118    }
119
120    /// Modifies rows with dense primary key encoding.
121    /// It adds two columns(`__table_id`, `__tsid`) to the row.
122    fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
123        // add table_name column
124        iter.rows.schema.push(ColumnSchema {
125            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
126            datatype: ColumnDataType::Uint32 as i32,
127            semantic_type: SemanticType::Tag as _,
128            datatype_extension: None,
129            options: None,
130        });
131        // add tsid column
132        iter.rows.schema.push(ColumnSchema {
133            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
134            datatype: ColumnDataType::Uint64 as i32,
135            semantic_type: SemanticType::Tag as _,
136            datatype_extension: None,
137            options: None,
138        });
139        for iter in iter.iter_mut() {
140            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
141            iter.row.values.push(table_id);
142            iter.row.values.push(tsid);
143        }
144
145        Ok(iter.rows)
146    }
147
148    /// Fills internal columns of a row with table name and a hash of tag values.
149    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
150        let mut hasher = TsidGenerator::default();
151        for (name, value) in iter.primary_keys_with_name() {
152            // The type is checked before. So only null is ignored.
153            if let Some(ValueData::StringValue(string)) = &value.value_data {
154                hasher.write_label(name, string);
155            }
156        }
157        let hash = hasher.finish();
158
159        (
160            ValueData::U32Value(table_id).into(),
161            ValueData::U64Value(hash).into(),
162        )
163    }
164}
165
166/// Tsid generator.
167pub struct TsidGenerator {
168    hasher: mur3::Hasher128,
169}
170
171impl Default for TsidGenerator {
172    fn default() -> Self {
173        Self {
174            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
175        }
176    }
177}
178
179impl TsidGenerator {
180    /// Writes a label pair to the generator.
181    pub fn write_label(&mut self, name: &str, value: &str) {
182        name.hash(&mut self.hasher);
183        value.hash(&mut self.hasher);
184    }
185
186    /// Generates a new TSID.
187    pub fn finish(&mut self) -> u64 {
188        // TSID is 64 bits, simply truncate the 128 bits hash
189        let (hash, _) = self.hasher.finish128();
190        hash
191    }
192}
193
/// Position of one column's value within a row, together with the column's id.
///
/// `index` is used as an offset into both the row's values and the row schema.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {
    /// Column id used when encoding the value (a reserved id for internal columns).
    column_id: ColumnId,
    /// Offset of the column in the original row schema and row values.
    index: usize,
}
200
201/// Index of a row.
202struct IterIndex {
203    indices: Vec<ValueIndex>,
204    num_primary_key_column: usize,
205}
206
207impl IterIndex {
208    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
209        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
210        // Uses BTreeMap to keep the primary key column name order (lexicographical)
211        let mut primary_key_indices = BTreeMap::new();
212        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
213        let mut ts_index = None;
214        for (idx, col) in row_schema.iter().enumerate() {
215            match col.semantic_type() {
216                SemanticType::Tag => match col.column_name.as_str() {
217                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
218                        reserved_indices.push(ValueIndex {
219                            column_id: ReservedColumnId::table_id(),
220                            index: idx,
221                        });
222                    }
223                    DATA_SCHEMA_TSID_COLUMN_NAME => {
224                        reserved_indices.push(ValueIndex {
225                            column_id: ReservedColumnId::tsid(),
226                            index: idx,
227                        });
228                    }
229                    _ => {
230                        // Inserts primary key column name follower the column name order (lexicographical)
231                        primary_key_indices.insert(
232                            col.column_name.as_str(),
233                            ValueIndex {
234                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
235                                index: idx,
236                            },
237                        );
238                    }
239                },
240                SemanticType::Field => {
241                    field_indices.push(ValueIndex {
242                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
243                        index: idx,
244                    });
245                }
246                SemanticType::Timestamp => {
247                    ts_index = Some(ValueIndex {
248                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
249                        index: idx,
250                    });
251                }
252            }
253        }
254        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
255        let indices = reserved_indices
256            .into_iter()
257            .chain(primary_key_indices.values().cloned())
258            .chain(ts_index)
259            .chain(field_indices)
260            .collect();
261        IterIndex {
262            indices,
263            num_primary_key_column,
264        }
265    }
266}
267
268/// Iterator of rows.
269pub struct RowsIter {
270    rows: Rows,
271    index: IterIndex,
272}
273
274impl RowsIter {
275    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
276        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
277        Self { rows, index }
278    }
279
280    /// Returns the iterator of rows.
281    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
282        self.rows.rows.iter_mut().map(|row| RowIter {
283            row,
284            index: &self.index,
285            schema: &self.rows.schema,
286        })
287    }
288
289    /// Returns the remaining columns.
290    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
291        self.index.indices[self.index.num_primary_key_column..]
292            .iter()
293            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
294    }
295}
296
297/// Iterator of a row.
298pub struct RowIter<'a> {
299    row: &'a mut Row,
300    index: &'a IterIndex,
301    schema: &'a Vec<ColumnSchema>,
302}
303
304impl RowIter<'_> {
305    /// Returns the primary keys with their names.
306    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
307        self.index.indices[..self.index.num_primary_key_column]
308            .iter()
309            .map(|idx| {
310                (
311                    &self.schema[idx.index].column_name,
312                    &self.row.values[idx.index],
313                )
314            })
315    }
316
317    /// Returns the primary keys.
318    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
319        self.index.indices[..self.index.num_primary_key_column]
320            .iter()
321            .map(|idx| {
322                (
323                    idx.column_id,
324                    api::helper::pb_value_to_value_ref(
325                        &self.row.values[idx.index],
326                        &self.schema[idx.index].datatype_extension,
327                    ),
328                )
329            })
330    }
331
332    /// Returns the remaining columns.
333    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
334        self.index.indices[self.index.num_primary_key_column..]
335            .iter()
336            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
337    }
338
339    /// Returns value at given offset.
340    /// # Panics
341    /// Panics if offset out-of-bound
342    pub fn value_at(&self, idx: usize) -> &Value {
343        &self.row.values[idx]
344    }
345}
346
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::{Row, Rows};

    use super::*;

    /// Builds a tag column schema with the given name and datatype.
    fn tag_column(name: &str, datatype: ColumnDataType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        }
    }

    /// Logical schema used by the tests: two string tags, `namespace` then `host`.
    fn test_schema() -> Vec<ColumnSchema> {
        ["namespace", "host"]
            .into_iter()
            .map(|name| tag_column(name, ColumnDataType::String))
            .collect()
    }

    /// Builds a row holding two string values, in schema order.
    fn test_row(v1: &str, v2: &str) -> Row {
        let values: Vec<Value> = [v1, v2]
            .into_iter()
            .map(|v| ValueData::StringValue(v.to_string()).into())
            .collect();
        Row { values }
    }

    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
    }

    /// Wraps a single row with `schema` into a [`RowsIter`] using the test id mapping.
    fn single_row_iter(schema: Vec<ColumnSchema>, row: Row) -> RowsIter {
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        RowsIter::new(rows, &test_name_to_column_id())
    }

    #[test]
    fn test_encode_sparse() {
        let encoder = RowModifier::default();
        let table_id = 1025;
        let rows_iter = single_row_iter(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
        assert_eq!(result.rows[0].values.len(), 1);
        let encoded_primary_key = vec![
            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(
            result.rows[0].values[0],
            ValueData::BinaryValue(encoded_primary_key).into()
        );
        assert_eq!(result.schema, expected_sparse_schema());
    }

    fn expected_sparse_schema() -> Vec<ColumnSchema> {
        vec![tag_column(PRIMARY_KEY_COLUMN_NAME, ColumnDataType::Binary)]
    }

    fn expected_dense_schema() -> Vec<ColumnSchema> {
        let mut schema = test_schema();
        schema.push(tag_column(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            ColumnDataType::Uint32,
        ));
        schema.push(tag_column(
            DATA_SCHEMA_TSID_COLUMN_NAME,
            ColumnDataType::Uint64,
        ));
        schema
    }

    #[test]
    fn test_encode_dense() {
        let encoder = RowModifier::default();
        let table_id = 1025;
        let rows_iter = single_row_iter(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();
        let expected: [Value; 4] = [
            ValueData::StringValue("greptimedb".to_string()).into(),
            ValueData::StringValue("127.0.0.1".to_string()).into(),
            ValueData::U32Value(1025).into(),
            ValueData::U64Value(9442261431637846000).into(),
        ];
        for (i, want) in expected.into_iter().enumerate() {
            assert_eq!(result.rows[0].values[i], want);
        }
        assert_eq!(result.schema, expected_dense_schema());
    }

    #[test]
    fn test_fill_internal_columns() {
        let table_id = 1025;
        let reversed_schema = vec![
            tag_column("host", ColumnDataType::String),
            tag_column("namespace", ColumnDataType::String),
        ];
        // The same labels in a different column order must produce the same tsid.
        let cases = [
            (test_schema(), test_row("greptimedb", "127.0.0.1")),
            (reversed_schema, test_row("127.0.0.1", "greptimedb")),
        ];
        for (schema, row) in cases {
            let mut rows_iter = single_row_iter(schema, row);
            let row_iter = rows_iter.iter_mut().next().unwrap();
            let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
            assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
            assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
        }
    }
}