mito2/memtable/
key_values.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
18use datatypes::prelude::ConcreteDataType;
19use datatypes::value::ValueRef;
20use memcomparable::Deserializer;
21use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
22use store_api::metadata::RegionMetadata;
23use store_api::storage::SequenceNumber;
24
25use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
26
27/// Key value view of a mutation.
28#[derive(Debug)]
29pub struct KeyValues {
30    /// Mutation to read.
31    ///
32    /// This mutation must be a valid mutation and rows in the mutation
33    /// must not be `None`.
34    pub(crate) mutation: Mutation,
35    /// Key value read helper.
36    helper: SparseReadRowHelper,
37    /// Primary key encoding hint.
38    primary_key_encoding: PrimaryKeyEncoding,
39}
40
41impl KeyValues {
42    /// Creates [KeyValues] from specific `mutation`.
43    ///
44    /// Returns `None` if `rows` of the `mutation` is `None`.
45    pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
46        let rows = mutation.rows.as_ref()?;
47        let primary_key_encoding =
48            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
49        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
50
51        Some(KeyValues {
52            mutation,
53            helper,
54            primary_key_encoding,
55        })
56    }
57
58    /// Returns a key value iterator.
59    pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
60        let rows = self.mutation.rows.as_ref().unwrap();
61        let schema = &rows.schema;
62        rows.rows.iter().enumerate().map(|(idx, row)| {
63            KeyValue {
64                row,
65                schema,
66                helper: &self.helper,
67                sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
68                // Safety: This is a valid mutation.
69                op_type: OpType::try_from(self.mutation.op_type).unwrap(),
70                primary_key_encoding: self.primary_key_encoding,
71            }
72        })
73    }
74
75    /// Returns number of rows.
76    pub fn num_rows(&self) -> usize {
77        // Safety: rows is not None.
78        self.mutation.rows.as_ref().unwrap().rows.len()
79    }
80
81    /// Returns if this container is empty
82    pub fn is_empty(&self) -> bool {
83        self.mutation.rows.is_none()
84    }
85
86    /// Return the max sequence in this container.
87    ///
88    /// When the mutation has no rows, the sequence is the same as the mutation sequence.
89    pub fn max_sequence(&self) -> SequenceNumber {
90        let mut sequence = self.mutation.sequence;
91        let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
92        sequence += num_rows;
93        if num_rows > 0 {
94            sequence -= 1;
95        }
96
97        sequence
98    }
99}
100
101/// Key value view of a mutation.
102#[derive(Debug)]
103pub struct KeyValuesRef<'a> {
104    /// Mutation to read.
105    ///
106    /// This mutation must be a valid mutation and rows in the mutation
107    /// must not be `None`.
108    mutation: &'a Mutation,
109    /// Key value read helper.
110    helper: SparseReadRowHelper,
111    /// Primary key encoding hint.
112    primary_key_encoding: PrimaryKeyEncoding,
113}
114
115impl<'a> KeyValuesRef<'a> {
116    /// Creates [crate::memtable::KeyValues] from specific `mutation`.
117    ///
118    /// Returns `None` if `rows` of the `mutation` is `None`.
119    pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
120        let rows = mutation.rows.as_ref()?;
121        let primary_key_encoding =
122            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
123        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
124
125        Some(KeyValuesRef {
126            mutation,
127            helper,
128            primary_key_encoding,
129        })
130    }
131
132    /// Returns a key value iterator.
133    pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
134        let rows = self.mutation.rows.as_ref().unwrap();
135        let schema = &rows.schema;
136        rows.rows.iter().enumerate().map(|(idx, row)| {
137            KeyValue {
138                row,
139                schema,
140                helper: &self.helper,
141                sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
142                // Safety: This is a valid mutation.
143                op_type: OpType::try_from(self.mutation.op_type).unwrap(),
144                primary_key_encoding: self.primary_key_encoding,
145            }
146        })
147    }
148
149    /// Returns number of rows.
150    pub fn num_rows(&self) -> usize {
151        // Safety: rows is not None.
152        self.mutation.rows.as_ref().unwrap().rows.len()
153    }
154}
155
156/// Key value view of a row.
157///
158/// A key value view divides primary key columns and field (value) columns.
159/// Primary key columns have the same order as region's primary key. Field
160/// columns are ordered by their position in the region schema (The same order
161/// as users defined while creating the region).
162#[derive(Debug, Clone, Copy)]
163pub struct KeyValue<'a> {
164    row: &'a Row,
165    schema: &'a Vec<ColumnSchema>,
166    helper: &'a SparseReadRowHelper,
167    sequence: SequenceNumber,
168    op_type: OpType,
169    primary_key_encoding: PrimaryKeyEncoding,
170}
171
172impl KeyValue<'_> {
173    /// Returns primary key encoding.
174    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
175        self.primary_key_encoding
176    }
177
178    /// Returns the partition key.
179    pub fn partition_key(&self) -> u32 {
180        // TODO(yingwen): refactor this code
181        if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
182            let Some(primary_key) = self.primary_keys().next() else {
183                return 0;
184            };
185            let key = primary_key.as_binary().unwrap().unwrap();
186
187            let mut deserializer = Deserializer::new(key);
188            deserializer.advance(COLUMN_ID_ENCODE_SIZE);
189            let field = SortField::new(ConcreteDataType::uint32_datatype());
190            let table_id = field.deserialize(&mut deserializer).unwrap();
191            table_id.as_value_ref().as_u32().unwrap().unwrap()
192        } else {
193            let Some(value) = self.primary_keys().next() else {
194                return 0;
195            };
196
197            value.as_u32().unwrap().unwrap()
198        }
199    }
200
201    /// Get primary key columns.
202    pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
203        self.helper.indices[..self.helper.num_primary_key_column]
204            .iter()
205            .map(|idx| match idx {
206                Some(i) => api::helper::pb_value_to_value_ref(
207                    &self.row.values[*i],
208                    &self.schema[*i].datatype_extension,
209                ),
210                None => ValueRef::Null,
211            })
212    }
213
214    /// Get field columns.
215    pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
216        self.helper.indices[self.helper.num_primary_key_column + 1..]
217            .iter()
218            .map(|idx| match idx {
219                Some(i) => api::helper::pb_value_to_value_ref(
220                    &self.row.values[*i],
221                    &self.schema[*i].datatype_extension,
222                ),
223                None => ValueRef::Null,
224            })
225    }
226
227    /// Get timestamp.
228    pub fn timestamp(&self) -> ValueRef {
229        // Timestamp is primitive, we clone it.
230        let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
231        api::helper::pb_value_to_value_ref(
232            &self.row.values[index],
233            &self.schema[index].datatype_extension,
234        )
235    }
236
237    /// Get number of primary key columns.
238    pub fn num_primary_keys(&self) -> usize {
239        self.helper.num_primary_key_column
240    }
241
242    /// Get number of field columns.
243    pub fn num_fields(&self) -> usize {
244        self.helper.indices.len() - self.helper.num_primary_key_column - 1
245    }
246
247    /// Get sequence.
248    pub fn sequence(&self) -> SequenceNumber {
249        self.sequence
250    }
251
252    /// Get op type.
253    pub fn op_type(&self) -> OpType {
254        self.op_type
255    }
256}
257
/// Maps a region's logical column order onto column positions inside request rows,
/// so rows can be read in key, timestamp, value order.
///
/// Used for both primary key encodings; with sparse encoding the mapping is the
/// identity over the request columns.
#[derive(Debug)]
struct SparseReadRowHelper {
    /// Position of each logical column inside a request row, or `None` when the
    /// request does not contain that column.
    ///
    /// `indices[..num_primary_key_column]` are the primary key columns,
    /// `indices[num_primary_key_column]` is the timestamp column and the remaining
    /// entries are the field columns.
    indices: Vec<Option<usize>>,
    /// Number of primary key columns (1 with sparse encoding: the encoded primary key).
    num_primary_key_column: usize,
}
269
270impl SparseReadRowHelper {
271    /// Creates a [SparseReadRowHelper] for specific `rows`.
272    ///
273    /// # Panics
274    /// Time index column must exist.
275    fn new(
276        metadata: &RegionMetadata,
277        rows: &Rows,
278        primary_key_encoding: PrimaryKeyEncoding,
279    ) -> SparseReadRowHelper {
280        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
281            // We can skip build the indices for sparse primary key encoding.
282            // The order of the columns is encoded primary key, timestamp, field columns.
283            let indices = rows
284                .schema
285                .iter()
286                .enumerate()
287                .map(|(index, _)| Some(index))
288                .collect();
289            return SparseReadRowHelper {
290                indices,
291                num_primary_key_column: 1,
292            };
293        }
294        // Build a name to index mapping for rows.
295        let name_to_index: HashMap<_, _> = rows
296            .schema
297            .iter()
298            .enumerate()
299            .map(|(index, col)| (&col.column_name, index))
300            .collect();
301        let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
302
303        // Get primary key indices.
304        for pk_column_id in &metadata.primary_key {
305            // Safety: Id comes from primary key.
306            let column = metadata.column_by_id(*pk_column_id).unwrap();
307            let index = name_to_index.get(&column.column_schema.name);
308            indices.push(index.copied());
309        }
310        // Get timestamp index.
311        // Safety: time index must exist
312        let ts_index = name_to_index
313            .get(&metadata.time_index_column().column_schema.name)
314            .unwrap();
315        indices.push(Some(*ts_index));
316
317        // Iterate columns and find field columns.
318        for column in metadata.field_columns() {
319            // Get index in request for each field column.
320            let index = name_to_index.get(&column.column_schema.name);
321            indices.push(index.copied());
322        }
323
324        SparseReadRowHelper {
325            indices,
326            num_primary_key_column: metadata.primary_key.len(),
327        }
328    }
329}
330
#[cfg(test)]
mod tests {
    use api::v1::{self, ColumnDataType, SemanticType};

    use super::*;
    use crate::test_util::i64_value;
    use crate::test_util::meta_util::TestRegionMetadataBuilder;

    const TS_NAME: &str = "ts";
    const START_SEQ: SequenceNumber = 100;

    /// Builds region metadata with columns `ts, k0, k1, ..., v0, v1, ...`.
    fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
        TestRegionMetadataBuilder::default()
            .ts_name(TS_NAME)
            .num_tags(num_tags)
            .num_fields(num_fields)
            .build()
    }

    /// Builds the request schema entry for `column_name`.
    ///
    /// `ts` is the millisecond timestamp; names starting with `k` are tags and names
    /// starting with `v` are fields, all typed as Int64.
    fn new_column_schema(column_name: &str) -> v1::ColumnSchema {
        let datatype = if column_name == TS_NAME {
            ColumnDataType::TimestampMillisecond
        } else {
            ColumnDataType::Int64
        };
        let semantic_type = match column_name.chars().next() {
            Some('k') => SemanticType::Tag,
            Some('v') => SemanticType::Field,
            _ => SemanticType::Timestamp,
        };

        v1::ColumnSchema {
            column_name: column_name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Creates `num_rows` identical rows whose values are `[0, 1, ..., n - 1]`,
    /// where `n` is the number of columns.
    fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
        // For simplicity, every value (including the timestamp) is an i64. This violates
        // the column schema but it's acceptable for tests.
        let rows = std::iter::repeat_with(|| Row {
            values: (0..column_names.len() as i64).map(i64_value).collect(),
        })
        .take(num_rows)
        .collect();
        let schema = column_names
            .iter()
            .copied()
            .map(new_column_schema)
            .collect();

        Rows { rows, schema }
    }

    /// Creates a put mutation starting at [START_SEQ].
    fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
        Mutation {
            op_type: OpType::Put as i32,
            sequence: START_SEQ,
            rows: Some(new_rows(column_names, num_rows)),
            write_hint: None,
        }
    }

    /// Asserts that every row of `kvs` has the expected keys, timestamp and fields,
    /// and that sequences increase by one starting from [START_SEQ].
    fn check_key_values(
        kvs: &KeyValues,
        num_rows: usize,
        keys: &[Option<i64>],
        ts: i64,
        values: &[Option<i64>],
    ) {
        assert_eq!(num_rows, kvs.num_rows());

        let expect_ts = ValueRef::Int64(ts);
        let expect_keys: Vec<_> = keys.iter().copied().map(ValueRef::from).collect();
        let expect_values: Vec<_> = values.iter().copied().map(ValueRef::from).collect();

        for (kv, expect_seq) in kvs.iter().zip(START_SEQ..) {
            assert_eq!(expect_seq, kv.sequence());
            assert_eq!(OpType::Put, kv.op_type());
            assert_eq!(keys.len(), kv.num_primary_keys());
            assert_eq!(values.len(), kv.num_fields());

            assert_eq!(expect_ts, kv.timestamp());
            let actual_keys: Vec<_> = kv.primary_keys().collect();
            assert_eq!(expect_keys, actual_keys);
            let actual_values: Vec<_> = kv.fields().collect();
            assert_eq!(expect_values, actual_values);
        }
    }

    #[test]
    fn test_empty_key_values() {
        let meta = new_region_metadata(1, 1);
        let mutation = Mutation {
            rows: None,
            ..new_mutation(&[], 0)
        };
        assert!(KeyValues::new(&meta, mutation).is_none());
    }

    #[test]
    fn test_ts_only() {
        let meta = new_region_metadata(0, 0);
        let kvs = KeyValues::new(&meta, new_mutation(&["ts"], 2)).unwrap();
        check_key_values(&kvs, 2, &[], 0, &[]);
    }

    #[test]
    fn test_no_field() {
        let meta = new_region_metadata(2, 0);
        // Each row: k1=0, ts=1, k0=2.
        let kvs = KeyValues::new(&meta, new_mutation(&["k1", "ts", "k0"], 3)).unwrap();
        // Expected view: keys [k0=2, k1=0], ts=1.
        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
    }

    #[test]
    fn test_no_tag() {
        let meta = new_region_metadata(0, 2);
        // Each row: v1=0, v0=1, ts=2.
        let kvs = KeyValues::new(&meta, new_mutation(&["v1", "v0", "ts"], 3)).unwrap();
        // Expected view (v0 precedes v1 in the region schema): ts=2, fields [v0=1, v1=0].
        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
    }

    #[test]
    fn test_tag_field() {
        let meta = new_region_metadata(2, 2);
        // Each row: k0=0, v0=1, ts=2, k1=3, v1=4.
        let kvs =
            KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3)).unwrap();
        // Expected view: keys [k0=0, k1=3], ts=2, fields [v0=1, v1=4].
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
    }

    #[test]
    fn test_sparse_field() {
        let meta = new_region_metadata(2, 2);
        // Each row: k0=0, v0=1, ts=2, k1=3 (v1 is missing and reads as null).
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts", "k1"], 3)).unwrap();
        // Expected view: keys [k0=0, k1=3], ts=2, fields [v0=1, v1=null].
        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
    }

    #[test]
    fn test_sparse_tag_field() {
        let meta = new_region_metadata(2, 2);
        // Each row: k0=0, v0=1, ts=2 (k1 and v1 are missing and read as null).
        let kvs = KeyValues::new(&meta, new_mutation(&["k0", "v0", "ts"], 3)).unwrap();
        // Expected view: keys [k0=0, k1=null], ts=2, fields [v0=1, v1=null].
        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
    }
}