mito_codec/
key_values.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{COLUMN_ID_ENCODE_SIZE, SortField};
26
/// Key value view of a mutation.
///
/// Owns the [Mutation] together with the column index mapping and the primary
/// key encoding that [KeyValues::new] computes for it. Rows are exposed as
/// [KeyValue] views through [KeyValues::iter].
#[derive(Debug)]
pub struct KeyValues {
    /// Mutation to read.
    ///
    /// This mutation must be a valid mutation and rows in the mutation
    /// must not be `None`.
    pub mutation: Mutation,
    /// Key value read helper.
    ///
    /// Maps key, timestamp and field positions to column indices of the rows.
    helper: SparseReadRowHelper,
    /// Primary key encoding hint.
    ///
    /// Inferred from the write hint of the mutation on construction.
    primary_key_encoding: PrimaryKeyEncoding,
}
40
41impl KeyValues {
42    /// Creates [KeyValues] from specific `mutation`.
43    ///
44    /// Returns `None` if `rows` of the `mutation` is `None`.
45    pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46        let rows = mutation.rows.as_ref()?;
47        let primary_key_encoding =
48            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51        Some(KeyValues {
52            mutation,
53            helper,
54            primary_key_encoding,
55        })
56    }
57
58    /// Returns a key value iterator.
59    pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
60        let rows = self.mutation.rows.as_ref().unwrap();
61        let schema = &rows.schema;
62        rows.rows.iter().enumerate().map(|(idx, row)| {
63            KeyValue {
64                row,
65                schema,
66                helper: &self.helper,
67                sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
68                // Safety: This is a valid mutation.
69                op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70                primary_key_encoding: self.primary_key_encoding,
71            }
72        })
73    }
74
75    /// Returns number of rows.
76    pub fn num_rows(&self) -> usize {
77        // Safety: rows is not None.
78        self.mutation.rows.as_ref().unwrap().rows.len()
79    }
80
81    /// Returns if this container is empty
82    pub fn is_empty(&self) -> bool {
83        self.mutation.rows.is_none()
84    }
85
86    /// Return the max sequence in this container.
87    ///
88    /// When the mutation has no rows, the sequence is the same as the mutation sequence.
89    pub fn max_sequence(&self) -> SequenceNumber {
90        let mut sequence = self.mutation.sequence;
91        let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92        sequence += num_rows;
93        if num_rows > 0 {
94            sequence -= 1;
95        }
96
97        sequence
98    }
99}
100
/// Borrowed key value view of a mutation.
///
/// Holds a reference to the [Mutation] instead of owning it. The column index
/// mapping and the primary key encoding are computed in [KeyValuesRef::new].
#[derive(Debug)]
pub struct KeyValuesRef<'a> {
    /// Mutation to read.
    ///
    /// This mutation must be a valid mutation and rows in the mutation
    /// must not be `None`.
    mutation: &'a Mutation,
    /// Key value read helper.
    ///
    /// Maps key, timestamp and field positions to column indices of the rows.
    helper: SparseReadRowHelper,
    /// Primary key encoding hint.
    ///
    /// Inferred from the write hint of the mutation on construction.
    primary_key_encoding: PrimaryKeyEncoding,
}
114
115impl<'a> KeyValuesRef<'a> {
116    /// Creates [crate::memtable::KeyValues] from specific `mutation`.
117    ///
118    /// Returns `None` if `rows` of the `mutation` is `None`.
119    pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120        let rows = mutation.rows.as_ref()?;
121        let primary_key_encoding =
122            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125        Some(KeyValuesRef {
126            mutation,
127            helper,
128            primary_key_encoding,
129        })
130    }
131
132    /// Returns a key value iterator.
133    pub fn iter(&self) -> impl Iterator<Item = KeyValue<'_>> {
134        let rows = self.mutation.rows.as_ref().unwrap();
135        let schema = &rows.schema;
136        rows.rows.iter().enumerate().map(|(idx, row)| {
137            KeyValue {
138                row,
139                schema,
140                helper: &self.helper,
141                sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
142                // Safety: This is a valid mutation.
143                op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144                primary_key_encoding: self.primary_key_encoding,
145            }
146        })
147    }
148
149    /// Returns number of rows.
150    pub fn num_rows(&self) -> usize {
151        // Safety: rows is not None.
152        self.mutation.rows.as_ref().unwrap().rows.len()
153    }
154}
155
/// Key value view of a row.
///
/// A key value view divides primary key columns and field (value) columns.
/// Primary key columns have the same order as region's primary key. Field
/// columns are ordered by their position in the region schema (The same order
/// as users defined while creating the region).
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
    /// Row to read values from.
    row: &'a Row,
    /// Column schemas of the rows this row belongs to; indexed the same way as
    /// `row.values`.
    schema: &'a Vec<ColumnSchema>,
    /// Maps key, timestamp and field positions to indices into `row.values`.
    helper: &'a SparseReadRowHelper,
    /// Sequence of this row.
    sequence: SequenceNumber,
    /// Op type of the mutation this row comes from.
    op_type: OpType,
    /// Primary key encoding of the mutation this row comes from.
    primary_key_encoding: PrimaryKeyEncoding,
}
171
172impl KeyValue<'_> {
173    /// Returns primary key encoding.
174    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175        self.primary_key_encoding
176    }
177
178    /// Returns the partition key.
179    pub fn partition_key(&self) -> u32 {
180        // TODO(yingwen): refactor this code
181        if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182            let Some(primary_key) = self.primary_keys().next() else {
183                return 0;
184            };
185            let key = primary_key.try_into_binary().unwrap().unwrap();
186
187            let mut deserializer = Deserializer::new(key);
188            deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189            let field = SortField::new(ConcreteDataType::uint32_datatype());
190            let table_id = field.deserialize(&mut deserializer).unwrap();
191            table_id.as_value_ref().try_into_u32().unwrap().unwrap()
192        } else {
193            let Some(value) = self.primary_keys().next() else {
194                return 0;
195            };
196
197            value.try_into_u32().unwrap().unwrap()
198        }
199    }
200
201    /// Get primary key columns.
202    pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef<'_>> {
203        self.helper.indices[..self.helper.num_primary_key_column]
204            .iter()
205            .map(|idx| match idx {
206                Some(i) => api::helper::pb_value_to_value_ref(
207                    &self.row.values[*i],
208                    self.schema[*i].datatype_extension.as_ref(),
209                ),
210                None => ValueRef::Null,
211            })
212    }
213
214    /// Get field columns.
215    pub fn fields(&self) -> impl Iterator<Item = ValueRef<'_>> {
216        self.helper.indices[self.helper.num_primary_key_column + 1..]
217            .iter()
218            .map(|idx| match idx {
219                Some(i) => api::helper::pb_value_to_value_ref(
220                    &self.row.values[*i],
221                    self.schema[*i].datatype_extension.as_ref(),
222                ),
223                None => ValueRef::Null,
224            })
225    }
226
227    /// Get timestamp.
228    pub fn timestamp(&self) -> ValueRef<'_> {
229        // Timestamp is primitive, we clone it.
230        let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231        api::helper::pb_value_to_value_ref(
232            &self.row.values[index],
233            self.schema[index].datatype_extension.as_ref(),
234        )
235    }
236
237    /// Get number of primary key columns.
238    pub fn num_primary_keys(&self) -> usize {
239        self.helper.num_primary_key_column
240    }
241
242    /// Get number of field columns.
243    pub fn num_fields(&self) -> usize {
244        self.helper.indices.len() - self.helper.num_primary_key_column - 1
245    }
246
247    /// Get sequence.
248    pub fn sequence(&self) -> SequenceNumber {
249        self.sequence
250    }
251
252    /// Get op type.
253    pub fn op_type(&self) -> OpType {
254        self.op_type
255    }
256}
257
/// Helper to read rows in key, value order for sparse data.
///
/// "Sparse" here means a row may omit columns of the region; an omitted column
/// has no index and is read as null.
#[derive(Debug)]
struct SparseReadRowHelper {
    /// Key and value column indices.
    ///
    /// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]`
    /// is the timestamp column and the remaining entries are field columns.
    /// Each entry is the position of the column in the row, or `None` if the
    /// rows do not contain that column.
    indices: Vec<Option<usize>>,
    /// Number of primary key columns.
    num_primary_key_column: usize,
}
269
270impl SparseReadRowHelper {
271    /// Creates a [SparseReadRowHelper] for specific `rows`.
272    ///
273    /// # Panics
274    /// Time index column must exist.
275    fn new(
276        metadata: &RegionMetadata,
277        rows: &Rows,
278        primary_key_encoding: PrimaryKeyEncoding,
279    ) -> SparseReadRowHelper {
280        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281            // Optimized case: when schema has exactly 3 columns (primary key, timestamp, and one field),
282            // we can directly use their indices in order without building an explicit mapping.
283            // The column order is: encoded primary key, timestamp, and field.
284            if rows.schema.len() == 3 {
285                let indices = rows
286                    .schema
287                    .iter()
288                    .enumerate()
289                    .map(|(index, _)| Some(index))
290                    .collect();
291                return SparseReadRowHelper {
292                    indices,
293                    num_primary_key_column: 1,
294                };
295            };
296
297            let mut indices = Vec::with_capacity(rows.schema.len());
298            let name_to_index: HashMap<_, _> = rows
299                .schema
300                .iter()
301                .enumerate()
302                .map(|(index, col)| (&col.column_name, index))
303                .collect();
304            indices.extend(
305                rows.schema[0..2]
306                    .iter()
307                    .enumerate()
308                    .map(|(index, _)| Some(index)),
309            );
310            // Iterate columns and find field columns.
311            for column in metadata.field_columns() {
312                // Get index in request for each field column.
313                let index = name_to_index.get(&column.column_schema.name);
314                indices.push(index.copied());
315            }
316            return SparseReadRowHelper {
317                indices,
318                num_primary_key_column: 1,
319            };
320        }
321        // Build a name to index mapping for rows.
322        let name_to_index: HashMap<_, _> = rows
323            .schema
324            .iter()
325            .enumerate()
326            .map(|(index, col)| (&col.column_name, index))
327            .collect();
328        let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
329
330        // Get primary key indices.
331        for pk_column_id in &metadata.primary_key {
332            // Safety: Id comes from primary key.
333            let column = metadata.column_by_id(*pk_column_id).unwrap();
334            let index = name_to_index.get(&column.column_schema.name);
335            indices.push(index.copied());
336        }
337        // Get timestamp index.
338        // Safety: time index must exist
339        let ts_index = name_to_index
340            .get(&metadata.time_index_column().column_schema.name)
341            .unwrap();
342        indices.push(Some(*ts_index));
343
344        // Iterate columns and find field columns.
345        for column in metadata.field_columns() {
346            // Get index in request for each field column.
347            let index = name_to_index.get(&column.column_schema.name);
348            indices.push(index.copied());
349        }
350
351        SparseReadRowHelper {
352            indices,
353            num_primary_key_column: metadata.primary_key.len(),
354        }
355    }
356}
357
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::{TestRegionMetadataBuilder, i64_value};

    const TS_NAME: &str = "ts";
    const START_SEQ: SequenceNumber = 100;

    /// Creates a region: `ts, k0, k1, ..., v0, v1, ...`
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Creates the schema of one column from its name.
    ///
    /// Names starting with `k` are tags, names starting with `v` are fields and
    /// everything else is a timestamp. Only the column named [TS_NAME] gets the
    /// timestamp data type, other columns are `Int64`.
    fn new_column_schema(column_name: &str) -> v1::ColumnSchema {
        let datatype = if column_name == TS_NAME {
            ColumnDataType::TimestampMillisecond
        } else {
            ColumnDataType::Int64
        };
        let semantic_type = match column_name.chars().next() {
            Some('k') => SemanticType::Tag,
            Some('v') => SemanticType::Field,
            _ => SemanticType::Timestamp,
        };

        v1::ColumnSchema {
            column_name: column_name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Creates rows `[ 0, 1, ..., n ] x num_rows`
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        // For simplicity, we use i64 for timestamp millisecond type. This violates the column schema
        // but it's acceptable for tests.
        let rows = (0..num_rows)
            .map(|_| Row {
                values: (0..column_names.len())
                    .map(|idx| i64_value(idx as i64))
                    .collect(),
            })
            .collect();
        let schema = column_names
            .iter()
            .map(|column_name| new_column_schema(column_name))
            .collect();

        Rows { rows, schema }
    }

    /// Creates a put mutation starting at [START_SEQ] with `num_rows` rows.
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(new_rows(column_names, num_rows)),
            write_hint: None,
        }
    }

    /// Checks that every row in `kvs` has the expected keys, timestamp and
    /// field values, and that sequences increase by one from [START_SEQ].
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());

        // Expectations are identical for every row.
        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
        let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();

        for (offset, kv) in kvs.iter().enumerate() {
            assert_eq!(START_SEQ + offset as u64, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type);
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());

            assert_eq!(expect_ts, kv.timestamp());
            let actual_keys: Vec<_> = kv.primary_keys().collect();
            assert_eq!(expect_keys, actual_keys);
            let actual_values: Vec<_> = kv.fields().collect();
            assert_eq!(expect_values, actual_values);
        }
    }

    #[test]
    fn test_empty_key_values() {
        let meta = new_region_metadata(1, 1);
        // A mutation without rows can't build a view.
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 100,
            rows: None,
            write_hint: None,
        };
        assert!(KeyValues::new(&meta, mutation).is_none());
    }

    #[test]
    fn test_ts_only() {
        let meta = new_region_metadata(0, 0);
        let kvs = KeyValues::new(&meta, new_mutation(&["ts"], 2)).unwrap();
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let meta = new_region_metadata(2, 0);
        // The value of each row:
        // k1=0, ts=1, k0=2,
        let kvs = KeyValues::new(&meta, new_mutation(&["k1", "ts", "k0"], 3)).unwrap();
        // KeyValues
        // keys: [k0=2, k1=0]
        // ts: 1,
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let meta = new_region_metadata(0, 2);
        // The value of each row:
        // v1=0, v0=1, ts=2,
        let kvs = KeyValues::new(&meta, new_mutation(&["v1", "v0", "ts"], 3)).unwrap();
        // KeyValues (note that v0 is in front of v1 in region schema)
        // ts: 2,
        // fields: [v0=1, v1=0]
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let meta = new_region_metadata(2, 2);
        // The value of each row:
        // k0=0, v0=1, ts=2, k1=3, v1=4,
        let mutation = new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        // KeyValues
        // keys: [k0=0, k1=3]
        // ts: 2,
        // fields: [v0=1, v1=4]
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let meta = new_region_metadata(2, 2);
        // The value of each row:
        // k0=0, v0=1, ts=2, k1=3, (v1 will be null)
        let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
        let kvs = KeyValues::new(&meta, mutation).unwrap();
        // KeyValues
        // keys: [k0=0, k1=3]
        // ts: 2,
        // fields: [v0=1, v1=null]
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let meta = new_region_metadata(2, 2);
        // The value of each row:
        // k0 = 0, v0=1, ts=2, (k1, v1 will be null)
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts"], 3)).unwrap();
        // KeyValues
        // keys: [k0=0, k1=null]
        // ts: 2,
        // fields: [v0=1, v1=null]
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }
}