1use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{COLUMN_ID_ENCODE_SIZE, SortField};
26
27#[derive(Debug)]
29pub struct KeyValues {
30 pub mutation: Mutation,
35 helper: SparseReadRowHelper,
37 primary_key_encoding: PrimaryKeyEncoding,
39}
40
41impl KeyValues {
42 pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46 let rows = mutation.rows.as_ref()?;
47 let primary_key_encoding =
48 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51 Some(KeyValues {
52 mutation,
53 helper,
54 primary_key_encoding,
55 })
56 }
57
58 pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
60 let rows = self.mutation.rows.as_ref().unwrap();
61 let schema = &rows.schema;
62 rows.rows.iter().enumerate().map(|(idx, row)| {
63 KeyValue {
64 row,
65 schema,
66 helper: &self.helper,
67 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70 primary_key_encoding: self.primary_key_encoding,
71 }
72 })
73 }
74
75 pub fn num_rows(&self) -> usize {
77 self.mutation.rows.as_ref().unwrap().rows.len()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.mutation.rows.is_none()
84 }
85
86 pub fn max_sequence(&self) -> SequenceNumber {
90 let mut sequence = self.mutation.sequence;
91 let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92 sequence += num_rows;
93 if num_rows > 0 {
94 sequence -= 1;
95 }
96
97 sequence
98 }
99}
100
101#[derive(Debug)]
103pub struct KeyValuesRef<'a> {
104 mutation: &'a Mutation,
109 helper: SparseReadRowHelper,
111 primary_key_encoding: PrimaryKeyEncoding,
113}
114
115impl<'a> KeyValuesRef<'a> {
116 pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120 let rows = mutation.rows.as_ref()?;
121 let primary_key_encoding =
122 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125 Some(KeyValuesRef {
126 mutation,
127 helper,
128 primary_key_encoding,
129 })
130 }
131
132 pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
134 let rows = self.mutation.rows.as_ref().unwrap();
135 let schema = &rows.schema;
136 rows.rows.iter().enumerate().map(|(idx, row)| {
137 KeyValue {
138 row,
139 schema,
140 helper: &self.helper,
141 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144 primary_key_encoding: self.primary_key_encoding,
145 }
146 })
147 }
148
149 pub fn num_rows(&self) -> usize {
151 self.mutation.rows.as_ref().unwrap().rows.len()
153 }
154}
155
156#[derive(Debug, Clone, Copy)]
163pub struct KeyValue<'a> {
164 row: &'a Row,
165 schema: &'a Vec<ColumnSchema>,
166 helper: &'a SparseReadRowHelper,
167 sequence: SequenceNumber,
168 op_type: OpType,
169 primary_key_encoding: PrimaryKeyEncoding,
170}
171
172impl KeyValue<'_> {
173 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175 self.primary_key_encoding
176 }
177
178 pub fn partition_key(&self) -> u32 {
180 if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182 let Some(primary_key) = self.primary_keys().next() else {
183 return 0;
184 };
185 let key = primary_key.try_into_binary().unwrap().unwrap();
186
187 let mut deserializer = Deserializer::new(key);
188 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189 let field = SortField::new(ConcreteDataType::uint32_datatype());
190 let table_id = field.deserialize(&mut deserializer).unwrap();
191 table_id.as_value_ref().try_into_u32().unwrap().unwrap()
192 } else {
193 let Some(value) = self.primary_keys().next() else {
194 return 0;
195 };
196
197 value.try_into_u32().unwrap().unwrap()
198 }
199 }
200
201 pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef<'_>> {
203 self.helper.indices[..self.helper.num_primary_key_column]
204 .iter()
205 .map(|idx| match idx {
206 Some(i) => api::helper::pb_value_to_value_ref(
207 &self.row.values[*i],
208 self.schema[*i].datatype_extension.as_ref(),
209 ),
210 None => ValueRef::Null,
211 })
212 }
213
214 pub fn fields(&self) -> impl Iterator<Item = ValueRef<'_>> {
216 self.helper.indices[self.helper.num_primary_key_column + 1..]
217 .iter()
218 .map(|idx| match idx {
219 Some(i) => api::helper::pb_value_to_value_ref(
220 &self.row.values[*i],
221 self.schema[*i].datatype_extension.as_ref(),
222 ),
223 None => ValueRef::Null,
224 })
225 }
226
227 pub fn timestamp(&self) -> ValueRef<'_> {
229 let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231 api::helper::pb_value_to_value_ref(
232 &self.row.values[index],
233 self.schema[index].datatype_extension.as_ref(),
234 )
235 }
236
237 pub fn num_primary_keys(&self) -> usize {
239 self.helper.num_primary_key_column
240 }
241
242 pub fn num_fields(&self) -> usize {
244 self.helper.indices.len() - self.helper.num_primary_key_column - 1
245 }
246
247 pub fn sequence(&self) -> SequenceNumber {
249 self.sequence
250 }
251
252 pub fn op_type(&self) -> OpType {
254 self.op_type
255 }
256}
257
258#[derive(Debug)]
260struct SparseReadRowHelper {
261 indices: Vec<Option<usize>>,
266 num_primary_key_column: usize,
268}
269
270impl SparseReadRowHelper {
271 fn new(
276 metadata: &RegionMetadata,
277 rows: &Rows,
278 primary_key_encoding: PrimaryKeyEncoding,
279 ) -> SparseReadRowHelper {
280 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281 if rows.schema.len() == 3 {
285 let indices = rows
286 .schema
287 .iter()
288 .enumerate()
289 .map(|(index, _)| Some(index))
290 .collect();
291 return SparseReadRowHelper {
292 indices,
293 num_primary_key_column: 1,
294 };
295 };
296
297 let mut indices = Vec::with_capacity(rows.schema.len());
298 let name_to_index: HashMap<_, _> = rows
299 .schema
300 .iter()
301 .enumerate()
302 .map(|(index, col)| (&col.column_name, index))
303 .collect();
304 indices.extend(
305 rows.schema[0..2]
306 .iter()
307 .enumerate()
308 .map(|(index, _)| Some(index)),
309 );
310 for column in metadata.field_columns() {
312 let index = name_to_index.get(&column.column_schema.name);
314 indices.push(index.copied());
315 }
316 return SparseReadRowHelper {
317 indices,
318 num_primary_key_column: 1,
319 };
320 }
321 let name_to_index: HashMap<_, _> = rows
323 .schema
324 .iter()
325 .enumerate()
326 .map(|(index, col)| (&col.column_name, index))
327 .collect();
328 let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
329
330 for pk_column_id in &metadata.primary_key {
332 let column = metadata.column_by_id(*pk_column_id).unwrap();
334 let index = name_to_index.get(&column.column_schema.name);
335 indices.push(index.copied());
336 }
337 let ts_index = name_to_index
340 .get(&metadata.time_index_column().column_schema.name)
341 .unwrap();
342 indices.push(Some(*ts_index));
343
344 for column in metadata.field_columns() {
346 let index = name_to_index.get(&column.column_schema.name);
348 indices.push(index.copied());
349 }
350
351 SparseReadRowHelper {
352 indices,
353 num_primary_key_column: metadata.primary_key.len(),
354 }
355 }
356}
357
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::{TestRegionMetadataBuilder, i64_value};

    /// Name of the time index column in every test schema.
    const TS_NAME: &str = "ts";
    /// Sequence of the first row of every test mutation.
    const START_SEQ: SequenceNumber = 100;

    /// Builds region metadata with `num_tags` tags and `num_fields` fields.
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Builds `num_rows` identical rows with the given columns.
    ///
    /// The value of each cell is the position of its column in
    /// `column_names`. Names starting with `k` are tags, names starting with
    /// `v` are fields and anything else is a timestamp.
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        let mut rows = Vec::with_capacity(num_rows);
        for _ in 0..num_rows {
            let values: Vec<_> = (0..column_names.len())
                .map(|idx| i64_value(idx as i64))
                .collect();
            rows.push(Row { values });
        }

        let schema = column_names
            .iter()
            .map(|column_name| {
                let datatype = if *column_name == TS_NAME {
                    ColumnDataType::TimestampMillisecond as i32
                } else {
                    ColumnDataType::Int64 as i32
                };
                let semantic_type = if column_name.starts_with('k') {
                    SemanticType::Tag as i32
                } else if column_name.starts_with('v') {
                    SemanticType::Field as i32
                } else {
                    SemanticType::Timestamp as i32
                };
                v1::ColumnSchema {
                    column_name: column_name.to_string(),
                    datatype,
                    semantic_type,
                    ..Default::default()
                }
            })
            .collect();

        Rows { rows, schema }
    }

    /// Builds a `Put` mutation starting at `START_SEQ` over `new_rows`.
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        let rows = new_rows(column_names, num_rows);
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(rows),
            write_hint: None,
        }
    }

    /// Checks every key value produced by `kvs` and returns how many were seen.
    ///
    /// Each key value must be a `Put`, carry consecutive sequences starting at
    /// `START_SEQ`, and expose exactly `keys`, `ts` and `values`.
    fn check_key_value_iter<'a>(
        kvs: impl Iterator<Item = KeyValue<'a>>,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) -> usize {
        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
        let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();

        let mut num_checked = 0;
        for (offset, kv) in kvs.enumerate() {
            assert_eq!(START_SEQ + offset as u64, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type());
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());

            assert_eq!(expect_ts, kv.timestamp());
            let actual_keys: Vec<_> = kv.primary_keys().collect();
            assert_eq!(expect_keys, actual_keys);
            let actual_values: Vec<_> = kv.fields().collect();
            assert_eq!(expect_values, actual_values);

            num_checked += 1;
        }
        num_checked
    }

    /// Checks the row count of `kvs` and the content of every key value.
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());
        let num_checked = check_key_value_iter(kvs.iter(), keys, ts, values);
        assert_eq!(num_rows, num_checked);
    }

    #[test]
    fn test_empty_key_values() {
        let meta = new_region_metadata(1, 1);
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 100,
            rows: None,
            write_hint: None,
        };
        assert!(KeyValuesRef::new(&meta, &mutation).is_none());
        let kvs = KeyValues::new(&meta, mutation);
        assert!(kvs.is_none());
    }

    #[test]
    fn test_ts_only() {
        let meta = new_region_metadata(0, 0);
        let mutation = new_mutation(&["ts"], 2);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let meta = new_region_metadata(2, 0);
        // Row values: k1=0, ts=1, k0=2.
        let mutation = new_mutation(&["k1", "ts", "k0"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        // Keys come back in metadata order: [k0=2, k1=0].
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let meta = new_region_metadata(0, 2);
        // Row values: v1=0, v0=1, ts=2.
        let mutation = new_mutation(&["v1", "v0", "ts"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        // Fields come back in metadata order: [v0=1, v1=0].
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let meta = new_region_metadata(2, 2);
        // Row values: k0=0, v0=1, ts=2, k1=3, v1=4.
        let mutation = new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let meta = new_region_metadata(2, 2);
        // Row values: k0=0, v0=1, ts=2, k1=3; v1 is missing.
        let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let meta = new_region_metadata(2, 2);
        // Row values: k0=0, v0=1, ts=2; k1 and v1 are missing.
        let mutation = new_mutation(&["k0", "v0", "ts"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }

    #[test]
    fn test_key_values_ref() {
        let meta = new_region_metadata(2, 2);
        // Row values: k0=0, v0=1, ts=2, k1=3, v1=4.
        let mutation = new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3);
        let kvs = KeyValuesRef::new(&meta, &mutation).unwrap();
        assert_eq!(3, kvs.num_rows());
        let num_checked =
            check_key_value_iter(kvs.iter(), &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
        assert_eq!(3, num_checked);
    }

    #[test]
    fn test_max_sequence() {
        let meta = new_region_metadata(2, 2);
        let columns = ["k0", "v0", "ts", "k1", "v1"];

        // Three rows take sequences START_SEQ, START_SEQ + 1 and START_SEQ + 2.
        let kvs = KeyValues::new(&meta, new_mutation(&columns, 3)).unwrap();
        assert_eq!(START_SEQ + 2, kvs.max_sequence());

        // Without rows the max sequence is the sequence of the mutation itself.
        let kvs = KeyValues::new(&meta, new_mutation(&columns, 0)).unwrap();
        assert_eq!(0, kvs.num_rows());
        assert_eq!(0, kvs.iter().count());
        assert_eq!(START_SEQ, kvs.max_sequence());
    }
}