mito2/
row_converter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod dense;
16mod sparse;
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use datatypes::value::{Value, ValueRef};
22pub use dense::{DensePrimaryKeyCodec, SortField};
23pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::{RegionMetadata, RegionMetadataRef};
26use store_api::storage::ColumnId;
27
28use crate::error::Result;
29use crate::memtable::key_values::KeyValue;
30
31/// Row value encoder/decoder.
32pub trait PrimaryKeyCodecExt {
33    /// Encodes rows to bytes.
34    /// # Note
35    /// Ensure the length of row iterator matches the length of fields.
36    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
37    where
38        I: Iterator<Item = ValueRef<'a>>,
39    {
40        let mut buffer = Vec::new();
41        self.encode_to_vec(row, &mut buffer)?;
42        Ok(buffer)
43    }
44
45    /// Encodes rows to specific vec.
46    /// # Note
47    /// Ensure the length of row iterator matches the length of fields.
48    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
49    where
50        I: Iterator<Item = ValueRef<'a>>;
51}
52
/// A predicate over primary key bytes.
///
/// Implementors must be `Send + Sync`. [`PrimaryKeyFilter::matches`] takes
/// `&mut self`, so an implementation is allowed to mutate itself between calls.
pub trait PrimaryKeyFilter: Send + Sync {
    /// Reports whether the primary key bytes `pk` satisfy this filter.
    fn matches(&mut self, pk: &[u8]) -> bool;
}
57
58/// Composite values decoded from primary key bytes.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum CompositeValues {
61    Dense(Vec<(ColumnId, Value)>),
62    Sparse(SparseValues),
63}
64
65impl CompositeValues {
66    /// Extends the composite values with the given values.
67    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
68        match self {
69            CompositeValues::Dense(dense_values) => {
70                for (column_id, value) in values {
71                    dense_values.push((*column_id, value.clone()));
72                }
73            }
74            CompositeValues::Sparse(sprase_value) => {
75                for (column_id, value) in values {
76                    sprase_value.insert(*column_id, value.clone());
77                }
78            }
79        }
80    }
81}
82
#[cfg(test)]
impl CompositeValues {
    /// Consumes `self` and returns the inner [`SparseValues`].
    ///
    /// # Panics
    /// Panics if `self` is not [`CompositeValues::Sparse`].
    pub fn into_sparse(self) -> SparseValues {
        let CompositeValues::Sparse(sparse) = self else {
            panic!("CompositeValues is not sparse");
        };
        sparse
    }

    /// Consumes `self` and returns the dense values with their column ids dropped.
    ///
    /// # Panics
    /// Panics if `self` is not [`CompositeValues::Dense`].
    pub fn into_dense(self) -> Vec<Value> {
        let CompositeValues::Dense(pairs) = self else {
            panic!("CompositeValues is not dense");
        };
        pairs.into_iter().map(|(_, value)| value).collect()
    }
}
99
100pub trait PrimaryKeyCodec: Send + Sync + Debug {
101    /// Encodes a key value to bytes.
102    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
103
104    /// Encodes values to bytes.
105    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
106
107    /// Encodes values to bytes.
108    fn encode_value_refs(
109        &self,
110        values: &[(ColumnId, ValueRef)],
111        buffer: &mut Vec<u8>,
112    ) -> Result<()>;
113
114    /// Returns the number of fields in the primary key.
115    fn num_fields(&self) -> Option<usize>;
116
117    /// Returns a primary key filter factory.
118    fn primary_key_filter(
119        &self,
120        metadata: &RegionMetadataRef,
121        filters: Arc<Vec<SimpleFilterEvaluator>>,
122    ) -> Box<dyn PrimaryKeyFilter>;
123
124    /// Returns the estimated size of the primary key.
125    fn estimated_size(&self) -> Option<usize> {
126        None
127    }
128
129    /// Returns the encoding type of the primary key.
130    fn encoding(&self) -> PrimaryKeyEncoding;
131
132    /// Decodes the primary key from the given bytes.
133    ///
134    /// Returns a [`CompositeValues`] that follows the primary key ordering.
135    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
136
137    /// Decode the leftmost value from bytes.
138    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
139}
140
141/// Builds a primary key codec from region metadata.
142pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
143    let fields = region_metadata.primary_key_columns().map(|col| {
144        (
145            col.column_id,
146            SortField::new(col.column_schema.data_type.clone()),
147        )
148    });
149    build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
150}
151
152/// Builds a primary key codec from region metadata.
153pub fn build_primary_key_codec_with_fields(
154    encoding: PrimaryKeyEncoding,
155    fields: impl Iterator<Item = (ColumnId, SortField)>,
156) -> Arc<dyn PrimaryKeyCodec> {
157    match encoding {
158        PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
159        PrimaryKeyEncoding::Sparse => {
160            Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
161        }
162    }
163}