1use std::collections::{BTreeMap, HashMap};
16use std::hash::Hash;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use mito_codec::row_converter::SparsePrimaryKeyCodec;
22use smallvec::SmallVec;
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{
26    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
27};
28use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
29use store_api::storage::{ColumnId, TableId};
30
31use crate::error::{EncodePrimaryKeySnafu, Result};
32
/// Seed handed to `mur3::Hasher128::with_seed` whenever a [`TsidGenerator`] is created.
///
/// The concrete TSID values asserted in this file's unit tests were computed with this seed.
const TSID_HASH_SEED: u32 = 846793005;
35
/// Rewrites [`Rows`] so that they carry the internal primary-key information.
///
/// - With [`PrimaryKeyEncoding::Sparse`], the primary key columns of every row are replaced
///   by a single binary column named [`PRIMARY_KEY_COLUMN_NAME`] that holds the encoded key.
/// - With [`PrimaryKeyEncoding::Dense`], a table id column and a TSID column are appended
///   to the schema and to every row.
pub struct RowModifier {
    /// Codec used to encode the primary key on the sparse path.
    codec: SparsePrimaryKeyCodec,
}
46
47impl Default for RowModifier {
48    fn default() -> Self {
49        Self {
50            codec: SparsePrimaryKeyCodec::schemaless(),
51        }
52    }
53}
54
55impl RowModifier {
56    pub(crate) fn modify_rows(
58        &self,
59        iter: RowsIter,
60        table_id: TableId,
61        encoding: PrimaryKeyEncoding,
62    ) -> Result<Rows> {
63        match encoding {
64            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
65            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
66        }
67    }
68
69    fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
72        let num_column = iter.rows.schema.len();
73        let num_primary_key_column = iter.index.num_primary_key_column;
74        let num_output_column = num_column - num_primary_key_column + 1;
76
77        let mut buffer = vec![];
78        for mut iter in iter.iter_mut() {
79            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
80            let mut values = Vec::with_capacity(num_output_column);
81            buffer.clear();
82            let internal_columns = [
83                (
84                    ReservedColumnId::table_id(),
85                    api::helper::pb_value_to_value_ref(&table_id, None),
86                ),
87                (
88                    ReservedColumnId::tsid(),
89                    api::helper::pb_value_to_value_ref(&tsid, None),
90                ),
91            ];
92            self.codec
93                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
94                .context(EncodePrimaryKeySnafu)?;
95            self.codec
96                .encode_to_vec(iter.primary_keys(), &mut buffer)
97                .context(EncodePrimaryKeySnafu)?;
98
99            values.push(ValueData::BinaryValue(buffer.clone()).into());
100            values.extend(iter.remaining());
101            *iter.row = Row { values };
103        }
104
105        let mut schema = Vec::with_capacity(num_output_column);
107        schema.push(ColumnSchema {
108            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
109            datatype: ColumnDataType::Binary as i32,
110            semantic_type: SemanticType::Tag as _,
111            datatype_extension: None,
112            options: None,
113        });
114        schema.extend(iter.remaining_columns());
115        iter.rows.schema = schema;
116
117        Ok(iter.rows)
118    }
119
120    fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
123        iter.rows.schema.push(ColumnSchema {
125            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
126            datatype: ColumnDataType::Uint32 as i32,
127            semantic_type: SemanticType::Tag as _,
128            datatype_extension: None,
129            options: None,
130        });
131        iter.rows.schema.push(ColumnSchema {
133            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
134            datatype: ColumnDataType::Uint64 as i32,
135            semantic_type: SemanticType::Tag as _,
136            datatype_extension: None,
137            options: None,
138        });
139        for iter in iter.iter_mut() {
140            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
141            iter.row.values.push(table_id);
142            iter.row.values.push(tsid);
143        }
144
145        Ok(iter.rows)
146    }
147
148    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
150        let mut hasher = TsidGenerator::default();
151        for (name, value) in iter.primary_keys_with_name() {
152            if let Some(ValueData::StringValue(string)) = &value.value_data {
154                hasher.write_label(name, string);
155            }
156        }
157        let hash = hasher.finish();
158
159        (
160            ValueData::U32Value(table_id).into(),
161            ValueData::U64Value(hash).into(),
162        )
163    }
164}
165
/// Builds a TSID by hashing label name/value pairs.
pub struct TsidGenerator {
    /// Hasher state that every label pair is fed into; seeded with `TSID_HASH_SEED`.
    hasher: mur3::Hasher128,
}
170
171impl Default for TsidGenerator {
172    fn default() -> Self {
173        Self {
174            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
175        }
176    }
177}
178
179impl TsidGenerator {
180    pub fn write_label(&mut self, name: &str, value: &str) {
182        name.hash(&mut self.hasher);
183        value.hash(&mut self.hasher);
184    }
185
186    pub fn finish(&mut self) -> u64 {
188        let (hash, _) = self.hasher.finish128();
190        hash
191    }
192}
193
/// Location of one column inside a row, together with its column id.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {
    /// Column id: a reserved id for the internal table id / TSID columns, otherwise the id
    /// looked up by column name when the index is built.
    column_id: ColumnId,
    /// Position of the column in the row schema; also used to index each row's `values`.
    index: usize,
}
200
/// Column ordering used to walk a row: primary key columns first, then everything else.
struct IterIndex {
    /// Column locations ordered as: reserved (table id / TSID) columns, the other tag
    /// columns sorted by column name, the timestamp column (if any), then the field columns.
    indices: Vec<ValueIndex>,
    /// Number of leading entries of `indices` that are primary key columns
    /// (reserved columns plus the other tag columns).
    num_primary_key_column: usize,
}
206
207impl IterIndex {
208    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
209        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
210        let mut primary_key_indices = BTreeMap::new();
212        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
213        let mut ts_index = None;
214        for (idx, col) in row_schema.iter().enumerate() {
215            match col.semantic_type() {
216                SemanticType::Tag => match col.column_name.as_str() {
217                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
218                        reserved_indices.push(ValueIndex {
219                            column_id: ReservedColumnId::table_id(),
220                            index: idx,
221                        });
222                    }
223                    DATA_SCHEMA_TSID_COLUMN_NAME => {
224                        reserved_indices.push(ValueIndex {
225                            column_id: ReservedColumnId::tsid(),
226                            index: idx,
227                        });
228                    }
229                    _ => {
230                        primary_key_indices.insert(
232                            col.column_name.as_str(),
233                            ValueIndex {
234                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
235                                index: idx,
236                            },
237                        );
238                    }
239                },
240                SemanticType::Field => {
241                    field_indices.push(ValueIndex {
242                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
243                        index: idx,
244                    });
245                }
246                SemanticType::Timestamp => {
247                    ts_index = Some(ValueIndex {
248                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
249                        index: idx,
250                    });
251                }
252            }
253        }
254        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
255        let indices = reserved_indices
256            .into_iter()
257            .chain(primary_key_indices.values().cloned())
258            .chain(ts_index)
259            .chain(field_indices)
260            .collect();
261        IterIndex {
262            indices,
263            num_primary_key_column,
264        }
265    }
266}
267
/// Owns a [`Rows`] batch together with the [`IterIndex`] computed from its schema.
pub struct RowsIter {
    /// The rows (and their schema) being iterated and rewritten.
    rows: Rows,
    /// Column ordering built from `rows.schema` when the iterator is created.
    index: IterIndex,
}
273
274impl RowsIter {
275    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
276        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
277        Self { rows, index }
278    }
279
280    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter<'_>> {
282        self.rows.rows.iter_mut().map(|row| RowIter {
283            row,
284            index: &self.index,
285            schema: &self.rows.schema,
286        })
287    }
288
289    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
291        self.index.indices[self.index.num_primary_key_column..]
292            .iter()
293            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
294    }
295}
296
/// Mutable view of a single row plus the shared index and schema needed to interpret it.
pub struct RowIter<'a> {
    /// The row being visited; `remaining` moves values out of it.
    row: &'a mut Row,
    /// Column ordering shared by all rows of the batch.
    index: &'a IterIndex,
    /// Schema of the batch, indexed by the positions stored in `index`.
    schema: &'a Vec<ColumnSchema>,
}
303
304impl RowIter<'_> {
305    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
307        self.index.indices[..self.index.num_primary_key_column]
308            .iter()
309            .map(|idx| {
310                (
311                    &self.schema[idx.index].column_name,
312                    &self.row.values[idx.index],
313                )
314            })
315    }
316
317    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
319        self.index.indices[..self.index.num_primary_key_column]
320            .iter()
321            .map(|idx| {
322                (
323                    idx.column_id,
324                    api::helper::pb_value_to_value_ref(
325                        &self.row.values[idx.index],
326                        self.schema[idx.index].datatype_extension.as_ref(),
327                    ),
328                )
329            })
330    }
331
332    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
334        self.index.indices[self.index.num_primary_key_column..]
335            .iter()
336            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
337    }
338
339    pub fn value_at(&self, idx: usize) -> &Value {
343        &self.row.values[idx]
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use std::collections::HashMap;
350
351    use api::v1::{Row, Rows};
352
353    use super::*;
354
355    fn test_schema() -> Vec<ColumnSchema> {
356        vec![
357            ColumnSchema {
358                column_name: "namespace".to_string(),
359                datatype: ColumnDataType::String as i32,
360                semantic_type: SemanticType::Tag as _,
361                datatype_extension: None,
362                options: None,
363            },
364            ColumnSchema {
365                column_name: "host".to_string(),
366                datatype: ColumnDataType::String as i32,
367                semantic_type: SemanticType::Tag as _,
368                datatype_extension: None,
369                options: None,
370            },
371        ]
372    }
373
374    fn test_row(v1: &str, v2: &str) -> Row {
375        Row {
376            values: vec![
377                ValueData::StringValue(v1.to_string()).into(),
378                ValueData::StringValue(v2.to_string()).into(),
379            ],
380        }
381    }
382
383    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
384        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
385    }
386
387    #[test]
388    fn test_encode_sparse() {
389        let name_to_column_id = test_name_to_column_id();
390        let encoder = RowModifier::default();
391        let table_id = 1025;
392        let schema = test_schema();
393        let row = test_row("greptimedb", "127.0.0.1");
394        let rows = Rows {
395            schema,
396            rows: vec![row],
397        };
398        let rows_iter = RowsIter::new(rows, &name_to_column_id);
399        let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
400        assert_eq!(result.rows[0].values.len(), 1);
401        let encoded_primary_key = vec![
402            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
403            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
404            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
405        ];
406        assert_eq!(
407            result.rows[0].values[0],
408            ValueData::BinaryValue(encoded_primary_key).into()
409        );
410        assert_eq!(result.schema, expected_sparse_schema());
411    }
412
413    fn expected_sparse_schema() -> Vec<ColumnSchema> {
414        vec![ColumnSchema {
415            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
416            datatype: ColumnDataType::Binary as i32,
417            semantic_type: SemanticType::Tag as _,
418            datatype_extension: None,
419            options: None,
420        }]
421    }
422
423    fn expected_dense_schema() -> Vec<ColumnSchema> {
424        vec![
425            ColumnSchema {
426                column_name: "namespace".to_string(),
427                datatype: ColumnDataType::String as i32,
428                semantic_type: SemanticType::Tag as _,
429                datatype_extension: None,
430                options: None,
431            },
432            ColumnSchema {
433                column_name: "host".to_string(),
434                datatype: ColumnDataType::String as i32,
435                semantic_type: SemanticType::Tag as _,
436                datatype_extension: None,
437                options: None,
438            },
439            ColumnSchema {
440                column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
441                datatype: ColumnDataType::Uint32 as i32,
442                semantic_type: SemanticType::Tag as _,
443                datatype_extension: None,
444                options: None,
445            },
446            ColumnSchema {
447                column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
448                datatype: ColumnDataType::Uint64 as i32,
449                semantic_type: SemanticType::Tag as _,
450                datatype_extension: None,
451                options: None,
452            },
453        ]
454    }
455
456    #[test]
457    fn test_encode_dense() {
458        let name_to_column_id = test_name_to_column_id();
459        let encoder = RowModifier::default();
460        let table_id = 1025;
461        let schema = test_schema();
462        let row = test_row("greptimedb", "127.0.0.1");
463        let rows = Rows {
464            schema,
465            rows: vec![row],
466        };
467        let rows_iter = RowsIter::new(rows, &name_to_column_id);
468        let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();
469        assert_eq!(
470            result.rows[0].values[0],
471            ValueData::StringValue("greptimedb".to_string()).into()
472        );
473        assert_eq!(
474            result.rows[0].values[1],
475            ValueData::StringValue("127.0.0.1".to_string()).into()
476        );
477        assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
478        assert_eq!(
479            result.rows[0].values[3],
480            ValueData::U64Value(9442261431637846000).into()
481        );
482        assert_eq!(result.schema, expected_dense_schema());
483    }
484
485    #[test]
486    fn test_fill_internal_columns() {
487        let name_to_column_id = test_name_to_column_id();
488        let table_id = 1025;
489        let schema = test_schema();
490        let row = test_row("greptimedb", "127.0.0.1");
491        let rows = Rows {
492            schema,
493            rows: vec![row],
494        };
495        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
496        let row_iter = rows_iter.iter_mut().next().unwrap();
497        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
498        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
499        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
500
501        let schema = vec![
503            ColumnSchema {
504                column_name: "host".to_string(),
505                datatype: ColumnDataType::String as i32,
506                semantic_type: SemanticType::Tag as _,
507                datatype_extension: None,
508                options: None,
509            },
510            ColumnSchema {
511                column_name: "namespace".to_string(),
512                datatype: ColumnDataType::String as i32,
513                semantic_type: SemanticType::Tag as _,
514                datatype_extension: None,
515                options: None,
516            },
517        ];
518        let row = test_row("127.0.0.1", "greptimedb");
519        let rows = Rows {
520            schema,
521            rows: vec![row],
522        };
523        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
524        let row_iter = rows_iter.iter_mut().next().unwrap();
525        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
526        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
527        assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
528    }
529}