metric_engine/
row_modifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::hash::Hash;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use mito2::row_converter::SparsePrimaryKeyCodec;
22use smallvec::SmallVec;
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{
26    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
27};
28use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
29use store_api::storage::{ColumnId, TableId};
30
31use crate::error::{EncodePrimaryKeySnafu, Result};
32
/// Seed of the hasher used by [`TsidGenerator`].
///
/// The value is arbitrary, but it must never change: every TSID (and the expected
/// values pinned in the tests below) is computed with this exact seed.
const TSID_HASH_SEED: u32 = 846_793_005;
35
/// A row modifier modifies [`Rows`].
///
/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
///   it replaces the primary key columns with the encoded primary key column (`__primary_key`).
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
///   it adds two columns (`__table_id`, `__tsid`) to the row.
pub(crate) struct RowModifier {
    /// Codec used to encode the sparse primary key column. `RowModifier::new`
    /// builds it with `SparsePrimaryKeyCodec::schemaless()`.
    codec: SparsePrimaryKeyCodec,
}
46
47impl RowModifier {
48    pub fn new() -> Self {
49        Self {
50            codec: SparsePrimaryKeyCodec::schemaless(),
51        }
52    }
53
54    /// Modify rows with the given primary key encoding.
55    pub(crate) fn modify_rows(
56        &self,
57        iter: RowsIter,
58        table_id: TableId,
59        encoding: PrimaryKeyEncoding,
60    ) -> Result<Rows> {
61        match encoding {
62            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
63            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
64        }
65    }
66
67    /// Modifies rows with sparse primary key encoding.
68    /// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
69    fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
70        let num_column = iter.rows.schema.len();
71        let num_primary_key_column = iter.index.num_primary_key_column;
72        // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column)
73        let num_output_column = num_column - num_primary_key_column + 1;
74
75        let mut buffer = vec![];
76        for mut iter in iter.iter_mut() {
77            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
78            let mut values = Vec::with_capacity(num_output_column);
79            buffer.clear();
80            let internal_columns = [
81                (
82                    ReservedColumnId::table_id(),
83                    api::helper::pb_value_to_value_ref(&table_id, &None),
84                ),
85                (
86                    ReservedColumnId::tsid(),
87                    api::helper::pb_value_to_value_ref(&tsid, &None),
88                ),
89            ];
90            self.codec
91                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
92                .context(EncodePrimaryKeySnafu)?;
93            self.codec
94                .encode_to_vec(iter.primary_keys(), &mut buffer)
95                .context(EncodePrimaryKeySnafu)?;
96
97            values.push(ValueData::BinaryValue(buffer.clone()).into());
98            values.extend(iter.remaining());
99            // Replace the row with the encoded row
100            *iter.row = Row { values };
101        }
102
103        // Update the schema
104        let mut schema = Vec::with_capacity(num_output_column);
105        schema.push(ColumnSchema {
106            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
107            datatype: ColumnDataType::Binary as i32,
108            semantic_type: SemanticType::Tag as _,
109            datatype_extension: None,
110            options: None,
111        });
112        schema.extend(iter.remaining_columns());
113        iter.rows.schema = schema;
114
115        Ok(iter.rows)
116    }
117
118    /// Modifies rows with dense primary key encoding.
119    /// It adds two columns(`__table_id`, `__tsid`) to the row.
120    fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
121        // add table_name column
122        iter.rows.schema.push(ColumnSchema {
123            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
124            datatype: ColumnDataType::Uint32 as i32,
125            semantic_type: SemanticType::Tag as _,
126            datatype_extension: None,
127            options: None,
128        });
129        // add tsid column
130        iter.rows.schema.push(ColumnSchema {
131            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
132            datatype: ColumnDataType::Uint64 as i32,
133            semantic_type: SemanticType::Tag as _,
134            datatype_extension: None,
135            options: None,
136        });
137        for iter in iter.iter_mut() {
138            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
139            iter.row.values.push(table_id);
140            iter.row.values.push(tsid);
141        }
142
143        Ok(iter.rows)
144    }
145
146    /// Fills internal columns of a row with table name and a hash of tag values.
147    fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
148        let mut hasher = TsidGenerator::default();
149        for (name, value) in iter.primary_keys_with_name() {
150            // The type is checked before. So only null is ignored.
151            if let Some(ValueData::StringValue(string)) = &value.value_data {
152                hasher.write_label(name, string);
153            }
154        }
155        let hash = hasher.finish();
156
157        (
158            ValueData::U32Value(table_id).into(),
159            ValueData::U64Value(hash).into(),
160        )
161    }
162}
163
164/// Tsid generator.
165pub struct TsidGenerator {
166    hasher: mur3::Hasher128,
167}
168
169impl Default for TsidGenerator {
170    fn default() -> Self {
171        Self {
172            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
173        }
174    }
175}
176
177impl TsidGenerator {
178    /// Writes a label pair to the generator.
179    pub fn write_label(&mut self, name: &str, value: &str) {
180        name.hash(&mut self.hasher);
181        value.hash(&mut self.hasher);
182    }
183
184    /// Generates a new TSID.
185    pub fn finish(&mut self) -> u64 {
186        // TSID is 64 bits, simply truncate the 128 bits hash
187        let (hash, _) = self.hasher.finish128();
188        hash
189    }
190}
191
192/// Index of a value.
193#[derive(Debug, Clone, Copy)]
194struct ValueIndex {
195    column_id: ColumnId,
196    index: usize,
197}
198
199/// Index of a row.
200struct IterIndex {
201    indices: Vec<ValueIndex>,
202    num_primary_key_column: usize,
203}
204
205impl IterIndex {
206    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
207        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
208        // Uses BTreeMap to keep the primary key column name order (lexicographical)
209        let mut primary_key_indices = BTreeMap::new();
210        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
211        let mut ts_index = None;
212        for (idx, col) in row_schema.iter().enumerate() {
213            match col.semantic_type() {
214                SemanticType::Tag => match col.column_name.as_str() {
215                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
216                        reserved_indices.push(ValueIndex {
217                            column_id: ReservedColumnId::table_id(),
218                            index: idx,
219                        });
220                    }
221                    DATA_SCHEMA_TSID_COLUMN_NAME => {
222                        reserved_indices.push(ValueIndex {
223                            column_id: ReservedColumnId::tsid(),
224                            index: idx,
225                        });
226                    }
227                    _ => {
228                        // Inserts primary key column name follower the column name order (lexicographical)
229                        primary_key_indices.insert(
230                            col.column_name.as_str(),
231                            ValueIndex {
232                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
233                                index: idx,
234                            },
235                        );
236                    }
237                },
238                SemanticType::Field => {
239                    field_indices.push(ValueIndex {
240                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
241                        index: idx,
242                    });
243                }
244                SemanticType::Timestamp => {
245                    ts_index = Some(ValueIndex {
246                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
247                        index: idx,
248                    });
249                }
250            }
251        }
252        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
253        let indices = reserved_indices
254            .into_iter()
255            .chain(primary_key_indices.values().cloned())
256            .chain(ts_index)
257            .chain(field_indices)
258            .collect();
259        IterIndex {
260            indices,
261            num_primary_key_column,
262        }
263    }
264}
265
266/// Iterator of rows.
267pub(crate) struct RowsIter {
268    rows: Rows,
269    index: IterIndex,
270}
271
272impl RowsIter {
273    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
274        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
275        Self { rows, index }
276    }
277
278    /// Returns the iterator of rows.
279    fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
280        self.rows.rows.iter_mut().map(|row| RowIter {
281            row,
282            index: &self.index,
283            schema: &self.rows.schema,
284        })
285    }
286
287    /// Returns the remaining columns.
288    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
289        self.index.indices[self.index.num_primary_key_column..]
290            .iter()
291            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
292    }
293}
294
295/// Iterator of a row.
296struct RowIter<'a> {
297    row: &'a mut Row,
298    index: &'a IterIndex,
299    schema: &'a Vec<ColumnSchema>,
300}
301
302impl RowIter<'_> {
303    /// Returns the primary keys with their names.
304    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
305        self.index.indices[..self.index.num_primary_key_column]
306            .iter()
307            .map(|idx| {
308                (
309                    &self.schema[idx.index].column_name,
310                    &self.row.values[idx.index],
311                )
312            })
313    }
314
315    /// Returns the primary keys.
316    fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
317        self.index.indices[..self.index.num_primary_key_column]
318            .iter()
319            .map(|idx| {
320                (
321                    idx.column_id,
322                    api::helper::pb_value_to_value_ref(
323                        &self.row.values[idx.index],
324                        &self.schema[idx.index].datatype_extension,
325                    ),
326                )
327            })
328    }
329
330    /// Returns the remaining columns.
331    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
332        self.index.indices[self.index.num_primary_key_column..]
333            .iter()
334            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
335    }
336}
337
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::{Row, Rows};

    use super::*;

    /// Returns a schema with two string tag columns: `namespace`, then `host`.
    fn test_schema() -> Vec<ColumnSchema> {
        vec![
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ]
    }

    /// Returns a row with two string values, in the column order of the schema it is
    /// paired with.
    fn test_row(v1: &str, v2: &str) -> Row {
        Row {
            values: vec![
                ValueData::StringValue(v1.to_string()).into(),
                ValueData::StringValue(v2.to_string()).into(),
            ],
        }
    }

    /// Maps the test tag columns to their column ids (`namespace` = 1, `host` = 2).
    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
    }

    /// Sparse encoding collapses both tag columns into one encoded primary key value.
    #[test]
    fn test_encode_sparse() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let rows_iter = RowsIter::new(rows, &name_to_column_id);
        let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
        // The input has no timestamp or field columns, so only the encoded key remains.
        assert_eq!(result.rows[0].values.len(), 1);
        // Golden bytes produced by the sparse codec for the internal columns (table id,
        // tsid) followed by the tags. The ASCII bytes of "127.0.0.1" (host) appear before
        // those of "greptimedb" (namespace), i.e. tags are encoded in name order.
        let encoded_primary_key = vec![
            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(
            result.rows[0].values[0],
            ValueData::BinaryValue(encoded_primary_key).into()
        );
        assert_eq!(result.schema, expected_sparse_schema());
    }

    /// Schema expected after sparse encoding: only the encoded primary key column.
    fn expected_sparse_schema() -> Vec<ColumnSchema> {
        vec![ColumnSchema {
            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
            datatype: ColumnDataType::Binary as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        }]
    }

    /// Schema expected after dense encoding: the original tags followed by the
    /// table id and tsid columns.
    fn expected_dense_schema() -> Vec<ColumnSchema> {
        vec![
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::Uint32 as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::Uint64 as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ]
    }

    /// Dense encoding keeps the original values and appends the table id and tsid.
    #[test]
    fn test_encode_dense() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let rows_iter = RowsIter::new(rows, &name_to_column_id);
        let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();
        assert_eq!(
            result.rows[0].values[0],
            ValueData::StringValue("greptimedb".to_string()).into()
        );
        assert_eq!(
            result.rows[0].values[1],
            ValueData::StringValue("127.0.0.1".to_string()).into()
        );
        assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
        // Same tsid as in `test_fill_internal_columns`.
        assert_eq!(
            result.rows[0].values[3],
            ValueData::U64Value(9442261431637846000).into()
        );
        assert_eq!(result.schema, expected_dense_schema());
    }

    /// The tsid must not depend on the order of the tag columns in the input schema.
    #[test]
    fn test_fill_internal_columns() {
        let name_to_column_id = test_name_to_column_id();
        let encoder = RowModifier::new();
        let table_id = 1025;
        let schema = test_schema();
        let row = test_row("greptimedb", "127.0.0.1");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());

        // Change the column order
        let schema = vec![
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: "namespace".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as _,
                datatype_extension: None,
                options: None,
            },
        ];
        // Values swapped to match the swapped schema: same (name, value) pairs as above.
        let row = test_row("127.0.0.1", "greptimedb");
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
    }
}