1use std::hash::Hasher;
16use std::sync::Arc;
17
18use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
19use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
20use datatypes::arrow::record_batch::RecordBatch;
21use fxhash::FxHasher;
22use mito_codec::row_converter::SparsePrimaryKeyCodec;
23use snafu::ResultExt;
24use store_api::storage::ColumnId;
25use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
26
27use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
28
29#[allow(dead_code)]
31pub(crate) struct TagColumnInfo {
32 pub name: String,
34 pub index: usize,
36 pub column_id: ColumnId,
38}
39
40#[allow(dead_code)]
42pub(crate) fn compute_tsid_array(
43 batch: &RecordBatch,
44 sorted_tag_columns: &[TagColumnInfo],
45 tag_arrays: &[&StringArray],
46) -> UInt64Array {
47 let num_rows = batch.num_rows();
48
49 let label_name_hash = {
50 let mut hasher = FxHasher::default();
51 for tag_col in sorted_tag_columns {
52 hasher.write(tag_col.name.as_bytes());
53 hasher.write_u8(0xff);
54 }
55 hasher.finish()
56 };
57
58 let mut tsid_values = Vec::with_capacity(num_rows);
59 for row in 0..num_rows {
60 let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
61
62 let tsid = if !has_null {
63 let mut hasher = FxHasher::default();
64 hasher.write_u64(label_name_hash);
65 for arr in tag_arrays {
66 hasher.write(arr.value(row).as_bytes());
67 hasher.write_u8(0xff);
68 }
69 hasher.finish()
70 } else {
71 let mut name_hasher = FxHasher::default();
72 for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
73 if !arr.is_null(row) {
74 name_hasher.write(tc.name.as_bytes());
75 name_hasher.write_u8(0xff);
76 }
77 }
78 let row_label_hash = name_hasher.finish();
79
80 let mut val_hasher = FxHasher::default();
81 val_hasher.write_u64(row_label_hash);
82 for arr in tag_arrays {
83 if !arr.is_null(row) {
84 val_hasher.write(arr.value(row).as_bytes());
85 val_hasher.write_u8(0xff);
86 }
87 }
88 val_hasher.finish()
89 };
90
91 tsid_values.push(tsid);
92 }
93
94 UInt64Array::from(tsid_values)
95}
96
97fn build_tag_arrays<'a>(
98 batch: &'a RecordBatch,
99 sorted_tag_columns: &[TagColumnInfo],
100) -> Vec<&'a StringArray> {
101 sorted_tag_columns
102 .iter()
103 .map(|tc| {
104 batch
105 .column(tc.index)
106 .as_any()
107 .downcast_ref::<StringArray>()
108 .expect("tag column must be utf8")
109 })
110 .collect()
111}
112
113pub(crate) fn modify_batch_sparse(
115 batch: RecordBatch,
116 table_id: u32,
117 sorted_tag_columns: &[TagColumnInfo],
118 non_tag_column_indices: &[usize],
119) -> Result<RecordBatch> {
120 let num_rows = batch.num_rows();
121 let codec = SparsePrimaryKeyCodec::schemaless();
122 let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
123 let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
124
125 let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
126 let mut buffer = Vec::new();
127 for row in 0..num_rows {
128 buffer.clear();
129 codec
130 .encode_internal(table_id, tsid_array.value(row), &mut buffer)
131 .context(EncodePrimaryKeySnafu)?;
132
133 let tags = sorted_tag_columns
134 .iter()
135 .zip(tag_arrays.iter())
136 .filter(|(_, arr)| !arr.is_null(row))
137 .map(|(tc, arr)| (tc.column_id, arr.value(row).as_bytes()));
138 codec
139 .encode_raw_tag_value(tags, &mut buffer)
140 .context(EncodePrimaryKeySnafu)?;
141
142 pk_builder.append_value(&buffer);
143 }
144
145 let pk_array = pk_builder.finish();
146
147 let mut fields = vec![Arc::new(Field::new(
148 PRIMARY_KEY_COLUMN_NAME,
149 DataType::Binary,
150 false,
151 ))];
152 let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
153
154 for &idx in non_tag_column_indices {
155 fields.push(batch.schema().fields()[idx].clone());
156 columns.push(batch.column(idx).clone());
157 }
158
159 let new_schema = Arc::new(ArrowSchema::new(fields));
160 RecordBatch::try_new(new_schema, columns).map_err(|e| {
161 UnexpectedRequestSnafu {
162 reason: format!("Failed to build modified sparse RecordBatch: {e}"),
163 }
164 .build()
165 })
166}
167
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Float64Array, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    /// Table id shared by the sparse-encoding tests.
    const TABLE_ID: u32 = 1025;

    /// Shorthand constructor for a [`TagColumnInfo`].
    fn tag(name: &str, index: usize, column_id: ColumnId) -> TagColumnInfo {
        TagColumnInfo {
            name: name.to_string(),
            index,
            column_id,
        }
    }

    /// Builds a single-row batch of nullable utf8 columns from
    /// `(column name, value)` pairs.
    fn utf8_row(columns: &[(&str, Option<&str>)]) -> RecordBatch {
        let mut fields = Vec::with_capacity(columns.len());
        let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(columns.len());
        for (name, value) in columns {
            fields.push(Field::new(*name, DataType::Utf8, true));
            arrays.push(Arc::new(StringArray::from(vec![*value])));
        }
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), arrays).unwrap()
    }

    /// Shorthand constructor for a gRPC [`ColumnSchema`].
    fn column_schema(
        name: &str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Wraps `data` into a non-null gRPC [`Value`].
    fn value(data: ValueData) -> Value {
        Value {
            value_data: Some(data),
        }
    }

    /// One row: timestamp, value, and the two tags `namespace` and `host`.
    fn build_sparse_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ];
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1000])),
            Arc::new(Float64Array::from(vec![42.0])),
            Arc::new(StringArray::from(vec!["greptimedb"])),
            Arc::new(StringArray::from(vec!["127.0.0.1"])),
        ];
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).unwrap()
    }

    /// Tag columns of [`build_sparse_test_batch`], sorted by label name.
    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![tag("host", 3, 3), tag("namespace", 2, 2)]
    }

    /// Runs [`modify_batch_sparse`] over the shared test batch.
    fn modified_sparse_batch() -> RecordBatch {
        let tag_columns = sparse_tag_columns();
        modify_batch_sparse(build_sparse_test_batch(), TABLE_ID, &tag_columns, &[0, 1]).unwrap()
    }

    #[test]
    fn test_compute_tsid_basic() {
        let batch = utf8_row(&[
            ("namespace", Some("greptimedb")),
            ("host", Some("127.0.0.1")),
        ]);
        let tag_columns = vec![tag("host", 1, 2), tag("namespace", 0, 1)];

        let tag_arrays = build_tag_arrays(&batch, &tag_columns);
        let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);

        assert_eq!(tsid_array.value(0), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        // Two labels, both present.
        let batch_no_null = utf8_row(&[("a", Some("A")), ("b", Some("B"))]);
        let tag_cols_2 = vec![tag("a", 0, 1), tag("b", 1, 2)];
        let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2);
        let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2);

        // Same two labels plus a third one that is null for the row.
        let batch_with_null = utf8_row(&[("a", Some("A")), ("b", Some("B")), ("c", None)]);
        let tag_cols_3 = vec![tag("a", 0, 1), tag("b", 1, 2), tag("c", 2, 3)];
        let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3);
        let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3);

        // A null label must not change the series identity.
        assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0));
    }

    #[test]
    fn test_modify_batch_sparse() {
        let modified = modified_sparse_batch();
        let schema = modified.schema();

        assert_eq!(modified.num_columns(), 3);
        assert_eq!(schema.field(0).name(), PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(schema.field(1).name(), "greptime_timestamp");
        assert_eq!(schema.field(2).name(), "greptime_value");
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let modified = modified_sparse_batch();

        let name_to_column_id: HashMap<String, ColumnId> = HashMap::from([
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]);

        // The same single row, expressed as a gRPC row insert.
        let rows = Rows {
            schema: vec![
                column_schema(
                    "greptime_timestamp",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                column_schema(
                    "greptime_value",
                    ColumnDataType::Float64,
                    SemanticType::Field,
                ),
                column_schema("namespace", ColumnDataType::String, SemanticType::Tag),
                column_schema("host", ColumnDataType::String, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![
                    value(ValueData::TimestampMillisecondValue(1000)),
                    value(ValueData::F64Value(42.0)),
                    value(ValueData::StringValue("greptimedb".to_string())),
                    value(ValueData::StringValue("127.0.0.1".to_string())),
                ],
            }],
        };

        let row_iter = RowsIter::new(rows, &name_to_column_id);
        let rows = RowModifier::default()
            .modify_rows(
                row_iter,
                TableIdInput::Single(TABLE_ID),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let ValueData::BinaryValue(expected_pk) =
            rows.rows[0].values[0].value_data.clone().unwrap()
        else {
            panic!("expected binary primary key");
        };

        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}