metric_engine/
batch_modifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hasher;
16use std::sync::Arc;
17
18use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
19use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
20use datatypes::arrow::record_batch::RecordBatch;
21use fxhash::FxHasher;
22use mito_codec::row_converter::SparsePrimaryKeyCodec;
23use snafu::ResultExt;
24use store_api::storage::ColumnId;
25use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
26
27use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
28
/// Info about a tag column for TSID computation and sparse primary key encoding.
///
/// Ties together a tag's label name, its position in the incoming
/// `RecordBatch`, and its column id in the physical region, so callers can
/// hash names/values and emit `(column_id, value)` pairs consistently.
#[allow(dead_code)] // NOTE(review): presumably kept until all call sites land — confirm
pub(crate) struct TagColumnInfo {
    /// Column name (used for label-name hash).
    pub name: String,
    /// Column index in the RecordBatch.
    pub index: usize,
    /// Column ID in the physical region.
    pub column_id: ColumnId,
}
39
40/// Computes `__tsid` values for each row.
41#[allow(dead_code)]
42pub(crate) fn compute_tsid_array(
43    batch: &RecordBatch,
44    sorted_tag_columns: &[TagColumnInfo],
45    tag_arrays: &[&StringArray],
46) -> UInt64Array {
47    let num_rows = batch.num_rows();
48
49    let label_name_hash = {
50        let mut hasher = FxHasher::default();
51        for tag_col in sorted_tag_columns {
52            hasher.write(tag_col.name.as_bytes());
53            hasher.write_u8(0xff);
54        }
55        hasher.finish()
56    };
57
58    let mut tsid_values = Vec::with_capacity(num_rows);
59    for row in 0..num_rows {
60        let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
61
62        let tsid = if !has_null {
63            let mut hasher = FxHasher::default();
64            hasher.write_u64(label_name_hash);
65            for arr in tag_arrays {
66                hasher.write(arr.value(row).as_bytes());
67                hasher.write_u8(0xff);
68            }
69            hasher.finish()
70        } else {
71            let mut name_hasher = FxHasher::default();
72            for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
73                if !arr.is_null(row) {
74                    name_hasher.write(tc.name.as_bytes());
75                    name_hasher.write_u8(0xff);
76                }
77            }
78            let row_label_hash = name_hasher.finish();
79
80            let mut val_hasher = FxHasher::default();
81            val_hasher.write_u64(row_label_hash);
82            for arr in tag_arrays {
83                if !arr.is_null(row) {
84                    val_hasher.write(arr.value(row).as_bytes());
85                    val_hasher.write_u8(0xff);
86                }
87            }
88            val_hasher.finish()
89        };
90
91        tsid_values.push(tsid);
92    }
93
94    UInt64Array::from(tsid_values)
95}
96
97fn build_tag_arrays<'a>(
98    batch: &'a RecordBatch,
99    sorted_tag_columns: &[TagColumnInfo],
100) -> Vec<&'a StringArray> {
101    sorted_tag_columns
102        .iter()
103        .map(|tc| {
104            batch
105                .column(tc.index)
106                .as_any()
107                .downcast_ref::<StringArray>()
108                .expect("tag column must be utf8")
109        })
110        .collect()
111}
112
113/// Modifies a RecordBatch for sparse primary key encoding.
114pub(crate) fn modify_batch_sparse(
115    batch: RecordBatch,
116    table_id: u32,
117    sorted_tag_columns: &[TagColumnInfo],
118    non_tag_column_indices: &[usize],
119) -> Result<RecordBatch> {
120    let num_rows = batch.num_rows();
121    let codec = SparsePrimaryKeyCodec::schemaless();
122    let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
123    let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
124
125    let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
126    let mut buffer = Vec::new();
127    for row in 0..num_rows {
128        buffer.clear();
129        codec
130            .encode_internal(table_id, tsid_array.value(row), &mut buffer)
131            .context(EncodePrimaryKeySnafu)?;
132
133        let tags = sorted_tag_columns
134            .iter()
135            .zip(tag_arrays.iter())
136            .filter(|(_, arr)| !arr.is_null(row))
137            .map(|(tc, arr)| (tc.column_id, arr.value(row).as_bytes()));
138        codec
139            .encode_raw_tag_value(tags, &mut buffer)
140            .context(EncodePrimaryKeySnafu)?;
141
142        pk_builder.append_value(&buffer);
143    }
144
145    let pk_array = pk_builder.finish();
146
147    let mut fields = vec![Arc::new(Field::new(
148        PRIMARY_KEY_COLUMN_NAME,
149        DataType::Binary,
150        false,
151    ))];
152    let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
153
154    for &idx in non_tag_column_indices {
155        fields.push(batch.schema().fields()[idx].clone());
156        columns.push(batch.column(idx).clone());
157    }
158
159    let new_schema = Arc::new(ArrowSchema::new(fields));
160    RecordBatch::try_new(new_schema, columns).map_err(|e| {
161        UnexpectedRequestSnafu {
162            reason: format!("Failed to build modified sparse RecordBatch: {e}"),
163        }
164        .build()
165    })
166}
167
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// Builds a single-row batch with a timestamp, a float value field and
    /// two Utf8 tag columns (`namespace`, `host`).
    fn build_sparse_test_batch() -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1000])),
                Arc::new(datatypes::arrow::array::Float64Array::from(vec![42.0])),
                Arc::new(StringArray::from(vec!["greptimedb"])),
                Arc::new(StringArray::from(vec!["127.0.0.1"])),
            ],
        )
        .unwrap()
    }

    /// Tag metadata matching `build_sparse_test_batch`, ordered by tag name
    /// ("host" before "namespace") as `sorted_tag_columns` expects.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 3,
                column_id: 3,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 2,
                column_id: 2,
            },
        ]
    }

    /// TSID of a fully-populated row matches a fixed golden value.
    #[test]
    fn test_compute_tsid_basic() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["greptimedb"])),
                Arc::new(StringArray::from(vec!["127.0.0.1"])),
            ],
        )
        .unwrap();

        // Tag infos are sorted by name; `index` points back into the batch.
        let tag_columns: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 0,
                column_id: 1,
            },
        ];
        let tag_arrays = build_tag_arrays(&batch, &tag_columns);
        let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);

        // Golden TSID value — presumably captured from the existing row-based
        // TSID implementation; do not change without updating both. TODO confirm.
        assert_eq!(tsid_array.value(0), 2721566936019240841);
    }

    /// A null tag must not affect the TSID: a row {a=A, b=B, c=null} hashes
    /// identically to a row {a=A, b=B} with no `c` column at all.
    #[test]
    fn test_compute_tsid_with_nulls() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch_no_null = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
            ],
        )
        .unwrap();
        let tag_cols_2: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
        ];
        let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2);
        let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2);

        // Same row with an extra, entirely-null tag column "c".
        let schema3 = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let batch_with_null = RecordBatch::try_new(
            schema3,
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
                Arc::new(StringArray::from(vec![None as Option<&str>])),
            ],
        )
        .unwrap();
        let tag_cols_3: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "c".to_string(),
                index: 2,
                column_id: 3,
            },
        ];
        let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3);
        let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3);

        assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0));
    }

    /// Output schema is: __primary_key first, then the non-tag columns in the
    /// order given by `non_tag_indices` (tag columns are dropped).
    #[test]
    fn test_modify_batch_sparse() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let non_tag_indices = vec![0, 1];
        let table_id: u32 = 1025;

        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();

        assert_eq!(modified.num_columns(), 3);
        assert_eq!(modified.schema().field(0).name(), PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(modified.schema().field(1).name(), "greptime_timestamp");
        assert_eq!(modified.schema().field(2).name(), "greptime_value");
    }

    /// The batch-based sparse encoding must produce byte-identical primary
    /// keys to the row-based `RowModifier` path for the same logical row.
    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let non_tag_indices = vec![0, 1];
        let table_id: u32 = 1025;
        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();

        // Column ids mirror the ones used in `sparse_tag_columns`.
        let name_to_column_id: HashMap<String, ColumnId> = [
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]
        .into_iter()
        .collect();

        // The same single row expressed in the gRPC row format.
        let rows = Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "greptime_timestamp".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "greptime_value".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "namespace".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
            ],
            rows: vec![Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                    },
                    Value {
                        value_data: Some(ValueData::F64Value(42.0)),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("greptimedb".to_string())),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("127.0.0.1".to_string())),
                    },
                ],
            }],
        };

        // Reference path: encode the same row through RowModifier.
        let row_iter = RowsIter::new(rows, &name_to_column_id);
        let rows = RowModifier::default()
            .modify_rows(
                row_iter,
                TableIdInput::Single(table_id),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        // The row path emits the encoded primary key as the first value.
        let ValueData::BinaryValue(expected_pk) =
            rows.rows[0].values[0].value_data.clone().unwrap()
        else {
            panic!("expected binary primary key");
        };

        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}
417}