metric_engine/
row_modifier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::hash::Hasher;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
20use datatypes::value::ValueRef;
21use fxhash::FxHasher;
22use mito_codec::row_converter::SparsePrimaryKeyCodec;
23use smallvec::SmallVec;
24use snafu::ResultExt;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metric_engine_consts::{
27    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
28};
29use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
30use store_api::storage::{ColumnId, TableId};
31
32use crate::error::{EncodePrimaryKeySnafu, Result};
33
34/// A row modifier modifies [`Rows`].
35///
36/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
37///   it replaces the primary key columns with the encoded primary key column(`__primary_key`).
38///
39/// - For [`PrimaryKeyEncoding::Dense`] encoding,
40///   it adds two columns(`__table_id`, `__tsid`) to the row.
41pub struct RowModifier {
42    codec: SparsePrimaryKeyCodec,
43}
44
45impl Default for RowModifier {
46    fn default() -> Self {
47        Self {
48            codec: SparsePrimaryKeyCodec::schemaless(),
49        }
50    }
51}
52
53impl RowModifier {
54    /// Modify rows with the given primary key encoding.
55    pub(crate) fn modify_rows(
56        &self,
57        iter: RowsIter,
58        table_id: TableId,
59        encoding: PrimaryKeyEncoding,
60    ) -> Result<Rows> {
61        match encoding {
62            PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id),
63            PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id),
64        }
65    }
66
67    /// Modifies rows with sparse primary key encoding.
68    /// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
69    fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
70        let num_column = iter.rows.schema.len();
71        let num_primary_key_column = iter.index.num_primary_key_column;
72        // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column)
73        let num_output_column = num_column - num_primary_key_column + 1;
74
75        let mut buffer = vec![];
76
77        for mut iter in iter.iter_mut() {
78            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
79            let mut values = Vec::with_capacity(num_output_column);
80            buffer.clear();
81            let internal_columns = [
82                (
83                    ReservedColumnId::table_id(),
84                    api::helper::pb_value_to_value_ref(&table_id, None),
85                ),
86                (
87                    ReservedColumnId::tsid(),
88                    api::helper::pb_value_to_value_ref(&tsid, None),
89                ),
90            ];
91            self.codec
92                .encode_to_vec(internal_columns.into_iter(), &mut buffer)
93                .context(EncodePrimaryKeySnafu)?;
94            self.codec
95                .encode_to_vec(iter.primary_keys(), &mut buffer)
96                .context(EncodePrimaryKeySnafu)?;
97
98            values.push(ValueData::BinaryValue(buffer.clone()).into());
99            values.extend(iter.remaining());
100            // Replace the row with the encoded row
101            *iter.row = Row { values };
102        }
103
104        // Update the schema
105        let mut schema = Vec::with_capacity(num_output_column);
106        schema.push(ColumnSchema {
107            column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
108            datatype: ColumnDataType::Binary as i32,
109            semantic_type: SemanticType::Tag as _,
110            datatype_extension: None,
111            options: None,
112        });
113        schema.extend(iter.remaining_columns());
114        iter.rows.schema = schema;
115
116        Ok(iter.rows)
117    }
118
119    /// Modifies rows with dense primary key encoding.
120    /// It adds two columns(`__table_id`, `__tsid`) to the row.
121    fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
122        // add table_name column
123        iter.rows.schema.push(ColumnSchema {
124            column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
125            datatype: ColumnDataType::Uint32 as i32,
126            semantic_type: SemanticType::Tag as _,
127            datatype_extension: None,
128            options: None,
129        });
130        // add tsid column
131        iter.rows.schema.push(ColumnSchema {
132            column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
133            datatype: ColumnDataType::Uint64 as i32,
134            semantic_type: SemanticType::Tag as _,
135            datatype_extension: None,
136            options: None,
137        });
138        for iter in iter.iter_mut() {
139            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
140            iter.row.values.push(table_id);
141            iter.row.values.push(tsid);
142        }
143
144        Ok(iter.rows)
145    }
146
147    /// Fills internal columns of a row with table name and a hash of tag values.
148    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
149        let ts_id = if !iter.has_null_labels() {
150            // No null labels in row, we can safely reuse the precomputed label name hash.
151            let mut ts_id_gen = TsidGenerator::new(iter.index.label_name_hash);
152            for (_, value) in iter.primary_keys_with_name() {
153                // The type is checked before. So only null is ignored.
154                if let Some(ValueData::StringValue(string)) = &value.value_data {
155                    ts_id_gen.write_str(string);
156                } else {
157                    unreachable!(
158                        "Should not contain null or non-string value: {:?}, table id: {}",
159                        value, table_id
160                    );
161                }
162            }
163            ts_id_gen.finish()
164        } else {
165            // Slow path: row contains null, recompute label hash
166            let mut hasher = TsidGenerator::default();
167            // 1. Find out label names with non-null values and get the hash.
168            for (name, value) in iter.primary_keys_with_name() {
169                // The type is checked before. So only null is ignored.
170                if let Some(ValueData::StringValue(_)) = &value.value_data {
171                    hasher.write_str(name);
172                }
173            }
174            let label_name_hash = hasher.finish();
175
176            // 2. Use label name hash as seed and continue with label values.
177            let mut final_hasher = TsidGenerator::new(label_name_hash);
178            for (_, value) in iter.primary_keys_with_name() {
179                if let Some(ValueData::StringValue(value)) = &value.value_data {
180                    final_hasher.write_str(value);
181                }
182            }
183            final_hasher.finish()
184        };
185
186        (
187            ValueData::U32Value(table_id).into(),
188            ValueData::U64Value(ts_id).into(),
189        )
190    }
191}
192
193/// Tsid generator.
194#[derive(Default)]
195pub struct TsidGenerator {
196    hasher: FxHasher,
197}
198
199impl TsidGenerator {
200    pub fn new(label_name_hash: u64) -> Self {
201        let mut hasher = FxHasher::default();
202        hasher.write_u64(label_name_hash);
203        Self { hasher }
204    }
205
206    /// Writes a label pair to the generator.
207    pub fn write_str(&mut self, value: &str) {
208        self.hasher.write(value.as_bytes());
209        self.hasher.write_u8(0xff);
210    }
211
212    /// Generates a new TSID.
213    pub fn finish(&mut self) -> u64 {
214        self.hasher.finish()
215    }
216}
217
218/// Index of a value.
219#[derive(Debug, Clone, Copy)]
220struct ValueIndex {
221    column_id: ColumnId,
222    index: usize,
223}
224
225/// Index of a row.
226struct IterIndex {
227    indices: Vec<ValueIndex>,
228    num_primary_key_column: usize,
229    /// Precomputed hash for label names.
230    label_name_hash: u64,
231}
232
233impl IterIndex {
234    fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
235        let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
236        // Uses BTreeMap to keep the primary key column name order (lexicographical)
237        let mut primary_key_indices = BTreeMap::new();
238        let mut field_indices = SmallVec::<[ValueIndex; 1]>::new();
239        let mut ts_index = None;
240        for (idx, col) in row_schema.iter().enumerate() {
241            match col.semantic_type() {
242                SemanticType::Tag => match col.column_name.as_str() {
243                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME => {
244                        reserved_indices.push(ValueIndex {
245                            column_id: ReservedColumnId::table_id(),
246                            index: idx,
247                        });
248                    }
249                    DATA_SCHEMA_TSID_COLUMN_NAME => {
250                        reserved_indices.push(ValueIndex {
251                            column_id: ReservedColumnId::tsid(),
252                            index: idx,
253                        });
254                    }
255                    _ => {
256                        // Inserts primary key column name follower the column name order (lexicographical)
257                        primary_key_indices.insert(
258                            col.column_name.as_str(),
259                            ValueIndex {
260                                column_id: *name_to_column_id.get(&col.column_name).unwrap(),
261                                index: idx,
262                            },
263                        );
264                    }
265                },
266                SemanticType::Field => {
267                    field_indices.push(ValueIndex {
268                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
269                        index: idx,
270                    });
271                }
272                SemanticType::Timestamp => {
273                    ts_index = Some(ValueIndex {
274                        column_id: *name_to_column_id.get(&col.column_name).unwrap(),
275                        index: idx,
276                    });
277                }
278            }
279        }
280        let num_primary_key_column = primary_key_indices.len() + reserved_indices.len();
281        let mut indices = Vec::with_capacity(num_primary_key_column + 2);
282        indices.extend(reserved_indices);
283        let mut label_name_hasher = TsidGenerator::default();
284        for (pk_name, pk_index) in primary_key_indices {
285            // primary_key_indices already sorted.
286            label_name_hasher.write_str(pk_name);
287            indices.push(pk_index);
288        }
289        let label_name_hash = label_name_hasher.finish();
290
291        indices.extend(ts_index);
292        indices.extend(field_indices);
293        IterIndex {
294            indices,
295            num_primary_key_column,
296            label_name_hash,
297        }
298    }
299}
300
301/// Iterator of rows.
302pub struct RowsIter {
303    rows: Rows,
304    index: IterIndex,
305}
306
307impl RowsIter {
308    pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
309        let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
310        Self { rows, index }
311    }
312
313    /// Returns the iterator of rows.
314    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter<'_>> {
315        self.rows.rows.iter_mut().map(|row| RowIter {
316            row,
317            index: &self.index,
318            schema: &self.rows.schema,
319        })
320    }
321
322    /// Returns the remaining columns.
323    fn remaining_columns(&mut self) -> impl Iterator<Item = ColumnSchema> + '_ {
324        self.index.indices[self.index.num_primary_key_column..]
325            .iter()
326            .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
327    }
328}
329
330/// Iterator of a row.
331pub struct RowIter<'a> {
332    row: &'a mut Row,
333    index: &'a IterIndex,
334    schema: &'a Vec<ColumnSchema>,
335}
336
337impl RowIter<'_> {
338    /// Returns the primary keys with their names.
339    fn primary_keys_with_name(&self) -> impl Iterator<Item = (&String, &Value)> {
340        self.index.indices[..self.index.num_primary_key_column]
341            .iter()
342            .map(|idx| {
343                (
344                    &self.schema[idx.index].column_name,
345                    &self.row.values[idx.index],
346                )
347            })
348    }
349
350    /// Returns true if any label in current row is null.
351    fn has_null_labels(&self) -> bool {
352        self.index.indices[..self.index.num_primary_key_column]
353            .iter()
354            .any(|idx| self.row.values[idx.index].value_data.is_none())
355    }
356
357    /// Returns the primary keys.
358    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
359        self.index.indices[..self.index.num_primary_key_column]
360            .iter()
361            .map(|idx| {
362                (
363                    idx.column_id,
364                    api::helper::pb_value_to_value_ref(
365                        &self.row.values[idx.index],
366                        self.schema[idx.index].datatype_extension.as_ref(),
367                    ),
368                )
369            })
370    }
371
372    /// Returns the remaining columns.
373    fn remaining(&mut self) -> impl Iterator<Item = Value> + '_ {
374        self.index.indices[self.index.num_primary_key_column..]
375            .iter()
376            .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
377    }
378
379    /// Returns value at given offset.
380    /// # Panics
381    /// Panics if offset out-of-bound
382    pub fn value_at(&self, idx: usize) -> &Value {
383        &self.row.values[idx]
384    }
385}
386
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::{Row, Rows};

    use super::*;

    /// Table id shared by all tests.
    const TABLE_ID: TableId = 1025;
    /// TSID expected for the labels `namespace=greptimedb`, `host=127.0.0.1`.
    const EXPECTED_TSID: u64 = 2721566936019240841;

    /// Builds the schema of a tag column with the given data type.
    fn tag_column(name: &str, datatype: ColumnDataType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: SemanticType::Tag as _,
            datatype_extension: None,
            options: None,
        }
    }

    /// Builds a schema made of one string tag column per label.
    fn create_multi_label_schema(labels: &[&str]) -> Vec<ColumnSchema> {
        let mut schema = Vec::with_capacity(labels.len());
        for label in labels {
            schema.push(tag_column(label, ColumnDataType::String));
        }
        schema
    }

    /// Maps each label to a column id, numbered from 1 in the given order.
    fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnId> {
        let mut name_to_column_id = HashMap::new();
        for (idx, name) in labels.iter().enumerate() {
            name_to_column_id.insert(name.to_string(), idx as ColumnId + 1);
        }
        name_to_column_id
    }

    /// Builds a row of non-null string values.
    fn create_row_with_values(values: &[&str]) -> Row {
        let values = values
            .iter()
            .map(|text| ValueData::StringValue(text.to_string()).into())
            .collect();
        Row { values }
    }

    /// Builds a row of string values in which `None` stands for a null value.
    fn create_row_with_nulls(values: &[Option<&str>]) -> Row {
        let values = values
            .iter()
            .map(|value| match *value {
                Some(text) => ValueData::StringValue(text.to_string()).into(),
                None => Value { value_data: None },
            })
            .collect();
        Row { values }
    }

    /// Wraps one row and its schema into [`Rows`].
    fn single_row(schema: Vec<ColumnSchema>, row: Row) -> Rows {
        Rows {
            schema,
            rows: vec![row],
        }
    }

    /// Computes the TSID of a single row.
    fn extract_tsid(
        schema: Vec<ColumnSchema>,
        row: Row,
        name_to_column_id: &HashMap<String, ColumnId>,
        table_id: TableId,
    ) -> u64 {
        let mut rows_iter = RowsIter::new(single_row(schema, row), name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (_, tsid_value) = RowModifier::fill_internal_columns(table_id, &row_iter);
        if let Some(ValueData::U64Value(tsid)) = tsid_value.value_data {
            tsid
        } else {
            panic!("Expected U64Value for TSID")
        }
    }

    /// Computes the TSID of `row` for a schema whose columns follow `column_order`,
    /// with column ids assigned following `id_order`.
    fn tsid_of(column_order: &[&str], id_order: &[&str], row: Row) -> u64 {
        extract_tsid(
            create_multi_label_schema(column_order),
            row,
            &create_name_to_column_id(id_order),
            TABLE_ID,
        )
    }

    fn test_schema() -> Vec<ColumnSchema> {
        create_multi_label_schema(&["namespace", "host"])
    }

    fn test_row(v1: &str, v2: &str) -> Row {
        create_row_with_values(&[v1, v2])
    }

    fn test_name_to_column_id() -> HashMap<String, ColumnId> {
        HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
    }

    fn expected_sparse_schema() -> Vec<ColumnSchema> {
        vec![tag_column(PRIMARY_KEY_COLUMN_NAME, ColumnDataType::Binary)]
    }

    fn expected_dense_schema() -> Vec<ColumnSchema> {
        let mut schema = test_schema();
        schema.push(tag_column(
            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
            ColumnDataType::Uint32,
        ));
        schema.push(tag_column(
            DATA_SCHEMA_TSID_COLUMN_NAME,
            ColumnDataType::Uint64,
        ));
        schema
    }

    #[test]
    fn test_encode_sparse() {
        let name_to_column_id = test_name_to_column_id();
        let rows = single_row(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let rows_iter = RowsIter::new(rows, &name_to_column_id);

        let result = RowModifier::default()
            .modify_rows_sparse(rows_iter, TABLE_ID)
            .unwrap();

        // Both columns are tags, so only the encoded primary key is left.
        assert_eq!(result.rows[0].values.len(), 1);
        let encoded_primary_key = vec![
            128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 37, 196, 242, 181, 117, 224, 7, 137, 0,
            0, 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
            1, 1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(
            result.rows[0].values[0],
            ValueData::BinaryValue(encoded_primary_key).into()
        );
        assert_eq!(result.schema, expected_sparse_schema());
    }

    #[test]
    fn test_encode_dense() {
        let name_to_column_id = test_name_to_column_id();
        let rows = single_row(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let rows_iter = RowsIter::new(rows, &name_to_column_id);

        let result = RowModifier::default()
            .modify_rows_dense(rows_iter, TABLE_ID)
            .unwrap();

        let values = &result.rows[0].values;
        assert_eq!(
            values[0],
            ValueData::StringValue("greptimedb".to_string()).into()
        );
        assert_eq!(
            values[1],
            ValueData::StringValue("127.0.0.1".to_string()).into()
        );
        assert_eq!(values[2], ValueData::U32Value(1025).into());
        assert_eq!(values[3], ValueData::U64Value(EXPECTED_TSID).into());
        assert_eq!(result.schema, expected_dense_schema());
    }

    #[test]
    fn test_fill_internal_columns() {
        let name_to_column_id = test_name_to_column_id();

        // Original column order: namespace, host
        let rows = single_row(test_schema(), test_row("greptimedb", "127.0.0.1"));
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(TABLE_ID, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(EXPECTED_TSID).into());

        // Swapped column order: host, namespace
        let rows = single_row(
            create_multi_label_schema(&["host", "namespace"]),
            test_row("127.0.0.1", "greptimedb"),
        );
        let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
        let row_iter = rows_iter.iter_mut().next().unwrap();
        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(TABLE_ID, &row_iter);
        assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
        assert_eq!(tsid, ValueData::U64Value(EXPECTED_TSID).into());
    }

    #[test]
    fn test_tsid_same_for_different_label_orders() {
        // The same label name-value pairs must yield the same TSID whatever the column order,
        // because label names are sorted lexicographically before hashing.
        let ids = ["a", "b", "c"];

        // Schema 1: a, b, c
        let tsid1 = tsid_of(
            &["a", "b", "c"],
            &ids,
            create_row_with_values(&["A", "B", "C"]),
        );
        // Schema 2: b, a, c
        let tsid2 = tsid_of(
            &["b", "a", "c"],
            &ids,
            create_row_with_values(&["B", "A", "C"]),
        );
        // Schema 3: c, b, a
        let tsid3 = tsid_of(
            &["c", "b", "a"],
            &ids,
            create_row_with_values(&["C", "B", "A"]),
        );

        assert_eq!(
            tsid1, tsid2,
            "TSID should be same for different column orders"
        );
        assert_eq!(
            tsid2, tsid3,
            "TSID should be same for different column orders"
        );
    }

    #[test]
    fn test_tsid_same_with_null_labels() {
        // Rows that differ only by null label values share the same TSID.

        // Row 1: a=A, b=B (no nulls, fast path)
        let labels1 = ["a", "b"];
        let tsid1 = tsid_of(&labels1, &labels1, create_row_with_values(&["A", "B"]));

        // Row 2: a=A, b=B, c=null (has null, slow path)
        let labels2 = ["a", "b", "c"];
        let tsid2 = tsid_of(
            &labels2,
            &labels2,
            create_row_with_nulls(&[Some("A"), Some("B"), None]),
        );

        // Null labels are ignored
        assert_eq!(
            tsid1, tsid2,
            "TSID should be same when only difference is null label values"
        );
    }

    #[test]
    fn test_tsid_same_with_multiple_null_labels() {
        // Row 1: a=A, b=B (no nulls)
        let labels1 = ["a", "b"];
        let tsid1 = tsid_of(&labels1, &labels1, create_row_with_values(&["A", "B"]));

        // Row 2: a=A, b=B, c=null, d=null (multiple nulls)
        let labels2 = ["a", "b", "c", "d"];
        let tsid2 = tsid_of(
            &labels2,
            &labels2,
            create_row_with_nulls(&[Some("A"), Some("B"), None, None]),
        );

        assert_eq!(
            tsid1, tsid2,
            "TSID should be same when only difference is multiple null label values"
        );
    }

    #[test]
    fn test_tsid_different_with_different_non_null_values() {
        // Rows with different non-null values get different TSIDs.
        let labels = ["a", "b"];

        // Row 1: a=A, b=B
        let tsid1 = tsid_of(&labels, &labels, create_row_with_values(&["A", "B"]));
        // Row 2: a=A, b=C (different value for b)
        let tsid2 = tsid_of(&labels, &labels, create_row_with_values(&["A", "C"]));

        assert_ne!(
            tsid1, tsid2,
            "TSID should be different when label values differ"
        );
    }

    #[test]
    fn test_tsid_fast_path_vs_slow_path_consistency() {
        // The fast path (no nulls) and the slow path (with nulls) must agree
        // for the same non-null label values.

        // Fast path: a=A, b=B (no nulls)
        let labels_fast = ["a", "b"];
        let tsid_fast = tsid_of(
            &labels_fast,
            &labels_fast,
            create_row_with_values(&["A", "B"]),
        );

        // Slow path: a=A, b=B, c=null (has null, triggers slow path)
        let labels_slow = ["a", "b", "c"];
        let tsid_slow = tsid_of(
            &labels_slow,
            &labels_slow,
            create_row_with_nulls(&[Some("A"), Some("B"), None]),
        );

        assert_eq!(
            tsid_fast, tsid_slow,
            "Fast path and slow path should produce same TSID for same non-null values"
        );
    }

    #[test]
    fn test_tsid_with_null_in_middle() {
        let labels = ["a", "b", "c"];

        // Row 1: a=A, b=B, c=C
        let tsid1 = tsid_of(&labels, &labels, create_row_with_values(&["A", "B", "C"]));

        // Row 2: a=A, b=null, c=C (null in middle)
        let tsid2 = tsid_of(
            &labels,
            &labels,
            create_row_with_nulls(&[Some("A"), None, Some("C")]),
        );

        // A null label is ignored, so row 2 is equivalent to a=A, c=C,
        // which is a different series than row 1 (a=A, b=B, c=C).
        assert_ne!(
            tsid1, tsid2,
            "TSID should be different when a non-null value becomes null"
        );

        // Row 3: a=A, c=C (no b at all, equivalent to row 2)
        let labels3 = ["a", "c"];
        let tsid3 = tsid_of(&labels3, &labels3, create_row_with_values(&["A", "C"]));

        assert_eq!(
            tsid2, tsid3,
            "TSID should be same when null label is ignored"
        );
    }

    #[test]
    fn test_tsid_all_null_labels() {
        // A row whose labels are all null still goes through the slow path and gets a TSID;
        // this checks that the TSID is the same every time it is computed.
        let labels = ["a", "b", "c"];
        let all_null_tsid =
            || tsid_of(&labels, &labels, create_row_with_nulls(&[None, None, None]));

        let tsid = all_null_tsid();
        let tsid2 = all_null_tsid();
        assert_eq!(
            tsid, tsid2,
            "TSID should be consistent when all label values are null"
        );
    }
}
825        );
826    }
827}