1use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{COLUMN_ID_ENCODE_SIZE, SortField};
26
#[derive(Debug)]
/// Key-value view over an owned [`Mutation`].
///
/// Every row of the mutation is exposed as a [`KeyValue`] through [`KeyValues::iter`].
pub struct KeyValues {
    /// The wrapped mutation. [`KeyValues::new`] only accepts mutations whose `rows` is `Some`.
    pub mutation: Mutation,
    /// Column index helper used to read primary keys, the time index and fields of each row.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
40
41impl KeyValues {
42    pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46        let rows = mutation.rows.as_ref()?;
47        let primary_key_encoding =
48            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51        Some(KeyValues {
52            mutation,
53            helper,
54            primary_key_encoding,
55        })
56    }
57
58    pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
60        let rows = self.mutation.rows.as_ref().unwrap();
61        let schema = &rows.schema;
62        rows.rows.iter().enumerate().map(|(idx, row)| {
63            KeyValue {
64                row,
65                schema,
66                helper: &self.helper,
67                sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70                primary_key_encoding: self.primary_key_encoding,
71            }
72        })
73    }
74
75    pub fn num_rows(&self) -> usize {
77        self.mutation.rows.as_ref().unwrap().rows.len()
79    }
80
81    pub fn is_empty(&self) -> bool {
83        self.mutation.rows.is_none()
84    }
85
86    pub fn max_sequence(&self) -> SequenceNumber {
90        let mut sequence = self.mutation.sequence;
91        let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92        sequence += num_rows;
93        if num_rows > 0 {
94            sequence -= 1;
95        }
96
97        sequence
98    }
99}
100
#[derive(Debug)]
/// Key-value view over a borrowed [`Mutation`].
///
/// Every row of the mutation is exposed as a [`KeyValue`] through [`KeyValuesRef::iter`].
pub struct KeyValuesRef<'a> {
    /// The borrowed mutation. [`KeyValuesRef::new`] only accepts mutations whose `rows` is `Some`.
    mutation: &'a Mutation,
    /// Column index helper used to read primary keys, the time index and fields of each row.
    helper: SparseReadRowHelper,
    /// Primary key encoding inferred from the mutation's write hint.
    primary_key_encoding: PrimaryKeyEncoding,
}
114
115impl<'a> KeyValuesRef<'a> {
116    pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120        let rows = mutation.rows.as_ref()?;
121        let primary_key_encoding =
122            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125        Some(KeyValuesRef {
126            mutation,
127            helper,
128            primary_key_encoding,
129        })
130    }
131
132    pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
134        let rows = self.mutation.rows.as_ref().unwrap();
135        let schema = &rows.schema;
136        rows.rows.iter().enumerate().map(|(idx, row)| {
137            KeyValue {
138                row,
139                schema,
140                helper: &self.helper,
141                sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144                primary_key_encoding: self.primary_key_encoding,
145            }
146        })
147    }
148
149    pub fn num_rows(&self) -> usize {
151        self.mutation.rows.as_ref().unwrap().rows.len()
153    }
154}
155
156#[derive(Debug, Clone, Copy)]
163pub struct KeyValue<'a> {
164    row: &'a Row,
165    schema: &'a Vec<ColumnSchema>,
166    helper: &'a SparseReadRowHelper,
167    sequence: SequenceNumber,
168    op_type: OpType,
169    primary_key_encoding: PrimaryKeyEncoding,
170}
171
172impl KeyValue<'_> {
173    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175        self.primary_key_encoding
176    }
177
178    pub fn partition_key(&self) -> u32 {
180        if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182            let Some(primary_key) = self.primary_keys().next() else {
183                return 0;
184            };
185            let key = primary_key.try_into_binary().unwrap().unwrap();
186
187            let mut deserializer = Deserializer::new(key);
188            deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189            let field = SortField::new(ConcreteDataType::uint32_datatype());
190            let table_id = field.deserialize(&mut deserializer).unwrap();
191            table_id.as_value_ref().try_into_u32().unwrap().unwrap()
192        } else {
193            let Some(value) = self.primary_keys().next() else {
194                return 0;
195            };
196
197            value.try_into_u32().unwrap().unwrap()
198        }
199    }
200
201    pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef<'_>> {
203        self.helper.indices[..self.helper.num_primary_key_column]
204            .iter()
205            .map(|idx| match idx {
206                Some(i) => api::helper::pb_value_to_value_ref(
207                    &self.row.values[*i],
208                    self.schema[*i].datatype_extension.as_ref(),
209                ),
210                None => ValueRef::Null,
211            })
212    }
213
214    pub fn fields(&self) -> impl Iterator<Item = ValueRef<'_>> {
216        self.helper.indices[self.helper.num_primary_key_column + 1..]
217            .iter()
218            .map(|idx| match idx {
219                Some(i) => api::helper::pb_value_to_value_ref(
220                    &self.row.values[*i],
221                    self.schema[*i].datatype_extension.as_ref(),
222                ),
223                None => ValueRef::Null,
224            })
225    }
226
227    pub fn timestamp(&self) -> ValueRef<'_> {
229        let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231        api::helper::pb_value_to_value_ref(
232            &self.row.values[index],
233            self.schema[index].datatype_extension.as_ref(),
234        )
235    }
236
237    pub fn num_primary_keys(&self) -> usize {
239        self.helper.num_primary_key_column
240    }
241
242    pub fn num_fields(&self) -> usize {
244        self.helper.indices.len() - self.helper.num_primary_key_column - 1
245    }
246
247    pub fn sequence(&self) -> SequenceNumber {
249        self.sequence
250    }
251
252    pub fn op_type(&self) -> OpType {
254        self.op_type
255    }
256}
257
#[derive(Debug)]
/// Maps region columns to positions in a mutation's row schema so rows can be read in
/// primary key / time index / field order.
///
/// NOTE(review): "sparse" here appears to refer to rows that may omit some region columns
/// (missing columns map to `None`); confirm against callers.
struct SparseReadRowHelper {
    /// One entry per region column: primary keys first, then the time index, then fields.
    /// `Some(i)` is the column's position in the row schema; `None` means the rows do not
    /// contain that column.
    indices: Vec<Option<usize>>,
    /// Number of leading entries in `indices` that belong to primary key columns.
    num_primary_key_column: usize,
}
269
270impl SparseReadRowHelper {
271    fn new(
276        metadata: &RegionMetadata,
277        rows: &Rows,
278        primary_key_encoding: PrimaryKeyEncoding,
279    ) -> SparseReadRowHelper {
280        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281            let indices = rows
284                .schema
285                .iter()
286                .enumerate()
287                .map(|(index, _)| Some(index))
288                .collect();
289            return SparseReadRowHelper {
290                indices,
291                num_primary_key_column: 1,
292            };
293        }
294        let name_to_index: HashMap<_, _> = rows
296            .schema
297            .iter()
298            .enumerate()
299            .map(|(index, col)| (&col.column_name, index))
300            .collect();
301        let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
302
303        for pk_column_id in &metadata.primary_key {
305            let column = metadata.column_by_id(*pk_column_id).unwrap();
307            let index = name_to_index.get(&column.column_schema.name);
308            indices.push(index.copied());
309        }
310        let ts_index = name_to_index
313            .get(&metadata.time_index_column().column_schema.name)
314            .unwrap();
315        indices.push(Some(*ts_index));
316
317        for column in metadata.field_columns() {
319            let index = name_to_index.get(&column.column_schema.name);
321            indices.push(index.copied());
322        }
323
324        SparseReadRowHelper {
325            indices,
326            num_primary_key_column: metadata.primary_key.len(),
327        }
328    }
329}
330
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::{TestRegionMetadataBuilder, i64_value};

    const TS_NAME: &str = "ts";
    const START_SEQ: SequenceNumber = 100;

    /// Region metadata with `num_tags` tags, `num_fields` fields and a `TS_NAME` time index.
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Test naming convention: `k*` columns are tags, `v*` columns are fields, anything else
    /// is the timestamp.
    fn semantic_type_of(column_name: &str) -> SemanticType {
        match column_name.chars().next() {
            Some('k') => SemanticType::Tag,
            Some('v') => SemanticType::Field,
            _ => SemanticType::Timestamp,
        }
    }

    /// Builds `num_rows` identical rows over `column_names`; column `i` holds `i`.
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        let rows = (0..num_rows)
            .map(|_| Row {
                values: (0..column_names.len())
                    .map(|idx| i64_value(idx as i64))
                    .collect(),
            })
            .collect();

        let schema = column_names
            .iter()
            .map(|&column_name| {
                let datatype = if column_name == TS_NAME {
                    ColumnDataType::TimestampMillisecond
                } else {
                    ColumnDataType::Int64
                };
                v1::ColumnSchema {
                    column_name: column_name.to_string(),
                    datatype: datatype as i32,
                    semantic_type: semantic_type_of(column_name) as i32,
                    ..Default::default()
                }
            })
            .collect();

        Rows { rows, schema }
    }

    /// A `Put` mutation starting at `START_SEQ` over freshly built rows.
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(new_rows(column_names, num_rows)),
            write_hint: None,
        }
    }

    /// Asserts that every row of `kvs` has the expected sequence, op type, keys, timestamp
    /// and fields.
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());

        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().copied().map(ValueRef::from).collect();
        let expect_values: Vec<_> = values.iter().copied().map(ValueRef::from).collect();

        for (kv, expect_seq) in kvs.iter().zip(START_SEQ..) {
            assert_eq!(expect_seq, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type());
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());

            assert_eq!(expect_ts, kv.timestamp());
            assert_eq!(expect_keys, kv.primary_keys().collect::<Vec<_>>());
            assert_eq!(expect_values, kv.fields().collect::<Vec<_>>());
        }
    }

    #[test]
    fn test_empty_key_values() {
        let meta = new_region_metadata(1, 1);
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 100,
            rows: None,
            write_hint: None,
        };
        assert!(KeyValues::new(&meta, mutation).is_none());
    }

    #[test]
    fn test_ts_only() {
        let meta = new_region_metadata(0, 0);
        let kvs = KeyValues::new(&meta, new_mutation(&["ts"], 2)).unwrap();
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let meta = new_region_metadata(2, 0);
        let kvs = KeyValues::new(&meta, new_mutation(&["k1", "ts", "k0"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let meta = new_region_metadata(0, 2);
        let kvs = KeyValues::new(&meta, new_mutation(&["v1", "v0", "ts"], 3)).unwrap();
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let meta = new_region_metadata(2, 2);
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let meta = new_region_metadata(2, 2);
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let meta = new_region_metadata(2, 2);
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts"], 3)).unwrap();
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }
}