// metric_engine/batch_modifier.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::Hasher;
use std::sync::Arc;

use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::record_batch::RecordBatch;
use fxhash::FxHasher;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use snafu::ResultExt;
use store_api::storage::ColumnId;
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
28
29/// Info about a tag column for TSID computation and sparse primary key encoding.
30#[allow(dead_code)]
31pub struct TagColumnInfo {
32    /// Column name (used for label-name hash).
33    pub name: String,
34    /// Column index in the RecordBatch.
35    pub index: usize,
36    /// Column ID in the physical region.
37    pub column_id: ColumnId,
38}
39
40/// Computes the TSID for each row in a [RecordBatch].
41///
42/// The TSID is a stable hash of the set of labels (tags) present in each row.
43/// It accounts for both the names and values of all non-null tag columns.
44///
45/// # Logic
46/// - If a row has no nulls across all `sorted_tag_columns`, it uses a precomputed hash of all label names.
47/// - If a row has nulls, it dynamically computes a hash of the names of labels that are present (non-null).
48/// - In both cases, it then hashes the values of those present labels in the order specified by `sorted_tag_columns`.
49pub fn compute_tsid_array(
50    batch: &RecordBatch,
51    sorted_tag_columns: &[TagColumnInfo],
52    tag_arrays: &[&StringArray],
53) -> UInt64Array {
54    let num_rows = batch.num_rows();
55
56    let label_name_hash = {
57        let mut hasher = FxHasher::default();
58        for tag_col in sorted_tag_columns {
59            hasher.write(tag_col.name.as_bytes());
60            hasher.write_u8(0xff);
61        }
62        hasher.finish()
63    };
64
65    let mut tsid_values = Vec::with_capacity(num_rows);
66    for row in 0..num_rows {
67        let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
68
69        let tsid = if !has_null {
70            let mut hasher = FxHasher::default();
71            hasher.write_u64(label_name_hash);
72            for arr in tag_arrays {
73                hasher.write(arr.value(row).as_bytes());
74                hasher.write_u8(0xff);
75            }
76            hasher.finish()
77        } else {
78            let mut name_hasher = FxHasher::default();
79            for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
80                if !arr.is_null(row) {
81                    name_hasher.write(tc.name.as_bytes());
82                    name_hasher.write_u8(0xff);
83                }
84            }
85            let row_label_hash = name_hasher.finish();
86
87            let mut val_hasher = FxHasher::default();
88            val_hasher.write_u64(row_label_hash);
89            for arr in tag_arrays {
90                if !arr.is_null(row) {
91                    val_hasher.write(arr.value(row).as_bytes());
92                    val_hasher.write_u8(0xff);
93                }
94            }
95            val_hasher.finish()
96        };
97
98        tsid_values.push(tsid);
99    }
100
101    UInt64Array::from(tsid_values)
102}
103
104fn build_tag_arrays<'a>(
105    batch: &'a RecordBatch,
106    sorted_tag_columns: &[TagColumnInfo],
107) -> Vec<&'a StringArray> {
108    sorted_tag_columns
109        .iter()
110        .map(|tc| {
111            batch
112                .column(tc.index)
113                .as_any()
114                .downcast_ref::<StringArray>()
115                .expect("tag column must be utf8")
116        })
117        .collect()
118}
119
120/// Modifies a [RecordBatch] to include a sparse primary key column.
121///
122/// This function transforms the input `batch` into a new `RecordBatch` where the first column
123/// is the generated primary key (named [PRIMARY_KEY_COLUMN_NAME]), followed by columns
124/// indicated by `extra_column_indices`.
125///
126/// The primary key uses a "sparse" encoding, which compactly represents the row's identity
127/// by only including non-null tag values. The encoding, handled by [SparsePrimaryKeyCodec],
128/// consists of:
129/// 1. The `table_id`.
130/// 2. A `tsid` (Time Series ID), which is a hash of the present tags.
131/// 3. The actual non-null tag values paired with their `column_id`.
132///
133/// # Parameters
134/// - `batch`: The source [RecordBatch].
135/// - `table_id`: The ID of the table.
136/// - `sorted_tag_columns`: Metadata for tag columns, used for both TSID computation and PK encoding.
137/// - `extra_column_indices`: Indices of columns from the original batch to keep in the output
138///   (typically the timestamp and value fields).
139pub fn modify_batch_sparse(
140    batch: RecordBatch,
141    table_id: u32,
142    sorted_tag_columns: &[TagColumnInfo],
143    extra_column_indices: &[usize],
144) -> Result<RecordBatch> {
145    let num_rows = batch.num_rows();
146    let codec = SparsePrimaryKeyCodec::schemaless();
147    let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
148    let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
149
150    let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
151    let mut buffer = Vec::new();
152    for row in 0..num_rows {
153        buffer.clear();
154        codec
155            .encode_internal(table_id, tsid_array.value(row), &mut buffer)
156            .context(EncodePrimaryKeySnafu)?;
157
158        let tags = sorted_tag_columns
159            .iter()
160            .zip(tag_arrays.iter())
161            .filter(|(_, arr)| !arr.is_null(row))
162            .map(|(tc, arr)| (tc.column_id, arr.value(row).as_bytes()));
163        codec
164            .encode_raw_tag_value(tags, &mut buffer)
165            .context(EncodePrimaryKeySnafu)?;
166
167        pk_builder.append_value(&buffer);
168    }
169
170    let pk_array = pk_builder.finish();
171
172    let mut fields = vec![Arc::new(Field::new(
173        PRIMARY_KEY_COLUMN_NAME,
174        DataType::Binary,
175        false,
176    ))];
177    let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
178
179    for &idx in extra_column_indices {
180        fields.push(batch.schema().fields()[idx].clone());
181        columns.push(batch.column(idx).clone());
182    }
183
184    let new_schema = Arc::new(ArrowSchema::new(fields));
185    RecordBatch::try_new(new_schema, columns).map_err(|e| {
186        UnexpectedRequestSnafu {
187            reason: format!("Failed to build modified sparse RecordBatch: {e}"),
188        }
189        .build()
190    })
191}
192
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// One-row batch with a timestamp, a value field and two tags.
    fn build_sparse_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ];
        let columns: Vec<Arc<dyn datatypes::arrow::array::Array>> = vec![
            Arc::new(Int64Array::from(vec![1000])),
            Arc::new(datatypes::arrow::array::Float64Array::from(vec![42.0])),
            Arc::new(StringArray::from(vec!["greptimedb"])),
            Arc::new(StringArray::from(vec!["127.0.0.1"])),
        ];
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
    }

    /// Tag metadata matching [build_sparse_test_batch], sorted by name.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 3,
                column_id: 3,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 2,
                column_id: 2,
            },
        ]
    }

    #[test]
    fn test_compute_tsid_basic() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["greptimedb"])),
                Arc::new(StringArray::from(vec!["127.0.0.1"])),
            ],
        )
        .unwrap();

        let tag_columns = vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 0,
                column_id: 1,
            },
        ];
        let arrays = build_tag_arrays(&batch, &tag_columns);
        let tsids = compute_tsid_array(&batch, &tag_columns, &arrays);

        // Pinned value: the TSID hash must stay stable across releases.
        assert_eq!(tsids.value(0), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        // Two tags, both present.
        let schema_ab = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch_ab = RecordBatch::try_new(
            schema_ab,
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
            ],
        )
        .unwrap();
        let cols_ab = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
        ];
        let arrays_ab = build_tag_arrays(&batch_ab, &cols_ab);
        let tsid_ab = compute_tsid_array(&batch_ab, &cols_ab, &arrays_ab);

        // Same two tags plus a third tag that is null in the row.
        let schema_abc = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let batch_abc = RecordBatch::try_new(
            schema_abc,
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
                Arc::new(StringArray::from(vec![None as Option<&str>])),
            ],
        )
        .unwrap();
        let cols_abc = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "c".to_string(),
                index: 2,
                column_id: 3,
            },
        ];
        let arrays_abc = build_tag_arrays(&batch_abc, &cols_abc);
        let tsid_abc = compute_tsid_array(&batch_abc, &cols_abc, &arrays_abc);

        // A null tag must be indistinguishable from an absent tag.
        assert_eq!(tsid_ab.value(0), tsid_abc.value(0));
    }

    #[test]
    fn test_modify_batch_sparse() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let table_id: u32 = 1025;
        let non_tag_indices = vec![0, 1];

        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();

        // Primary key first, then timestamp and value.
        assert_eq!(modified.num_columns(), 3);
        assert_eq!(modified.schema().field(0).name(), PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(modified.schema().field(1).name(), "greptime_timestamp");
        assert_eq!(modified.schema().field(2).name(), "greptime_value");
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let table_id: u32 = 1025;
        let non_tag_indices = vec![0, 1];
        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();

        let name_to_column_id: HashMap<String, ColumnId> = [
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]
        .into_iter()
        .collect();

        // The same logical row expressed through the row-based path.
        let rows = Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "greptime_timestamp".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "greptime_value".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "namespace".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
            ],
            rows: vec![Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                    },
                    Value {
                        value_data: Some(ValueData::F64Value(42.0)),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("greptimedb".to_string())),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("127.0.0.1".to_string())),
                    },
                ],
            }],
        };

        let row_iter = RowsIter::new(rows, &name_to_column_id);
        let modified_rows = RowModifier::default()
            .modify_rows(
                row_iter,
                TableIdInput::Single(table_id),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let ValueData::BinaryValue(expected_pk) =
            modified_rows.rows[0].values[0].value_data.clone().unwrap()
        else {
            panic!("expected binary primary key");
        };

        // Both paths must produce an identical sparse primary key.
        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}
442}