1use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
26
/// Owned key-value view over the rows of a single [`Mutation`].
///
/// Rows are exposed one at a time as [`KeyValue`]s through [`KeyValues::iter`].
#[derive(Debug)]
pub struct KeyValues {
    /// The mutation being wrapped. [`KeyValues::new`] only builds a `KeyValues` when
    /// `mutation.rows` is `Some`.
    pub mutation: Mutation,
    /// Index mapping used to locate primary key, time index and field values in each row.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
40
41impl KeyValues {
42 pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46 let rows = mutation.rows.as_ref()?;
47 let primary_key_encoding =
48 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51 Some(KeyValues {
52 mutation,
53 helper,
54 primary_key_encoding,
55 })
56 }
57
58 pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
60 let rows = self.mutation.rows.as_ref().unwrap();
61 let schema = &rows.schema;
62 rows.rows.iter().enumerate().map(|(idx, row)| {
63 KeyValue {
64 row,
65 schema,
66 helper: &self.helper,
67 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70 primary_key_encoding: self.primary_key_encoding,
71 }
72 })
73 }
74
75 pub fn num_rows(&self) -> usize {
77 self.mutation.rows.as_ref().unwrap().rows.len()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.mutation.rows.is_none()
84 }
85
86 pub fn max_sequence(&self) -> SequenceNumber {
90 let mut sequence = self.mutation.sequence;
91 let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92 sequence += num_rows;
93 if num_rows > 0 {
94 sequence -= 1;
95 }
96
97 sequence
98 }
99}
100
/// Borrowed key-value view over the rows of a single [`Mutation`].
///
/// Same as [`KeyValues`] but borrows the mutation instead of owning it.
#[derive(Debug)]
pub struct KeyValuesRef<'a> {
    /// The borrowed mutation. [`KeyValuesRef::new`] only builds a view when
    /// `mutation.rows` is `Some`.
    mutation: &'a Mutation,
    /// Index mapping used to locate primary key, time index and field values in each row.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
114
115impl<'a> KeyValuesRef<'a> {
116 pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120 let rows = mutation.rows.as_ref()?;
121 let primary_key_encoding =
122 infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123 let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125 Some(KeyValuesRef {
126 mutation,
127 helper,
128 primary_key_encoding,
129 })
130 }
131
132 pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
134 let rows = self.mutation.rows.as_ref().unwrap();
135 let schema = &rows.schema;
136 rows.rows.iter().enumerate().map(|(idx, row)| {
137 KeyValue {
138 row,
139 schema,
140 helper: &self.helper,
141 sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144 primary_key_encoding: self.primary_key_encoding,
145 }
146 })
147 }
148
149 pub fn num_rows(&self) -> usize {
151 self.mutation.rows.as_ref().unwrap().rows.len()
153 }
154}
155
/// A single row of a mutation, viewed as primary key values, a timestamp and field values.
///
/// Borrows the row, the mutation's column schema and the index helper from the container
/// that produced it.
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
    /// The row; its `values` are positioned like the columns in `schema`.
    row: &'a Row,
    /// Column schema of the mutation's rows.
    schema: &'a Vec<ColumnSchema>,
    /// Index mapping from region columns to positions in `row`/`schema`.
    helper: &'a SparseReadRowHelper,
    /// Sequence number assigned to this row.
    sequence: SequenceNumber,
    /// Operation type of the mutation this row belongs to.
    op_type: OpType,
    /// Primary key encoding of the mutation this row belongs to.
    primary_key_encoding: PrimaryKeyEncoding,
}
171
172impl KeyValue<'_> {
173 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175 self.primary_key_encoding
176 }
177
178 pub fn partition_key(&self) -> u32 {
180 if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182 let Some(primary_key) = self.primary_keys().next() else {
183 return 0;
184 };
185 let key = primary_key.as_binary().unwrap().unwrap();
186
187 let mut deserializer = Deserializer::new(key);
188 deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189 let field = SortField::new(ConcreteDataType::uint32_datatype());
190 let table_id = field.deserialize(&mut deserializer).unwrap();
191 table_id.as_value_ref().as_u32().unwrap().unwrap()
192 } else {
193 let Some(value) = self.primary_keys().next() else {
194 return 0;
195 };
196
197 value.as_u32().unwrap().unwrap()
198 }
199 }
200
201 pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
203 self.helper.indices[..self.helper.num_primary_key_column]
204 .iter()
205 .map(|idx| match idx {
206 Some(i) => api::helper::pb_value_to_value_ref(
207 &self.row.values[*i],
208 &self.schema[*i].datatype_extension,
209 ),
210 None => ValueRef::Null,
211 })
212 }
213
214 pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
216 self.helper.indices[self.helper.num_primary_key_column + 1..]
217 .iter()
218 .map(|idx| match idx {
219 Some(i) => api::helper::pb_value_to_value_ref(
220 &self.row.values[*i],
221 &self.schema[*i].datatype_extension,
222 ),
223 None => ValueRef::Null,
224 })
225 }
226
227 pub fn timestamp(&self) -> ValueRef {
229 let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231 api::helper::pb_value_to_value_ref(
232 &self.row.values[index],
233 &self.schema[index].datatype_extension,
234 )
235 }
236
237 pub fn num_primary_keys(&self) -> usize {
239 self.helper.num_primary_key_column
240 }
241
242 pub fn num_fields(&self) -> usize {
244 self.helper.indices.len() - self.helper.num_primary_key_column - 1
245 }
246
247 pub fn sequence(&self) -> SequenceNumber {
249 self.sequence
250 }
251
252 pub fn op_type(&self) -> OpType {
254 self.op_type
255 }
256}
257
/// Maps region columns to their positions in a mutation's rows.
#[derive(Debug)]
struct SparseReadRowHelper {
    /// Positions in `Rows::schema`.
    ///
    /// With dense encoding the layout is: primary key columns, then the time index column,
    /// then field columns. `None` marks a column that is missing from the mutation schema.
    /// With sparse primary key encoding it is instead every schema position, in order.
    indices: Vec<Option<usize>>,
    /// Number of leading entries of `indices` that belong to primary key columns.
    num_primary_key_column: usize,
}
269
270impl SparseReadRowHelper {
271 fn new(
276 metadata: &RegionMetadata,
277 rows: &Rows,
278 primary_key_encoding: PrimaryKeyEncoding,
279 ) -> SparseReadRowHelper {
280 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281 let indices = rows
284 .schema
285 .iter()
286 .enumerate()
287 .map(|(index, _)| Some(index))
288 .collect();
289 return SparseReadRowHelper {
290 indices,
291 num_primary_key_column: 1,
292 };
293 }
294 let name_to_index: HashMap<_, _> = rows
296 .schema
297 .iter()
298 .enumerate()
299 .map(|(index, col)| (&col.column_name, index))
300 .collect();
301 let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
302
303 for pk_column_id in &metadata.primary_key {
305 let column = metadata.column_by_id(*pk_column_id).unwrap();
307 let index = name_to_index.get(&column.column_schema.name);
308 indices.push(index.copied());
309 }
310 let ts_index = name_to_index
313 .get(&metadata.time_index_column().column_schema.name)
314 .unwrap();
315 indices.push(Some(*ts_index));
316
317 for column in metadata.field_columns() {
319 let index = name_to_index.get(&column.column_schema.name);
321 indices.push(index.copied());
322 }
323
324 SparseReadRowHelper {
325 indices,
326 num_primary_key_column: metadata.primary_key.len(),
327 }
328 }
329}
330
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::{i64_value, TestRegionMetadataBuilder};

    const TS_NAME: &str = "ts";
    const START_SEQ: SequenceNumber = 100;

    /// Region metadata with a `ts` time index, `num_tags` tags and `num_fields` fields.
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Builds `num_rows` identical rows over `column_names`.
    ///
    /// Each column's value is its position in `column_names`. Names starting with `k` are
    /// tags, names starting with `v` are fields, and anything else is the timestamp.
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        let rows = (0..num_rows)
            .map(|_| Row {
                values: (0..column_names.len())
                    .map(|position| i64_value(position as i64))
                    .collect(),
            })
            .collect();

        let schema = column_names
            .iter()
            .map(|&name| {
                let datatype = if name == TS_NAME {
                    ColumnDataType::TimestampMillisecond
                } else {
                    ColumnDataType::Int64
                };
                let semantic_type = match name.chars().next() {
                    Some('k') => SemanticType::Tag,
                    Some('v') => SemanticType::Field,
                    _ => SemanticType::Timestamp,
                };
                v1::ColumnSchema {
                    column_name: name.to_string(),
                    datatype: datatype as i32,
                    semantic_type: semantic_type as i32,
                    ..Default::default()
                }
            })
            .collect();

        Rows { rows, schema }
    }

    /// A put mutation starting at `START_SEQ` with rows from [`new_rows`].
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(new_rows(column_names, num_rows)),
            write_hint: None,
        }
    }

    /// Asserts that every row of `kvs` has the expected keys, timestamp and fields, and that
    /// sequences increase by one from `START_SEQ`.
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());
        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().map(|&key| ValueRef::from(key)).collect();
        let expect_values: Vec<_> = values.iter().map(|&value| ValueRef::from(value)).collect();

        for (offset, kv) in kvs.iter().enumerate() {
            assert_eq!(START_SEQ + offset as u64, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type());
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());

            assert_eq!(expect_ts, kv.timestamp());
            assert_eq!(expect_keys, kv.primary_keys().collect::<Vec<_>>());
            assert_eq!(expect_values, kv.fields().collect::<Vec<_>>());
        }
    }

    #[test]
    fn test_empty_key_values() {
        let meta = new_region_metadata(1, 1);
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 100,
            rows: None,
            write_hint: None,
        };
        assert!(KeyValues::new(&meta, mutation).is_none());
    }

    #[test]
    fn test_ts_only() {
        let meta = new_region_metadata(0, 0);
        let kvs = KeyValues::new(&meta, new_mutation(&["ts"], 2)).unwrap();
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let meta = new_region_metadata(2, 0);
        // Values are column positions: k1 = 0, ts = 1, k0 = 2.
        let kvs = KeyValues::new(&meta, new_mutation(&["k1", "ts", "k0"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let meta = new_region_metadata(0, 2);
        // Values are column positions: v1 = 0, v0 = 1, ts = 2.
        let kvs = KeyValues::new(&meta, new_mutation(&["v1", "v0", "ts"], 3)).unwrap();
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let meta = new_region_metadata(2, 2);
        let kvs =
            KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let meta = new_region_metadata(2, 2);
        // `v1` is missing from the mutation, so it reads back as null.
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let meta = new_region_metadata(2, 2);
        // Both `k1` and `v1` are missing from the mutation.
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }
}