1use std::hash::Hasher;
16use std::sync::Arc;
17
18use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
19use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
20use datatypes::arrow::record_batch::RecordBatch;
21use fxhash::FxHasher;
22use mito_codec::row_converter::SparsePrimaryKeyCodec;
23use snafu::ResultExt;
24use store_api::storage::ColumnId;
25use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
26
27use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
28
29#[allow(dead_code)]
31pub struct TagColumnInfo {
32 pub name: String,
34 pub index: usize,
36 pub column_id: ColumnId,
38}
39
40pub fn compute_tsid_array(
50 batch: &RecordBatch,
51 sorted_tag_columns: &[TagColumnInfo],
52 tag_arrays: &[&StringArray],
53) -> UInt64Array {
54 let num_rows = batch.num_rows();
55
56 let label_name_hash = {
57 let mut hasher = FxHasher::default();
58 for tag_col in sorted_tag_columns {
59 hasher.write(tag_col.name.as_bytes());
60 hasher.write_u8(0xff);
61 }
62 hasher.finish()
63 };
64
65 let mut tsid_values = Vec::with_capacity(num_rows);
66 for row in 0..num_rows {
67 let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
68
69 let tsid = if !has_null {
70 let mut hasher = FxHasher::default();
71 hasher.write_u64(label_name_hash);
72 for arr in tag_arrays {
73 hasher.write(arr.value(row).as_bytes());
74 hasher.write_u8(0xff);
75 }
76 hasher.finish()
77 } else {
78 let mut name_hasher = FxHasher::default();
79 for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
80 if !arr.is_null(row) {
81 name_hasher.write(tc.name.as_bytes());
82 name_hasher.write_u8(0xff);
83 }
84 }
85 let row_label_hash = name_hasher.finish();
86
87 let mut val_hasher = FxHasher::default();
88 val_hasher.write_u64(row_label_hash);
89 for arr in tag_arrays {
90 if !arr.is_null(row) {
91 val_hasher.write(arr.value(row).as_bytes());
92 val_hasher.write_u8(0xff);
93 }
94 }
95 val_hasher.finish()
96 };
97
98 tsid_values.push(tsid);
99 }
100
101 UInt64Array::from(tsid_values)
102}
103
104fn build_tag_arrays<'a>(
105 batch: &'a RecordBatch,
106 sorted_tag_columns: &[TagColumnInfo],
107) -> Vec<&'a StringArray> {
108 sorted_tag_columns
109 .iter()
110 .map(|tc| {
111 batch
112 .column(tc.index)
113 .as_any()
114 .downcast_ref::<StringArray>()
115 .expect("tag column must be utf8")
116 })
117 .collect()
118}
119
120pub fn modify_batch_sparse(
140 batch: RecordBatch,
141 table_id: u32,
142 sorted_tag_columns: &[TagColumnInfo],
143 extra_column_indices: &[usize],
144) -> Result<RecordBatch> {
145 let num_rows = batch.num_rows();
146 let codec = SparsePrimaryKeyCodec::schemaless();
147 let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
148 let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
149
150 let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
151 let mut buffer = Vec::new();
152 for row in 0..num_rows {
153 buffer.clear();
154 codec
155 .encode_internal(table_id, tsid_array.value(row), &mut buffer)
156 .context(EncodePrimaryKeySnafu)?;
157
158 let tags = sorted_tag_columns
159 .iter()
160 .zip(tag_arrays.iter())
161 .filter(|(_, arr)| !arr.is_null(row))
162 .map(|(tc, arr)| (tc.column_id, arr.value(row).as_bytes()));
163 codec
164 .encode_raw_tag_value(tags, &mut buffer)
165 .context(EncodePrimaryKeySnafu)?;
166
167 pk_builder.append_value(&buffer);
168 }
169
170 let pk_array = pk_builder.finish();
171
172 let mut fields = vec![Arc::new(Field::new(
173 PRIMARY_KEY_COLUMN_NAME,
174 DataType::Binary,
175 false,
176 ))];
177 let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
178
179 for &idx in extra_column_indices {
180 fields.push(batch.schema().fields()[idx].clone());
181 columns.push(batch.column(idx).clone());
182 }
183
184 let new_schema = Arc::new(ArrowSchema::new(fields));
185 RecordBatch::try_new(new_schema, columns).map_err(|e| {
186 UnexpectedRequestSnafu {
187 reason: format!("Failed to build modified sparse RecordBatch: {e}"),
188 }
189 .build()
190 })
191}
192
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Float64Array, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// Table id shared by the sparse-encoding tests.
    const TABLE_ID: u32 = 1025;
    /// Positions of the timestamp and value columns in `build_sparse_test_batch`.
    const NON_TAG_INDICES: [usize; 2] = [0, 1];

    /// Shorthand constructor for a `TagColumnInfo`.
    fn tag(name: &str, index: usize, column_id: ColumnId) -> TagColumnInfo {
        TagColumnInfo {
            name: name.to_string(),
            index,
            column_id,
        }
    }

    /// Builds a batch made only of nullable utf8 columns, in the given order.
    fn utf8_batch(columns: Vec<(&str, StringArray)>) -> RecordBatch {
        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(columns.len());
        for (name, values) in columns {
            fields.push(Field::new(name, DataType::Utf8, true));
            arrays.push(Arc::new(values));
        }
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), arrays).unwrap()
    }

    /// Shorthand constructor for a `ColumnSchema` with default options.
    fn column_schema(
        name: &str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Wraps `data` into a non-null protobuf `Value`.
    fn value(data: ValueData) -> Value {
        Value {
            value_data: Some(data),
        }
    }

    /// One-row batch: timestamp, value, and the two tags `namespace` / `host`.
    fn build_sparse_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ];
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1000])),
            Arc::new(Float64Array::from(vec![42.0])),
            Arc::new(StringArray::from(vec!["greptimedb"])),
            Arc::new(StringArray::from(vec!["127.0.0.1"])),
        ];
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
    }

    /// Tag columns of `build_sparse_test_batch`, sorted by name.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![tag("host", 3, 3), tag("namespace", 2, 2)]
    }

    #[test]
    fn test_compute_tsid_basic() {
        let batch = utf8_batch(vec![
            ("namespace", StringArray::from(vec!["greptimedb"])),
            ("host", StringArray::from(vec!["127.0.0.1"])),
        ]);
        let tag_columns = vec![tag("host", 1, 2), tag("namespace", 0, 1)];

        let tag_arrays = build_tag_arrays(&batch, &tag_columns);
        let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);

        assert_eq!(tsid_array.value(0), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        // Two tags, both present.
        let batch_no_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
        ]);
        let tag_cols_2 = vec![tag("a", 0, 1), tag("b", 1, 2)];
        let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2);
        let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2);

        // Same two tags plus a third one that is null: must hash identically.
        let batch_with_null = utf8_batch(vec![
            ("a", StringArray::from(vec!["A"])),
            ("b", StringArray::from(vec!["B"])),
            ("c", StringArray::from(vec![None::<&str>])),
        ]);
        let tag_cols_3 = vec![tag("a", 0, 1), tag("b", 1, 2), tag("c", 2, 3)];
        let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3);
        let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3);

        assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0));
    }

    #[test]
    fn test_modify_batch_sparse() {
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            TABLE_ID,
            &sparse_tag_columns(),
            &NON_TAG_INDICES,
        )
        .unwrap();

        let expected_names = [
            PRIMARY_KEY_COLUMN_NAME,
            "greptime_timestamp",
            "greptime_value",
        ];
        assert_eq!(modified.num_columns(), expected_names.len());
        for (position, expected) in expected_names.into_iter().enumerate() {
            assert_eq!(modified.schema().field(position).name(), expected);
        }
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let modified = modify_batch_sparse(
            build_sparse_test_batch(),
            TABLE_ID,
            &sparse_tag_columns(),
            &NON_TAG_INDICES,
        )
        .unwrap();

        let name_to_column_id: HashMap<String, ColumnId> = HashMap::from([
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]);

        // The same single row, expressed through the row-based protobuf API.
        let request_rows = Rows {
            schema: vec![
                column_schema(
                    "greptime_timestamp",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                column_schema(
                    "greptime_value",
                    ColumnDataType::Float64,
                    SemanticType::Field,
                ),
                column_schema("namespace", ColumnDataType::String, SemanticType::Tag),
                column_schema("host", ColumnDataType::String, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![
                    value(ValueData::TimestampMillisecondValue(1000)),
                    value(ValueData::F64Value(42.0)),
                    value(ValueData::StringValue("greptimedb".to_string())),
                    value(ValueData::StringValue("127.0.0.1".to_string())),
                ],
            }],
        };

        let encoded_rows = RowModifier::default()
            .modify_rows(
                RowsIter::new(request_rows, &name_to_column_id),
                TableIdInput::Single(TABLE_ID),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let expected_pk = match encoded_rows.rows[0].values[0].value_data.clone().unwrap() {
            ValueData::BinaryValue(bytes) => bytes,
            _ => panic!("expected binary primary key"),
        };

        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}