1use std::collections::{BTreeMap, HashMap};
16use std::hash::Hash;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use mito_codec::row_converter::SparsePrimaryKeyCodec;
22use smallvec::SmallVec;
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{
26 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
27};
28use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
29use store_api::storage::{ColumnId, TableId};
30
31use crate::error::{EncodePrimaryKeySnafu, Result};
32
/// Seed handed to `mur3::Hasher128` when a [`TsidGenerator`] is created.
///
/// Every TSID produced in this file is derived from a hasher seeded with this
/// value, so changing it changes the generated TSIDs.
const TSID_HASH_SEED: u32 = 846_793_005;
35
36pub struct RowModifier {
44 codec: SparsePrimaryKeyCodec,
45}
46
47impl Default for RowModifier {
48 fn default() -> Self {
49 Self {
50 codec: SparsePrimaryKeyCodec::schemaless(),
51 }
52 }
53}
54
55impl RowModifier {
56 pub(crate) fn modify_rows(
58 &self,
59 iter: RowsIter,
60 table_id: TableId,
61 encoding: PrimaryKeyEncoding,
62 ) -> Result<Rows> {
63 match encoding {
64 PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
65 PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
66 }
67 }
68
69 fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
72 let num_column = iter.rows.schema.len();
73 let num_primary_key_column = iter.index.num_primary_key_column;
74 let num_output_column = num_column - num_primary_key_column + 1;
76
77 let mut buffer = vec![];
78 for mut iter in iter.iter_mut() {
79 let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
80 let mut values = Vec::with_capacity(num_output_column);
81 buffer.clear();
82 let internal_columns = [
83 (
84 ReservedColumnId::table_id(),
85 api::helper::pb_value_to_value_ref(&table_id, &None),
86 ),
87 (
88 ReservedColumnId::tsid(),
89 api::helper::pb_value_to_value_ref(&tsid, &None),
90 ),
91 ];
92 self.codec
93 .encode_to_vec(internal_columns.into_iter(), &mut buffer)
94 .context(EncodePrimaryKeySnafu)?;
95 self.codec
96 .encode_to_vec(iter.primary_keys(), &mut buffer)
97 .context(EncodePrimaryKeySnafu)?;
98
99 values.push(ValueData::BinaryValue(buffer.clone()).into());
100 values.extend(iter.remaining());
101 *iter.row = Row { values };
103 }
104
105 let mut schema = Vec::with_capacity(num_output_column);
107 schema.push(ColumnSchema {
108 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
109 datatype: ColumnDataType::Binary as i32,
110 semantic_type: SemanticType::Tag as _,
111 datatype_extension: None,
112 options: None,
113 });
114 schema.extend(iter.remaining_columns());
115 iter.rows.schema = schema;
116
117 Ok(iter.rows)
118 }
119
120 fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
123 iter.rows.schema.push(ColumnSchema {
125 column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
126 datatype: ColumnDataType::Uint32 as i32,
127 semantic_type: SemanticType::Tag as _,
128 datatype_extension: None,
129 options: None,
130 });
131 iter.rows.schema.push(ColumnSchema {
133 column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
134 datatype: ColumnDataType::Uint64 as i32,
135 semantic_type: SemanticType::Tag as _,
136 datatype_extension: None,
137 options: None,
138 });
139 for iter in iter.iter_mut() {
140 let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
141 iter.row.values.push(table_id);
142 iter.row.values.push(tsid);
143 }
144
145 Ok(iter.rows)
146 }
147
148 pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
150 let mut hasher = TsidGenerator::default();
151 for (name, value) in iter.primary_keys_with_name() {
152 if let Some(ValueData::StringValue(string)) = &value.value_data {
154 hasher.write_label(name, string);
155 }
156 }
157 let hash = hasher.finish();
158
159 (
160 ValueData::U32Value(table_id).into(),
161 ValueData::U64Value(hash).into(),
162 )
163 }
164}
165
166pub struct TsidGenerator {
168 hasher: mur3::Hasher128,
169}
170
171impl Default for TsidGenerator {
172 fn default() -> Self {
173 Self {
174 hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
175 }
176 }
177}
178
179impl TsidGenerator {
180 pub fn write_label(&mut self, name: &str, value: &str) {
182 name.hash(&mut self.hasher);
183 value.hash(&mut self.hasher);
184 }
185
186 pub fn finish(&mut self) -> u64 {
188 let (hash, _) = self.hasher.finish128();
190 hash
191 }
192}
193
194#[derive(Debug, Clone, Copy)]
196struct ValueIndex {
197 column_id: ColumnId,
198 index: usize,
199}
200
201struct IterIndex {
203 indices: Vec<ValueIndex>,
204 num_primary_key_column: usize,
205}
206
207impl IterIndex {
208 fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
209 let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
210 let mut primary_key_indices = BTreeMap::new();
212 let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
213 let mut ts_index = None;
214 for (idx, col) in row_schema.iter().enumerate() {
215 match col.semantic_type() {
216 SemanticType::Tag => match col.column_name.as_str() {
217 DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
218 reserved_indices.push(ValueIndex {
219 column_id: ReservedColumnId::table_id(),
220 index: idx,
221 });
222 }
223 DATA_SCHEMA_TSID_COLUMN_NAME => {
224 reserved_indices.push(ValueIndex {
225 column_id: ReservedColumnId::tsid(),
226 index: idx,
227 });
228 }
229 _ => {
230 primary_key_indices.insert(
232 col.column_name.as_str(),
233 ValueIndex {
234 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
235 index: idx,
236 },
237 );
238 }
239 },
240 SemanticType::Field => {
241 field_indices.push(ValueIndex {
242 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
243 index: idx,
244 });
245 }
246 SemanticType::Timestamp => {
247 ts_index = Some(ValueIndex {
248 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
249 index: idx,
250 });
251 }
252 }
253 }
254 let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
255 let indices = reserved_indices
256 .into_iter()
257 .chain(primary_key_indices.values().cloned())
258 .chain(ts_index)
259 .chain(field_indices)
260 .collect();
261 IterIndex {
262 indices,
263 num_primary_key_column,
264 }
265 }
266}
267
268pub struct RowsIter {
270 rows: Rows,
271 index: IterIndex,
272}
273
274impl RowsIter {
275 pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
276 let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
277 Self { rows, index }
278 }
279
280 pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
282 self.rows.rows.iter_mut().map(|row| RowIter {
283 row,
284 index: &self.index,
285 schema: &self.rows.schema,
286 })
287 }
288
289 fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
291 self.index.indices[self.index.num_primary_key_column..]
292 .iter()
293 .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
294 }
295}
296
297pub struct RowIter<'a> {
299 row: &'a mut Row,
300 index: &'a IterIndex,
301 schema: &'a Vec<ColumnSchema>,
302}
303
304impl RowIter<'_> {
305 fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
307 self.index.indices[..self.index.num_primary_key_column]
308 .iter()
309 .map(|idx| {
310 (
311 &self.schema[idx.index].column_name,
312 &self.row.values[idx.index],
313 )
314 })
315 }
316
317 pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
319 self.index.indices[..self.index.num_primary_key_column]
320 .iter()
321 .map(|idx| {
322 (
323 idx.column_id,
324 api::helper::pb_value_to_value_ref(
325 &self.row.values[idx.index],
326 &self.schema[idx.index].datatype_extension,
327 ),
328 )
329 })
330 }
331
332 fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
334 self.index.indices[self.index.num_primary_key_column..]
335 .iter()
336 .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
337 }
338
339 pub fn value_at(&self, idx: usize) -> &Value {
343 &self.row.values[idx]
344 }
345}
346
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::{Row, Rows};

    use super::*;

    /// TSID asserted for the labels `namespace=greptimedb`, `host=127.0.0.1`.
    const EXPECTED_TSID: u64 = 9442261431637846000;

    /// Builds a tag column schema with the given name and data type.
    fn tag_column(name: &str, datatype: ColumnDataType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        }
    }

    /// Wraps a single row and its schema into a `RowsIter` using the test column ids.
    fn single_row_iter(schema: Vec<ColumnSchema>, row: Row) -> RowsIter {
        let rows = Rows {
            schema,
            rows: vec![row],
        };
        RowsIter::new(rows, &test_name_to_column_id())
    }

    fn test_schema() -> Vec<ColumnSchema> {
        vec![
            tag_column("namespace", ColumnDataType::String),
            tag_column("host", ColumnDataType::String),
        ]
    }

    fn test_row(v1: &str, v2: &str) -> Row {
        let values = [v1, v2]
            .into_iter()
            .map(|v| ValueData::StringValue(v.to_string()).into())
            .collect();
        Row { values }
    }

    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
    }

    fn expected_sparse_schema() -> Vec<ColumnSchema> {
        vec![tag_column(PRIMARY_KEY_COLUMN_NAME, ColumnDataType::Binary)]
    }

    fn expected_dense_schema() -> Vec<ColumnSchema> {
        let mut schema = test_schema();
        schema.push(tag_column(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            ColumnDataType::Uint32,
        ));
        schema.push(tag_column(
            DATA_SCHEMA_TSID_COLUMN_NAME,
            ColumnDataType::Uint64,
        ));
        schema
    }

    #[test]
    fn test_encode_sparse() {
        let encoder = RowModifier::default();
        let table_id = 1025;
        let rows_iter = single_row_iter(test_schema(), test_row("greptimedb", "127.0.0.1"));

        let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();

        // Both tag columns are folded into the single encoded key column.
        assert_eq!(result.rows[0].values.len(), 1);
        let encoded_primary_key = vec![
            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
            0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
            1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(
            result.rows[0].values[0],
            ValueData::BinaryValue(encoded_primary_key).into()
        );
        assert_eq!(result.schema, expected_sparse_schema());
    }

    #[test]
    fn test_encode_dense() {
        let encoder = RowModifier::default();
        let table_id = 1025;
        let rows_iter = single_row_iter(test_schema(), test_row("greptimedb", "127.0.0.1"));

        let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();

        let values = &result.rows[0].values;
        assert_eq!(
            values[0],
            ValueData::StringValue("greptimedb".to_string()).into()
        );
        assert_eq!(
            values[1],
            ValueData::StringValue("127.0.0.1".to_string()).into()
        );
        assert_eq!(values[2], ValueData::U32Value(1025).into());
        assert_eq!(values[3], ValueData::U64Value(EXPECTED_TSID).into());
        assert_eq!(result.schema, expected_dense_schema());
    }

    #[test]
    fn test_fill_internal_columns() {
        let table_id = 1025;

        let mut rows_iter = single_row_iter(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(EXPECTED_TSID).into());

        // Same labels with the columns in the opposite schema order: the TSID must match.
        let reversed_schema = vec![
            tag_column("host", ColumnDataType::String),
            tag_column("namespace", ColumnDataType::String),
        ];
        let mut rows_iter = single_row_iter(reversed_schema, test_row("127.0.0.1", "greptimedb"));
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(EXPECTED_TSID).into());
    }
}