mito_codec/
row_converter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod dense;
16pub mod sparse;
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use common_recordbatch::filter::SimpleFilterEvaluator;
22use datatypes::value::{Value, ValueRef};
23pub use dense::{DensePrimaryKeyCodec, SortField};
24pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::{RegionMetadata, RegionMetadataRef};
27use store_api::storage::ColumnId;
28
29use crate::error::Result;
30use crate::key_values::KeyValue;
31
32/// Row value encoder/decoder.
33pub trait PrimaryKeyCodecExt {
34    /// Encodes rows to bytes.
35    /// # Note
36    /// Ensure the length of row iterator matches the length of fields.
37    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
38    where
39        I: Iterator<Item = ValueRef<'a>>,
40    {
41        let mut buffer = Vec::new();
42        self.encode_to_vec(row, &mut buffer)?;
43        Ok(buffer)
44    }
45
46    /// Encodes rows to specific vec.
47    /// # Note
48    /// Ensure the length of row iterator matches the length of fields.
49    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
50    where
51        I: Iterator<Item = ValueRef<'a>>;
52}
53
/// A predicate evaluated against primary key bytes.
///
/// Implementations must be `Send + Sync`. `matches` takes `&mut self`, so an
/// implementation is allowed to update internal state while evaluating a key.
pub trait PrimaryKeyFilter: Send + Sync {
    /// Returns true if the primary key matches the filter.
    fn matches(&mut self, pk: &[u8]) -> bool;
}
58
59/// Composite values decoded from primary key bytes.
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum CompositeValues {
62    Dense(Vec<(ColumnId, Value)>),
63    Sparse(SparseValues),
64}
65
66impl CompositeValues {
67    /// Extends the composite values with the given values.
68    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
69        match self {
70            CompositeValues::Dense(dense_values) => {
71                for (column_id, value) in values {
72                    dense_values.push((*column_id, value.clone()));
73                }
74            }
75            CompositeValues::Sparse(sprase_value) => {
76                for (column_id, value) in values {
77                    sprase_value.insert(*column_id, value.clone());
78                }
79            }
80        }
81    }
82}
83
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
    /// Unwraps the sparse variant and returns its [`SparseValues`].
    ///
    /// # Panics
    /// Panics if the value is the dense variant.
    pub fn into_sparse(self) -> SparseValues {
        match self {
            CompositeValues::Sparse(sparse_values) => sparse_values,
            // Explicit arm (no wildcard) so a newly added variant is a compile error here.
            CompositeValues::Dense(_) => panic!("CompositeValues is not sparse"),
        }
    }

    /// Unwraps the dense variant and returns its values with the column ids dropped,
    /// keeping the original order.
    ///
    /// # Panics
    /// Panics if the value is the sparse variant.
    pub fn into_dense(self) -> Vec<Value> {
        match self {
            CompositeValues::Dense(pairs) => pairs.into_iter().map(|(_, value)| value).collect(),
            // Explicit arm (no wildcard) so a newly added variant is a compile error here.
            CompositeValues::Sparse(_) => panic!("CompositeValues is not dense"),
        }
    }
}
100
101pub trait PrimaryKeyCodec: Send + Sync + Debug {
102    /// Encodes a key value to bytes.
103    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
104
105    /// Encodes values to bytes.
106    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
107
108    /// Encodes values to bytes.
109    fn encode_value_refs(
110        &self,
111        values: &[(ColumnId, ValueRef)],
112        buffer: &mut Vec<u8>,
113    ) -> Result<()>;
114
115    /// Returns the number of fields in the primary key.
116    fn num_fields(&self) -> Option<usize>;
117
118    /// Returns a primary key filter factory.
119    fn primary_key_filter(
120        &self,
121        metadata: &RegionMetadataRef,
122        filters: Arc<Vec<SimpleFilterEvaluator>>,
123    ) -> Box<dyn PrimaryKeyFilter>;
124
125    /// Returns the estimated size of the primary key.
126    fn estimated_size(&self) -> Option<usize> {
127        None
128    }
129
130    /// Returns the encoding type of the primary key.
131    fn encoding(&self) -> PrimaryKeyEncoding;
132
133    /// Decodes the primary key from the given bytes.
134    ///
135    /// Returns a [`CompositeValues`] that follows the primary key ordering.
136    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
137
138    /// Decode the leftmost value from bytes.
139    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
140}
141
142/// Builds a primary key codec from region metadata.
143pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
144    let fields = region_metadata.primary_key_columns().map(|col| {
145        (
146            col.column_id,
147            SortField::new(col.column_schema.data_type.clone()),
148        )
149    });
150    build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
151}
152
153/// Builds a primary key codec from region metadata.
154pub fn build_primary_key_codec_with_fields(
155    encoding: PrimaryKeyEncoding,
156    fields: impl Iterator<Item = (ColumnId, SortField)>,
157) -> Arc<dyn PrimaryKeyCodec> {
158    match encoding {
159        PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
160        PrimaryKeyEncoding::Sparse => {
161            Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
162        }
163    }
164}