1use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
26
/// An owned write [`Mutation`] together with the lookup state needed to read
/// its rows in region column order (primary keys, time index, fields).
#[derive(Debug)]
pub struct KeyValues {
    /// The wrapped mutation. `KeyValues::new` only builds a value when
    /// `mutation.rows` is `Some`.
    pub(crate) mutation: Mutation,
    /// Maps region columns to positions in the mutation's row schema.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
40
41impl KeyValues {
42 pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46 let rows = mutation.rows.as_ref()?;
47 let primary_key_encoding =
48 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51 Some(KeyValues {
52 mutation,
53 helper,
54 primary_key_encoding,
55 })
56 }
57
58 pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
60 let rows = self.mutation.rows.as_ref().unwrap();
61 let schema = &rows.schema;
62 rows.rows.iter().enumerate().map(|(idx, row)| {
63 KeyValue {
64 row,
65 schema,
66 helper: &self.helper,
67 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70 primary_key_encoding: self.primary_key_encoding,
71 }
72 })
73 }
74
75 pub fn num_rows(&self) -> usize {
77 self.mutation.rows.as_ref().unwrap().rows.len()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.mutation.rows.is_none()
84 }
85
86 pub fn max_sequence(&self) -> SequenceNumber {
90 let mut sequence = self.mutation.sequence;
91 let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92 sequence += num_rows;
93 if num_rows > 0 {
94 sequence -= 1;
95 }
96
97 sequence
98 }
99}
100
/// Borrowed counterpart of `KeyValues`: reads the rows of a `&Mutation` in
/// region column order without taking ownership of the mutation.
#[derive(Debug)]
pub struct KeyValuesRef<'a> {
    /// The borrowed mutation. `KeyValuesRef::new` only builds a value when
    /// `mutation.rows` is `Some`.
    mutation: &'a Mutation,
    /// Maps region columns to positions in the mutation's row schema.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
114
115impl<'a> KeyValuesRef<'a> {
116 pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120 let rows = mutation.rows.as_ref()?;
121 let primary_key_encoding =
122 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125 Some(KeyValuesRef {
126 mutation,
127 helper,
128 primary_key_encoding,
129 })
130 }
131
132 pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
134 let rows = self.mutation.rows.as_ref().unwrap();
135 let schema = &rows.schema;
136 rows.rows.iter().enumerate().map(|(idx, row)| {
137 KeyValue {
138 row,
139 schema,
140 helper: &self.helper,
141 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144 primary_key_encoding: self.primary_key_encoding,
145 }
146 })
147 }
148
149 pub fn num_rows(&self) -> usize {
151 self.mutation.rows.as_ref().unwrap().rows.len()
153 }
154}
155
/// A single row of a mutation, read in region column order.
///
/// Holds only references plus a few `Copy` values, so it derives `Copy`.
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
    /// The row's protobuf values.
    row: &'a Row,
    /// Column schemas of the mutation rows; indexed by the same positions as
    /// `row.values`.
    schema: &'a Vec<ColumnSchema>,
    /// Maps region columns to positions in `row`.
    helper: &'a SparseReadRowHelper,
    /// Sequence of this row: the mutation's start sequence plus the row offset.
    sequence: SequenceNumber,
    /// Operation type of the mutation this row belongs to.
    op_type: OpType,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
171
172impl KeyValue<'_> {
173 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175 self.primary_key_encoding
176 }
177
178 pub fn partition_key(&self) -> u32 {
180 if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182 let Some(primary_key) = self.primary_keys().next() else {
183 return 0;
184 };
185 let key = primary_key.as_binary().unwrap().unwrap();
186
187 let mut deserializer = Deserializer::new(key);
188 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189 let field = SortField::new(ConcreteDataType::uint32_datatype());
190 let table_id = field.deserialize(&mut deserializer).unwrap();
191 table_id.as_value_ref().as_u32().unwrap().unwrap()
192 } else {
193 let Some(value) = self.primary_keys().next() else {
194 return 0;
195 };
196
197 value.as_u32().unwrap().unwrap()
198 }
199 }
200
201 pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
203 self.helper.indices[..self.helper.num_primary_key_column]
204 .iter()
205 .map(|idx| match idx {
206 Some(i) => api::helper::pb_value_to_value_ref(
207 &self.row.values[*i],
208 &self.schema[*i].datatype_extension,
209 ),
210 None => ValueRef::Null,
211 })
212 }
213
214 pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
216 self.helper.indices[self.helper.num_primary_key_column + 1..]
217 .iter()
218 .map(|idx| match idx {
219 Some(i) => api::helper::pb_value_to_value_ref(
220 &self.row.values[*i],
221 &self.schema[*i].datatype_extension,
222 ),
223 None => ValueRef::Null,
224 })
225 }
226
227 pub fn timestamp(&self) -> ValueRef {
229 let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231 api::helper::pb_value_to_value_ref(
232 &self.row.values[index],
233 &self.schema[index].datatype_extension,
234 )
235 }
236
237 pub fn num_primary_keys(&self) -> usize {
239 self.helper.num_primary_key_column
240 }
241
242 pub fn num_fields(&self) -> usize {
244 self.helper.indices.len() - self.helper.num_primary_key_column - 1
245 }
246
247 pub fn sequence(&self) -> SequenceNumber {
249 self.sequence
250 }
251
252 pub fn op_type(&self) -> OpType {
254 self.op_type
255 }
256}
257
/// Maps the columns a region reads (primary keys, time index, fields) to column
/// positions inside a [`Rows`] schema. See `SparseReadRowHelper::new` for how
/// the table is built for each primary key encoding.
#[derive(Debug)]
struct SparseReadRowHelper {
    /// Position of each region column in the row schema, laid out as
    /// `[primary keys..., time index, fields...]`. `None` marks a primary key or
    /// field column that is absent from the input rows; the sparse-encoding
    /// path fills every slot with `Some`.
    indices: Vec<Option<usize>>,
    /// Number of leading entries in `indices` that are primary key slots.
    num_primary_key_column: usize,
}
269
270impl SparseReadRowHelper {
271 fn new(
276 metadata: &RegionMetadata,
277 rows: &Rows,
278 primary_key_encoding: PrimaryKeyEncoding,
279 ) -> SparseReadRowHelper {
280 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281 let indices = rows
284 .schema
285 .iter()
286 .enumerate()
287 .map(|(index, _)| Some(index))
288 .collect();
289 return SparseReadRowHelper {
290 indices,
291 num_primary_key_column: 1,
292 };
293 }
294 let name_to_index: HashMap<_, _> = rows
296 .schema
297 .iter()
298 .enumerate()
299 .map(|(index, col)| (&col.column_name, index))
300 .collect();
301 let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
302
303 for pk_column_id in &metadata.primary_key {
305 let column = metadata.column_by_id(*pk_column_id).unwrap();
307 let index = name_to_index.get(&column.column_schema.name);
308 indices.push(index.copied());
309 }
310 let ts_index = name_to_index
313 .get(&metadata.time_index_column().column_schema.name)
314 .unwrap();
315 indices.push(Some(*ts_index));
316
317 for column in metadata.field_columns() {
319 let index = name_to_index.get(&column.column_schema.name);
321 indices.push(index.copied());
322 }
323
324 SparseReadRowHelper {
325 indices,
326 num_primary_key_column: metadata.primary_key.len(),
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::i64_value;
    use crate::test_util::meta_util::TestRegionMetadataBuilder;

    const TS_NAME: &str = "ts";
    const START_SEQ: SequenceNumber = 100;

    /// Builds test region metadata with `num_tags` tags, `num_fields` fields and
    /// a time index named [`TS_NAME`].
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Describes one test column. The semantic type follows the name prefix:
    /// `k*` is a tag, `v*` is a field, anything else is the time index.
    fn new_column_schema(column_name: &str) -> v1::ColumnSchema {
        let datatype = match column_name {
            TS_NAME => ColumnDataType::TimestampMillisecond,
            _ => ColumnDataType::Int64,
        };
        let semantic_type = match column_name.chars().next() {
            Some('k') => SemanticType::Tag,
            Some('v') => SemanticType::Field,
            _ => SemanticType::Timestamp,
        };
        v1::ColumnSchema {
            column_name: column_name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Builds `num_rows` identical rows in which the column at position `i` holds `i`.
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        let rows: Vec<Row> = (0..num_rows)
            .map(|_| Row {
                values: (0..column_names.len())
                    .map(|position| i64_value(position as i64))
                    .collect(),
            })
            .collect();
        let schema: Vec<v1::ColumnSchema> = column_names
            .iter()
            .copied()
            .map(new_column_schema)
            .collect();
        Rows { rows, schema }
    }

    /// Wraps [`new_rows`] into a put mutation starting at [`START_SEQ`].
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(new_rows(column_names, num_rows)),
            write_hint: None,
        }
    }

    /// Builds [`KeyValues`] for a fresh mutation over `column_names`, read by a
    /// region with `num_tags` tags and `num_fields` fields.
    fn new_key_values(
        num_tags: usize,
        num_fields: usize,
        column_names: &[&str],
        num_rows: usize,
    ) -> KeyValues {
        let metadata = new_region_metadata(num_tags, num_fields);
        KeyValues::new(&metadata, new_mutation(column_names, num_rows)).unwrap()
    }

    /// Asserts that `kvs` holds `num_rows` put rows with consecutive sequences
    /// from [`START_SEQ`], and that every row has the given keys, timestamp and
    /// field values.
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());

        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().map(|key| ValueRef::from(*key)).collect();
        let expect_values: Vec<_> = values.iter().map(|value| ValueRef::from(*value)).collect();

        for (offset, kv) in kvs.iter().enumerate() {
            assert_eq!(START_SEQ + offset as u64, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type());
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());
            assert_eq!(expect_ts, kv.timestamp());
            assert_eq!(expect_keys, kv.primary_keys().collect::<Vec<_>>());
            assert_eq!(expect_values, kv.fields().collect::<Vec<_>>());
        }
    }

    #[test]
    fn test_empty_key_values() {
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: None,
            write_hint: None,
        };
        assert!(KeyValues::new(&new_region_metadata(1, 1), mutation).is_none());
    }

    #[test]
    fn test_ts_only() {
        let kvs = new_key_values(0, 0, &[TS_NAME], 2);
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let kvs = new_key_values(2, 0, &["k1", TS_NAME, "k0"], 3);
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let kvs = new_key_values(0, 2, &["v1", "v0", TS_NAME], 3);
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let kvs = new_key_values(2, 2, &["k0", "v0", TS_NAME, "k1", "v1"], 3);
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let kvs = new_key_values(2, 2, &["k0", "v0", TS_NAME, "k1"], 3);
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let kvs = new_key_values(2, 2, &["k0", "v0", TS_NAME], 3);
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }
}