metric_engine/
row_modifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::hash::Hasher;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use fxhash::FxHasher;
22use mito_codec::row_converter::SparsePrimaryKeyCodec;
23use smallvec::SmallVec;
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metric_engine_consts::{
27    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
28};
29use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
30use store_api::storage::{ColumnId, TableId};
31
32use crate::error::{EncodePrimaryKeySnafu, Result, TableIdCountMismatchSnafu};
33
/// A row modifier modifies [`Rows`].
///
/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
///   it replaces the primary key columns with the encoded primary key column(`__primary_key`).
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
///   it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
    /// Codec used to encode the primary key when the sparse encoding is requested.
    codec: SparsePrimaryKeyCodec,
}
44
45/// Table id input for row modification.
46#[derive(Clone, Copy)]
47pub(crate) enum TableIdInput<'a> {
48    Single(TableId),
49    Batch(&'a [TableId]),
50}
51
52impl<'a> TableIdInput<'a> {
53    fn table_id_for_row(&self, row_idx: usize) -> TableId {
54        match self {
55            TableIdInput::Single(table_id) => *table_id,
56            TableIdInput::Batch(table_ids) => table_ids[row_idx],
57        }
58    }
59}
60
61impl Default for RowModifier {
62    fn default() -> Self {
63        Self {
64            codec: SparsePrimaryKeyCodec::schemaless(),
65        }
66    }
67}
68
69impl RowModifier {
70    /// Modify rows with the given primary key encoding and table ids.
71    pub(crate) fn modify_rows(
72        &self,
73        iter: RowsIter,
74        table_ids: TableIdInput<'_>,
75        encoding: PrimaryKeyEncoding,
76    ) -> Result<Rows> {
77        let row_count = iter.rows.rows.len();
78        Self::validate_table_id_count(table_ids, row_count)?;
79        match encoding {
80            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_ids),
81            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_ids),
82        }
83    }
84
85    /// Modifies rows with sparse primary key encoding.
86    /// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
87    fn modify_rows_sparse(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result<Rows> {
88        let num_column = iter.rows.schema.len();
89        let num_primary_key_column = iter.index.num_primary_key_column;
90        // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column)
91        let num_output_column = num_column - num_primary_key_column + 1;
92
93        let mut buffer = vec![];
94
95        for (row_index, mut row_iter) in iter.iter_mut().enumerate() {
96            let table_id = table_ids.table_id_for_row(row_index);
97            let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter);
98            let mut values = Vec::with_capacity(num_output_column);
99            buffer.clear();
100            let internal_columns = [
101                (
102                    ReservedColumnId::table_id(),
103                    api::helper::pb_value_to_value_ref(&table_id_value, None),
104                ),
105                (
106                    ReservedColumnId::tsid(),
107                    api::helper::pb_value_to_value_ref(&tsid, None),
108                ),
109            ];
110            self.codec
111                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
112                .context(EncodePrimaryKeySnafu)?;
113            self.codec
114                .encode_to_vec(row_iter.primary_keys(), &mut buffer)
115                .context(EncodePrimaryKeySnafu)?;
116
117            values.push(ValueData::BinaryValue(buffer.clone()).into());
118            values.extend(row_iter.remaining());
119            // Replace the row with the encoded row
120            *row_iter.row = Row { values };
121        }
122
123        // Update the schema
124        let mut schema = Vec::with_capacity(num_output_column);
125        schema.push(ColumnSchema {
126            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
127            datatype: ColumnDataType::Binary as i32,
128            semantic_type: SemanticType::Tag as _,
129            datatype_extension: None,
130            options: None,
131        });
132        schema.extend(iter.remaining_columns());
133        iter.rows.schema = schema;
134
135        Ok(iter.rows)
136    }
137
138    /// Modifies rows with dense primary key encoding.
139    /// It adds two columns(`__table_id`, `__tsid`) to the row.
140    fn modify_rows_dense(&self, mut iter: RowsIter, table_ids: TableIdInput<'_>) -> Result<Rows> {
141        // add table_name column
142        iter.rows.schema.push(ColumnSchema {
143            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
144            datatype: ColumnDataType::Uint32 as i32,
145            semantic_type: SemanticType::Tag as _,
146            datatype_extension: None,
147            options: None,
148        });
149        // add tsid column
150        iter.rows.schema.push(ColumnSchema {
151            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
152            datatype: ColumnDataType::Uint64 as i32,
153            semantic_type: SemanticType::Tag as _,
154            datatype_extension: None,
155            options: None,
156        });
157        for (row_index, row_iter) in iter.iter_mut().enumerate() {
158            let table_id = table_ids.table_id_for_row(row_index);
159            let (table_id_value, tsid) = Self::fill_internal_columns(table_id, &row_iter);
160            row_iter.row.values.push(table_id_value);
161            row_iter.row.values.push(tsid);
162        }
163
164        Ok(iter.rows)
165    }
166
167    fn validate_table_id_count(table_ids: TableIdInput<'_>, row_count: usize) -> Result<()> {
168        if let TableIdInput::Batch(table_ids) = table_ids
169            && table_ids.len() != row_count
170        {
171            return TableIdCountMismatchSnafu {
172                expected: row_count,
173                actual: table_ids.len(),
174            }
175            .fail();
176        }
177        Ok(())
178    }
179
180    /// Fills internal columns of a row with table name and a hash of tag values.
181    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
182        let ts_id = if !iter.has_null_labels() {
183            // No null labels in row, we can safely reuse the precomputed label name hash.
184            let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
185            for (_, value) in iter.primary_keys_with_name() {
186                // The type is checked before. So only null is ignored.
187                if let Some(ValueData::StringValue(string)) = &value.value_data {
188                    ts_id_gen.write_str(string);
189                } else {
190                    unreachable!(
191                        "Should not contain null or non-string value: {:?}, table id: {}",
192                        value, table_id
193                    );
194                }
195            }
196            ts_id_gen.finish()
197        } else {
198            // Slow path: row contains null, recompute label hash
199            let mut hasher = TsidGenerator::default();
200            // 1. Find out label names with non-null values and get the hash.
201            for (name, value) in iter.primary_keys_with_name() {
202                // The type is checked before. So only null is ignored.
203                if let Some(ValueData::StringValue(_)) = &value.value_data {
204                    hasher.write_str(name);
205                }
206            }
207            let label_name_hash = hasher.finish();
208
209            // 2. Use label name hash as seed and continue with label values.
210            let mut final_hasher = TsidGenerator::new(label_name_hash);
211            for (_, value) in iter.primary_keys_with_name() {
212                if let Some(ValueData::StringValue(value)) = &value.value_data {
213                    final_hasher.write_str(value);
214                }
215            }
216            final_hasher.finish()
217        };
218
219        (
220            ValueData::U32Value(table_id).into(),
221            ValueData::U64Value(ts_id).into(),
222        )
223    }
224}
225
226/// Tsid generator.
227#[derive(Default)]
228pub struct TsidGenerator {
229    hasher: FxHasher,
230}
231
232impl TsidGenerator {
233    pub fn new(label_name_hash: u64) -> Self {
234        let mut hasher = FxHasher::default();
235        hasher.write_u64(label_name_hash);
236        Self { hasher }
237    }
238
239    /// Writes a label pair to the generator.
240    pub fn write_str(&mut self, value: &str) {
241        self.hasher.write(value.as_bytes());
242        self.hasher.write_u8(0xff);
243    }
244
245    /// Generates a new TSID.
246    pub fn finish(&mut self) -> u64 {
247        self.hasher.finish()
248    }
249}
250
251/// Index of a value.
252#[derive(Debug, Clone, Copy)]
253struct ValueIndex {
254    column_id: ColumnId,
255    index: usize,
256}
257
258/// Index of a row.
259struct IterIndex {
260    indices: Vec<ValueIndex>,
261    num_primary_key_column: usize,
262    /// Precomputed hash for label names.
263    label_name_hash: u64,
264}
265
266impl IterIndex {
267    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
268        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
269        // Uses BTreeMap to keep the primary key column name order (lexicographical)
270        let mut primary_key_indices = BTreeMap::new();
271        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
272        let mut ts_index = None;
273        for (idx, col) in row_schema.iter().enumerate() {
274            match col.semantic_type() {
275                SemanticType::Tag => match col.column_name.as_str() {
276                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
277                        reserved_indices.push(ValueIndex {
278                            column_id: ReservedColumnId::table_id(),
279                            index: idx,
280                        });
281                    }
282                    DATA_SCHEMA_TSID_COLUMN_NAME => {
283                        reserved_indices.push(ValueIndex {
284                            column_id: ReservedColumnId::tsid(),
285                            index: idx,
286                        });
287                    }
288                    _ => {
289                        // Inserts primary key column name follower the column name order (lexicographical)
290                        primary_key_indices.insert(
291                            col.column_name.as_str(),
292                            ValueIndex {
293                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
294                                index: idx,
295                            },
296                        );
297                    }
298                },
299                SemanticType::Field => {
300                    field_indices.push(ValueIndex {
301                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
302                        index: idx,
303                    });
304                }
305                SemanticType::Timestamp => {
306                    ts_index = Some(ValueIndex {
307                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
308                        index: idx,
309                    });
310                }
311            }
312        }
313        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
314        let mut indices = Vec::with_capacity(num_primary_key_column + 2);
315        indices.extend(reserved_indices);
316        let mut label_name_hasher = TsidGenerator::default();
317        for (pk_name, pk_index) in primary_key_indices {
318            // primary_key_indices already sorted.
319            label_name_hasher.write_str(pk_name);
320            indices.push(pk_index);
321        }
322        let label_name_hash = label_name_hasher.finish();
323
324        indices.extend(ts_index);
325        indices.extend(field_indices);
326        IterIndex {
327            indices,
328            num_primary_key_column,
329            label_name_hash,
330        }
331    }
332}
333
334/// Iterator of rows.
335pub struct RowsIter {
336    rows: Rows,
337    index: IterIndex,
338}
339
340impl RowsIter {
341    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
342        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
343        Self { rows, index }
344    }
345
346    /// Returns the iterator of rows.
347    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter<'_>> {
348        self.rows.rows.iter_mut().map(|row| RowIter {
349            row,
350            index: &self.index,
351            schema: &self.rows.schema,
352        })
353    }
354
355    /// Returns the remaining columns.
356    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
357        self.index.indices[self.index.num_primary_key_column..]
358            .iter()
359            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
360    }
361}
362
363/// Iterator of a row.
364pub struct RowIter<'a> {
365    row: &'a mut Row,
366    index: &'a IterIndex,
367    schema: &'a Vec<ColumnSchema>,
368}
369
370impl RowIter<'_> {
371    /// Returns the primary keys with their names.
372    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
373        self.index.indices[..self.index.num_primary_key_column]
374            .iter()
375            .map(|idx| {
376                (
377                    &self.schema[idx.index].column_name,
378                    &self.row.values[idx.index],
379                )
380            })
381    }
382
383    /// Returns true if any label in current row is null.
384    fn has_null_labels(&self) -> bool {
385        self.index.indices[..self.index.num_primary_key_column]
386            .iter()
387            .any(|idx| self.row.values[idx.index].value_data.is_none())
388    }
389
390    /// Returns the primary keys.
391    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
392        self.index.indices[..self.index.num_primary_key_column]
393            .iter()
394            .map(|idx| {
395                (
396                    idx.column_id,
397                    api::helper::pb_value_to_value_ref(
398                        &self.row.values[idx.index],
399                        self.schema[idx.index].datatype_extension.as_ref(),
400                    ),
401                )
402            })
403    }
404
405    /// Returns the remaining columns.
406    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
407        self.index.indices[self.index.num_primary_key_column..]
408            .iter()
409            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
410    }
411
412    /// Returns value at given offset.
413    /// # Panics
414    /// Panics if offset out-of-bound
415    pub fn value_at(&self, idx: usize) -> &Value {
416        &self.row.values[idx]
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use std::collections::HashMap;
423
424    use api::v1::{Row, Rows};
425    use store_api::codec::PrimaryKeyEncoding;
426
427    use super::*;
428    use crate::error::Error;
429
430    fn test_schema() -> Vec<ColumnSchema> {
431        vec![
432            ColumnSchema {
433                column_name: "namespace".to_string(),
434                datatype: ColumnDataType::String as i32,
435                semantic_type: SemanticType::Tag as _,
436                datatype_extension: None,
437                options: None,
438            },
439            ColumnSchema {
440                column_name: "host".to_string(),
441                datatype: ColumnDataType::String as i32,
442                semantic_type: SemanticType::Tag as _,
443                datatype_extension: None,
444                options: None,
445            },
446        ]
447    }
448
449    fn test_row(v1: &str, v2: &str) -> Row {
450        Row {
451            values: vec![
452                ValueData::StringValue(v1.to_string()).into(),
453                ValueData::StringValue(v2.to_string()).into(),
454            ],
455        }
456    }
457
458    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
459        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
460    }
461
462    #[test]
463    fn test_encode_sparse() {
464        let name_to_column_id = test_name_to_column_id();
465        let encoder = RowModifier::default();
466        let table_id = 1025;
467        let schema = test_schema();
468        let row = test_row("greptimedb", "127.0.0.1");
469        let rows = Rows {
470            schema,
471            rows: vec![row],
472        };
473        let rows_iter = RowsIter::new(rows, &name_to_column_id);
474        let result = encoder
475            .modify_rows(
476                rows_iter,
477                TableIdInput::Single(table_id),
478                PrimaryKeyEncoding::Sparse,
479            )
480            .unwrap();
481        assert_eq!(result.rows[0].values.len(), 1);
482        let encoded_primary_key = vec![
483            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 37, 196, 242, 181, 117, 224, 7, 137, 0,
484            0, 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
485            1, 1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
486        ];
487        assert_eq!(
488            result.rows[0].values[0],
489            ValueData::BinaryValue(encoded_primary_key).into()
490        );
491        assert_eq!(result.schema, expected_sparse_schema());
492    }
493
494    fn expected_sparse_schema() -> Vec<ColumnSchema> {
495        vec![ColumnSchema {
496            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
497            datatype: ColumnDataType::Binary as i32,
498            semantic_type: SemanticType::Tag as _,
499            datatype_extension: None,
500            options: None,
501        }]
502    }
503
504    fn expected_dense_schema() -> Vec<ColumnSchema> {
505        vec![
506            ColumnSchema {
507                column_name: "namespace".to_string(),
508                datatype: ColumnDataType::String as i32,
509                semantic_type: SemanticType::Tag as _,
510                datatype_extension: None,
511                options: None,
512            },
513            ColumnSchema {
514                column_name: "host".to_string(),
515                datatype: ColumnDataType::String as i32,
516                semantic_type: SemanticType::Tag as _,
517                datatype_extension: None,
518                options: None,
519            },
520            ColumnSchema {
521                column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
522                datatype: ColumnDataType::Uint32 as i32,
523                semantic_type: SemanticType::Tag as _,
524                datatype_extension: None,
525                options: None,
526            },
527            ColumnSchema {
528                column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
529                datatype: ColumnDataType::Uint64 as i32,
530                semantic_type: SemanticType::Tag as _,
531                datatype_extension: None,
532                options: None,
533            },
534        ]
535    }
536
537    #[test]
538    fn test_encode_dense() {
539        let name_to_column_id = test_name_to_column_id();
540        let encoder = RowModifier::default();
541        let table_id = 1025;
542        let schema = test_schema();
543        let row = test_row("greptimedb", "127.0.0.1");
544        let rows = Rows {
545            schema,
546            rows: vec![row],
547        };
548        let rows_iter = RowsIter::new(rows, &name_to_column_id);
549        let result = encoder
550            .modify_rows(
551                rows_iter,
552                TableIdInput::Single(table_id),
553                PrimaryKeyEncoding::Dense,
554            )
555            .unwrap();
556        assert_eq!(
557            result.rows[0].values[0],
558            ValueData::StringValue("greptimedb".to_string()).into()
559        );
560        assert_eq!(
561            result.rows[0].values[1],
562            ValueData::StringValue("127.0.0.1".to_string()).into()
563        );
564        assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into());
565        assert_eq!(
566            result.rows[0].values[3],
567            ValueData::U64Value(2721566936019240841).into()
568        );
569        assert_eq!(result.schema, expected_dense_schema());
570    }
571
572    #[test]
573    fn test_table_id_count_mismatch() {
574        let name_to_column_id = test_name_to_column_id();
575        let encoder = RowModifier::default();
576        let schema = test_schema();
577        let rows = Rows {
578            schema,
579            rows: vec![test_row("a", "b"), test_row("c", "d")],
580        };
581        let rows_iter = RowsIter::new(rows, &name_to_column_id);
582        let table_ids = [1025];
583        let err = encoder
584            .modify_rows(
585                rows_iter,
586                TableIdInput::Batch(&table_ids),
587                PrimaryKeyEncoding::Dense,
588            )
589            .unwrap_err();
590        assert!(matches!(
591            err,
592            Error::TableIdCountMismatch {
593                expected: 2,
594                actual: 1,
595                ..
596            }
597        ));
598    }
599
600    #[test]
601    fn test_fill_internal_columns() {
602        let name_to_column_id = test_name_to_column_id();
603        let table_id = 1025;
604        let schema = test_schema();
605        let row = test_row("greptimedb", "127.0.0.1");
606        let rows = Rows {
607            schema,
608            rows: vec![row],
609        };
610        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
611        let row_iter = rows_iter.iter_mut().next().unwrap();
612        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
613        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
614        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
615
616        // Change the column order
617        let schema = vec![
618            ColumnSchema {
619                column_name: "host".to_string(),
620                datatype: ColumnDataType::String as i32,
621                semantic_type: SemanticType::Tag as _,
622                datatype_extension: None,
623                options: None,
624            },
625            ColumnSchema {
626                column_name: "namespace".to_string(),
627                datatype: ColumnDataType::String as i32,
628                semantic_type: SemanticType::Tag as _,
629                datatype_extension: None,
630                options: None,
631            },
632        ];
633        let row = test_row("127.0.0.1", "greptimedb");
634        let rows = Rows {
635            schema,
636            rows: vec![row],
637        };
638        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
639        let row_iter = rows_iter.iter_mut().next().unwrap();
640        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
641        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
642        assert_eq!(tsid, ValueData::U64Value(2721566936019240841).into());
643    }
644
645    /// Helper function to create a schema with multiple label columns
646    fn create_multi_label_schema(labels: &[&str]) -> Vec<ColumnSchema> {
647        labels
648            .iter()
649            .map(|name| ColumnSchema {
650                column_name: name.to_string(),
651                datatype: ColumnDataType::String as i32,
652                semantic_type: SemanticType::Tag as _,
653                datatype_extension: None,
654                options: None,
655            })
656            .collect()
657    }
658
659    /// Helper function to create a name_to_column_id map
660    fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnId> {
661        labels
662            .iter()
663            .enumerate()
664            .map(|(idx, name)| (name.to_string(), idx as ColumnId + 1))
665            .collect()
666    }
667
668    /// Helper function to create a row with string values
669    fn create_row_with_values(values: &[&str]) -> Row {
670        Row {
671            values: values
672                .iter()
673                .map(|v| ValueData::StringValue(v.to_string()).into())
674                .collect(),
675        }
676    }
677
678    /// Helper function to create a row with some null values
679    fn create_row_with_nulls(values: &[Option<&str>]) -> Row {
680        Row {
681            values: values
682                .iter()
683                .map(|v| {
684                    v.map(|s| ValueData::StringValue(s.to_string()).into())
685                        .unwrap_or(Value { value_data: None })
686                })
687                .collect(),
688        }
689    }
690
691    /// Helper function to extract TSID from a row
692    fn extract_tsid(
693        schema: Vec<ColumnSchema>,
694        row: Row,
695        name_to_column_id: &HashMap<String, ColumnId>,
696        table_id: TableId,
697    ) -> u64 {
698        let rows = Rows {
699            schema,
700            rows: vec![row],
701        };
702        let mut rows_iter = RowsIter::new(rows, name_to_column_id);
703        let row_iter = rows_iter.iter_mut().next().unwrap();
704        let (_, tsid_value) = RowModifier::fill_internal_columns(table_id, &row_iter);
705        match tsid_value.value_data {
706            Some(ValueData::U64Value(tsid)) => tsid,
707            _ => panic!("Expected U64Value for TSID"),
708        }
709    }
710
711    #[test]
712    fn test_tsid_same_for_different_label_orders() {
713        // Test that rows with the same label name-value pairs but in different orders
714        // produce the same TSID
715        let table_id = 1025;
716
717        // Schema 1: a, b, c
718        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
719        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
720        let row1 = create_row_with_values(&["A", "B", "C"]);
721        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
722
723        // Schema 2: b, a, c (different order)
724        let schema2 = create_multi_label_schema(&["b", "a", "c"]);
725        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
726        let row2 = create_row_with_values(&["B", "A", "C"]);
727        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
728
729        // Schema 3: c, b, a (another different order)
730        let schema3 = create_multi_label_schema(&["c", "b", "a"]);
731        let name_to_column_id3 = create_name_to_column_id(&["a", "b", "c"]);
732        let row3 = create_row_with_values(&["C", "B", "A"]);
733        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
734
735        // All should have the same TSID since label names are sorted lexicographically
736        // and we're using the same label name-value pairs
737        assert_eq!(
738            tsid1, tsid2,
739            "TSID should be same for different column orders"
740        );
741        assert_eq!(
742            tsid2, tsid3,
743            "TSID should be same for different column orders"
744        );
745    }
746
747    #[test]
748    fn test_tsid_same_with_null_labels() {
749        // Test that rows that differ only by null label values produce the same TSID
750        let table_id = 1025;
751
752        // Row 1: a=A, b=B (no nulls, fast path)
753        let schema1 = create_multi_label_schema(&["a", "b"]);
754        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
755        let row1 = create_row_with_values(&["A", "B"]);
756        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
757
758        // Row 2: a=A, b=B, c=null (has null, slow path)
759        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
760        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
761        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None]);
762        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
763
764        // Both should have the same TSID since null labels are ignored
765        assert_eq!(
766            tsid1, tsid2,
767            "TSID should be same when only difference is null label values"
768        );
769    }
770
771    #[test]
772    fn test_tsid_same_with_multiple_null_labels() {
773        // Test with multiple null labels
774        let table_id = 1025;
775
776        // Row 1: a=A, b=B (no nulls)
777        let schema1 = create_multi_label_schema(&["a", "b"]);
778        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
779        let row1 = create_row_with_values(&["A", "B"]);
780        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
781
782        // Row 2: a=A, b=B, c=null, d=null (multiple nulls)
783        let schema2 = create_multi_label_schema(&["a", "b", "c", "d"]);
784        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c", "d"]);
785        let row2 = create_row_with_nulls(&[Some("A"), Some("B"), None, None]);
786        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
787
788        assert_eq!(
789            tsid1, tsid2,
790            "TSID should be same when only difference is multiple null label values"
791        );
792    }
793
794    #[test]
795    fn test_tsid_different_with_different_non_null_values() {
796        // Test that rows with different non-null values produce different TSIDs
797        let table_id = 1025;
798
799        // Row 1: a=A, b=B
800        let schema1 = create_multi_label_schema(&["a", "b"]);
801        let name_to_column_id1 = create_name_to_column_id(&["a", "b"]);
802        let row1 = create_row_with_values(&["A", "B"]);
803        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
804
805        // Row 2: a=A, b=C (different value for b)
806        let schema2 = create_multi_label_schema(&["a", "b"]);
807        let name_to_column_id2 = create_name_to_column_id(&["a", "b"]);
808        let row2 = create_row_with_values(&["A", "C"]);
809        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
810
811        assert_ne!(
812            tsid1, tsid2,
813            "TSID should be different when label values differ"
814        );
815    }
816
817    #[test]
818    fn test_tsid_fast_path_vs_slow_path_consistency() {
819        // Test that fast path (no nulls) and slow path (with nulls) produce
820        // the same TSID for the same non-null label values
821        let table_id = 1025;
822
823        // Fast path: a=A, b=B (no nulls)
824        let schema_fast = create_multi_label_schema(&["a", "b"]);
825        let name_to_column_id_fast = create_name_to_column_id(&["a", "b"]);
826        let row_fast = create_row_with_values(&["A", "B"]);
827        let tsid_fast = extract_tsid(schema_fast, row_fast, &name_to_column_id_fast, table_id);
828
829        // Slow path: a=A, b=B, c=null (has null, triggers slow path)
830        let schema_slow = create_multi_label_schema(&["a", "b", "c"]);
831        let name_to_column_id_slow = create_name_to_column_id(&["a", "b", "c"]);
832        let row_slow = create_row_with_nulls(&[Some("A"), Some("B"), None]);
833        let tsid_slow = extract_tsid(schema_slow, row_slow, &name_to_column_id_slow, table_id);
834
835        assert_eq!(
836            tsid_fast, tsid_slow,
837            "Fast path and slow path should produce same TSID for same non-null values"
838        );
839    }
840
841    #[test]
842    fn test_tsid_with_null_in_middle() {
843        // Test with null in the middle of labels
844        let table_id = 1025;
845
846        // Row 1: a=A, b=B, c=C
847        let schema1 = create_multi_label_schema(&["a", "b", "c"]);
848        let name_to_column_id1 = create_name_to_column_id(&["a", "b", "c"]);
849        let row1 = create_row_with_values(&["A", "B", "C"]);
850        let tsid1 = extract_tsid(schema1, row1, &name_to_column_id1, table_id);
851
852        // Row 2: a=A, b=null, c=C (null in middle)
853        let schema2 = create_multi_label_schema(&["a", "b", "c"]);
854        let name_to_column_id2 = create_name_to_column_id(&["a", "b", "c"]);
855        let row2 = create_row_with_nulls(&[Some("A"), None, Some("C")]);
856        let tsid2 = extract_tsid(schema2, row2, &name_to_column_id2, table_id);
857
858        // Should be different because b is null in row2 but B in row1
859        // Actually wait, let me reconsider - if b is null, it should be ignored
860        // So row2 should be equivalent to a=A, c=C
861        // But row1 is a=A, b=B, c=C, so they should be different
862        assert_ne!(
863            tsid1, tsid2,
864            "TSID should be different when a non-null value becomes null"
865        );
866
867        // Row 3: a=A, c=C (no b at all, equivalent to row2)
868        let schema3 = create_multi_label_schema(&["a", "c"]);
869        let name_to_column_id3 = create_name_to_column_id(&["a", "c"]);
870        let row3 = create_row_with_values(&["A", "C"]);
871        let tsid3 = extract_tsid(schema3, row3, &name_to_column_id3, table_id);
872
873        // Row2 (a=A, b=null, c=C) should be same as row3 (a=A, c=C)
874        assert_eq!(
875            tsid2, tsid3,
876            "TSID should be same when null label is ignored"
877        );
878    }
879
880    #[test]
881    fn test_tsid_all_null_labels() {
882        // Test with all labels being null
883        let table_id = 1025;
884
885        // Row with all nulls
886        let schema = create_multi_label_schema(&["a", "b", "c"]);
887        let name_to_column_id = create_name_to_column_id(&["a", "b", "c"]);
888        let row = create_row_with_nulls(&[None, None, None]);
889        let tsid = extract_tsid(schema.clone(), row, &name_to_column_id, table_id);
890
891        // Should still produce a TSID (based on label names only when all values are null)
892        // This tests that the slow path handles the case where all values are null
893        // The TSID will be based on the label name hash only
894        // Test that it's consistent - same schema with all nulls should produce same TSID
895        let row2 = create_row_with_nulls(&[None, None, None]);
896        let tsid2 = extract_tsid(schema, row2, &name_to_column_id, table_id);
897        assert_eq!(
898            tsid, tsid2,
899            "TSID should be consistent when all label values are null"
900        );
901    }
902}