1use std::collections::{BTreeMap, HashMap};
16use std::hash::Hash;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use mito2::row_converter::SparsePrimaryKeyCodec;
22use smallvec::SmallVec;
23use snafu::ResultExt;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{
26 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
27};
28use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
29use store_api::storage::{ColumnId, TableId};
30
31use crate::error::{EncodePrimaryKeySnafu, Result};
32
/// Seed passed to `mur3::Hasher128::with_seed` when a [`TsidGenerator`] is created.
///
/// The generated TSIDs are hashes computed with this seed, so changing the value changes
/// every TSID produced afterwards.
const TSID_HASH_SEED: u32 = 846_793_005;
35
36pub(crate) struct RowModifier {
44 codec: SparsePrimaryKeyCodec,
45}
46
47impl RowModifier {
48 pub fn new() -> Self {
49 Self {
50 codec: SparsePrimaryKeyCodec::schemaless(),
51 }
52 }
53
54 pub(crate) fn modify_rows(
56 &self,
57 iter: RowsIter,
58 table_id: TableId,
59 encoding: PrimaryKeyEncoding,
60 ) -> Result<Rows> {
61 match encoding {
62 PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
63 PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
64 }
65 }
66
67 fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
70 let num_column = iter.rows.schema.len();
71 let num_primary_key_column = iter.index.num_primary_key_column;
72 let num_output_column = num_column - num_primary_key_column + 1;
74
75 let mut buffer = vec![];
76 for mut iter in iter.iter_mut() {
77 let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
78 let mut values = Vec::with_capacity(num_output_column);
79 buffer.clear();
80 let internal_columns = [
81 (
82 ReservedColumnId::table_id(),
83 api::helper::pb_value_to_value_ref(&table_id, &None),
84 ),
85 (
86 ReservedColumnId::tsid(),
87 api::helper::pb_value_to_value_ref(&tsid, &None),
88 ),
89 ];
90 self.codec
91 .encode_to_vec(internal_columns.into_iter(), &mut buffer)
92 .context(EncodePrimaryKeySnafu)?;
93 self.codec
94 .encode_to_vec(iter.primary_keys(), &mut buffer)
95 .context(EncodePrimaryKeySnafu)?;
96
97 values.push(ValueData::BinaryValue(buffer.clone()).into());
98 values.extend(iter.remaining());
99 *iter.row = Row { values };
101 }
102
103 let mut schema = Vec::with_capacity(num_output_column);
105 schema.push(ColumnSchema {
106 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
107 datatype: ColumnDataType::Binary as i32,
108 semantic_type: SemanticType::Tag as _,
109 datatype_extension: None,
110 options: None,
111 });
112 schema.extend(iter.remaining_columns());
113 iter.rows.schema = schema;
114
115 Ok(iter.rows)
116 }
117
118 fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
121 iter.rows.schema.push(ColumnSchema {
123 column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
124 datatype: ColumnDataType::Uint32 as i32,
125 semantic_type: SemanticType::Tag as _,
126 datatype_extension: None,
127 options: None,
128 });
129 iter.rows.schema.push(ColumnSchema {
131 column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
132 datatype: ColumnDataType::Uint64 as i32,
133 semantic_type: SemanticType::Tag as _,
134 datatype_extension: None,
135 options: None,
136 });
137 for iter in iter.iter_mut() {
138 let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
139 iter.row.values.push(table_id);
140 iter.row.values.push(tsid);
141 }
142
143 Ok(iter.rows)
144 }
145
146 fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
148 let mut hasher = TsidGenerator::default();
149 for (name, value) in iter.primary_keys_with_name() {
150 if let Some(ValueData::StringValue(string)) = &value.value_data {
152 hasher.write_label(name, string);
153 }
154 }
155 let hash = hasher.finish();
156
157 (
158 ValueData::U32Value(table_id).into(),
159 ValueData::U64Value(hash).into(),
160 )
161 }
162}
163
164pub struct TsidGenerator {
166 hasher: mur3::Hasher128,
167}
168
169impl Default for TsidGenerator {
170 fn default() -> Self {
171 Self {
172 hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
173 }
174 }
175}
176
177impl TsidGenerator {
178 pub fn write_label(&mut self, name: &str, value: &str) {
180 name.hash(&mut self.hasher);
181 value.hash(&mut self.hasher);
182 }
183
184 pub fn finish(&mut self) -> u64 {
186 let (hash, _) = self.hasher.finish128();
188 hash
189 }
190}
191
192#[derive(Debug, Clone, Copy)]
194struct ValueIndex {
195 column_id: ColumnId,
196 index: usize,
197}
198
199struct IterIndex {
201 indices: Vec<ValueIndex>,
202 num_primary_key_column: usize,
203}
204
205impl IterIndex {
206 fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
207 let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
208 let mut primary_key_indices = BTreeMap::new();
210 let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
211 let mut ts_index = None;
212 for (idx, col) in row_schema.iter().enumerate() {
213 match col.semantic_type() {
214 SemanticType::Tag => match col.column_name.as_str() {
215 DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
216 reserved_indices.push(ValueIndex {
217 column_id: ReservedColumnId::table_id(),
218 index: idx,
219 });
220 }
221 DATA_SCHEMA_TSID_COLUMN_NAME => {
222 reserved_indices.push(ValueIndex {
223 column_id: ReservedColumnId::tsid(),
224 index: idx,
225 });
226 }
227 _ => {
228 primary_key_indices.insert(
230 col.column_name.as_str(),
231 ValueIndex {
232 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
233 index: idx,
234 },
235 );
236 }
237 },
238 SemanticType::Field => {
239 field_indices.push(ValueIndex {
240 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
241 index: idx,
242 });
243 }
244 SemanticType::Timestamp => {
245 ts_index = Some(ValueIndex {
246 column_id: *name_to_column_id.get(&col.column_name).unwrap(),
247 index: idx,
248 });
249 }
250 }
251 }
252 let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
253 let indices = reserved_indices
254 .into_iter()
255 .chain(primary_key_indices.values().cloned())
256 .chain(ts_index)
257 .chain(field_indices)
258 .collect();
259 IterIndex {
260 indices,
261 num_primary_key_column,
262 }
263 }
264}
265
266pub(crate) struct RowsIter {
268 rows: Rows,
269 index: IterIndex,
270}
271
272impl RowsIter {
273 pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
274 let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
275 Self { rows, index }
276 }
277
278 fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
280 self.rows.rows.iter_mut().map(|row| RowIter {
281 row,
282 index: &self.index,
283 schema: &self.rows.schema,
284 })
285 }
286
287 fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
289 self.index.indices[self.index.num_primary_key_column..]
290 .iter()
291 .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
292 }
293}
294
295struct RowIter<'a> {
297 row: &'a mut Row,
298 index: &'a IterIndex,
299 schema: &'a Vec<ColumnSchema>,
300}
301
302impl RowIter<'_> {
303 fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
305 self.index.indices[..self.index.num_primary_key_column]
306 .iter()
307 .map(|idx| {
308 (
309 &self.schema[idx.index].column_name,
310 &self.row.values[idx.index],
311 )
312 })
313 }
314
315 fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
317 self.index.indices[..self.index.num_primary_key_column]
318 .iter()
319 .map(|idx| {
320 (
321 idx.column_id,
322 api::helper::pb_value_to_value_ref(
323 &self.row.values[idx.index],
324 &self.schema[idx.index].datatype_extension,
325 ),
326 )
327 })
328 }
329
330 fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
332 self.index.indices[self.index.num_primary_key_column..]
333 .iter()
334 .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use std::collections::HashMap;
341
342 use api::v1::{Row, Rows};
343
344 use super::*;
345
346 fn test_schema() -> Vec<ColumnSchema> {
347 vec![
348 ColumnSchema {
349 column_name: "namespace".to_string(),
350 datatype: ColumnDataType::String as i32,
351 semantic_type: SemanticType::Tag as _,
352 datatype_extension: None,
353 options: None,
354 },
355 ColumnSchema {
356 column_name: "host".to_string(),
357 datatype: ColumnDataType::String as i32,
358 semantic_type: SemanticType::Tag as _,
359 datatype_extension: None,
360 options: None,
361 },
362 ]
363 }
364
365 fn test_row(v1: &str, v2: &str) -> Row {
366 Row {
367 values: vec![
368 ValueData::StringValue(v1.to_string()).into(),
369 ValueData::StringValue(v2.to_string()).into(),
370 ],
371 }
372 }
373
374 fn test_name_to_column_id() -> HashMap<String, ColumnId> {
375 HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
376 }
377
378 #[test]
379 fn test_encode_sparse() {
380 let name_to_column_id = test_name_to_column_id();
381 let encoder = RowModifier::new();
382 let table_id = 1025;
383 let schema = test_schema();
384 let row = test_row("greptimedb", "127.0.0.1");
385 let rows = Rows {
386 schema,
387 rows: vec![row],
388 };
389 let rows_iter = RowsIter::new(rows, &name_to_column_id);
390 let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap();
391 assert_eq!(result.rows[0].values.len(), 1);
392 let encoded_primary_key = vec![
393 128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0,
394 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1,
395 1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
396 ];
397 assert_eq!(
398 result.rows[0].values[0],
399 ValueData::BinaryValue(encoded_primary_key).into()
400 );
401 assert_eq!(result.schema, expected_sparse_schema());
402 }
403
404 fn expected_sparse_schema() -> Vec<ColumnSchema> {
405 vec![ColumnSchema {
406 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
407 datatype: ColumnDataType::Binary as i32,
408 semantic_type: SemanticType::Tag as _,
409 datatype_extension: None,
410 options: None,
411 }]
412 }
413
414 fn expected_dense_schema() -> Vec<ColumnSchema> {
415 vec![
416 ColumnSchema {
417 column_name: "namespace".to_string(),
418 datatype: ColumnDataType::String as i32,
419 semantic_type: SemanticType::Tag as _,
420 datatype_extension: None,
421 options: None,
422 },
423 ColumnSchema {
424 column_name: "host".to_string(),
425 datatype: ColumnDataType::String as i32,
426 semantic_type: SemanticType::Tag as _,
427 datatype_extension: None,
428 options: None,
429 },
430 ColumnSchema {
431 column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
432 datatype: ColumnDataType::Uint32 as i32,
433 semantic_type: SemanticType::Tag as _,
434 datatype_extension: None,
435 options: None,
436 },
437 ColumnSchema {
438 column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
439 datatype: ColumnDataType::Uint64 as i32,
440 semantic_type: SemanticType::Tag as _,
441 datatype_extension: None,
442 options: None,
443 },
444 ]
445 }
446
447 #[test]
448 fn test_encode_dense() {
449 let name_to_column_id = test_name_to_column_id();
450 let encoder = RowModifier::new();
451 let table_id = 1025;
452 let schema = test_schema();
453 let row = test_row("greptimedb", "127.0.0.1");
454 let rows = Rows {
455 schema,
456 rows: vec![row],
457 };
458 let rows_iter = RowsIter::new(rows, &name_to_column_id);
459 let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap();
460 assert_eq!(
461 result.rows[0].values[0],
462 ValueData::StringValue("greptimedb".to_string()).into()
463 );
464 assert_eq!(
465 result.rows[0].values[1],
466 ValueData::StringValue("127.0.0.1".to_string()).into()
467 );
468 assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
469 assert_eq!(
470 result.rows[0].values[3],
471 ValueData::U64Value(9442261431637846000).into()
472 );
473 assert_eq!(result.schema, expected_dense_schema());
474 }
475
476 #[test]
477 fn test_fill_internal_columns() {
478 let name_to_column_id = test_name_to_column_id();
479 let encoder = RowModifier::new();
480 let table_id = 1025;
481 let schema = test_schema();
482 let row = test_row("greptimedb", "127.0.0.1");
483 let rows = Rows {
484 schema,
485 rows: vec![row],
486 };
487 let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
488 let row_iter = rows_iter.iter_mut().next().unwrap();
489 let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
490 assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
491 assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
492
493 let schema = vec![
495 ColumnSchema {
496 column_name: "host".to_string(),
497 datatype: ColumnDataType::String as i32,
498 semantic_type: SemanticType::Tag as _,
499 datatype_extension: None,
500 options: None,
501 },
502 ColumnSchema {
503 column_name: "namespace".to_string(),
504 datatype: ColumnDataType::String as i32,
505 semantic_type: SemanticType::Tag as _,
506 datatype_extension: None,
507 options: None,
508 },
509 ];
510 let row = test_row("127.0.0.1", "greptimedb");
511 let rows = Rows {
512 schema,
513 rows: vec![row],
514 };
515 let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
516 let row_iter = rows_iter.iter_mut().next().unwrap();
517 let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
518 assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
519 assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
520 }
521}