Skip to main content

mito_codec/
row_converter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod dense;
16pub mod sparse;
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use common_recordbatch::filter::SimpleFilterEvaluator;
22use datatypes::value::{Value, ValueRef};
23pub use dense::{DensePrimaryKeyCodec, SortField};
24pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparseOffsetsCache, SparsePrimaryKeyCodec, SparseValues};
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::{RegionMetadata, RegionMetadataRef};
27use store_api::storage::ColumnId;
28
29use crate::error::Result;
30use crate::key_values::KeyValue;
31
32/// Row value encoder/decoder.
33pub trait PrimaryKeyCodecExt {
34    /// Encodes rows to bytes.
35    /// # Note
36    /// Ensure the length of row iterator matches the length of fields.
37    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
38    where
39        I: Iterator<Item = ValueRef<'a>>,
40    {
41        let mut buffer = Vec::new();
42        self.encode_to_vec(row, &mut buffer)?;
43        Ok(buffer)
44    }
45
46    /// Encodes rows to specific vec.
47    /// # Note
48    /// Ensure the length of row iterator matches the length of fields.
49    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
50    where
51        I: Iterator<Item = ValueRef<'a>>;
52}
53
/// Predicate evaluated against primary key bytes.
///
/// `matches` takes `&mut self`, so an implementation may keep mutable state
/// between calls. The `Send + Sync` supertraits allow a boxed filter to be
/// moved to, or shared with, other threads.
pub trait PrimaryKeyFilter: Send + Sync {
    /// Returns true if the primary key matches the filter.
    ///
    /// # Errors
    /// Returns an error if the implementation fails to evaluate `pk`.
    // NOTE(review): `pk` is presumably the encoded form produced by a primary
    // key codec — confirm against callers.
    fn matches(&mut self, pk: &[u8]) -> Result<bool>;
}
58
/// Composite values decoded from primary key bytes.
///
/// The two variants carry the same names as the two [`PrimaryKeyEncoding`]
/// kinds handled in this module (`Dense` and `Sparse`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
    /// Decoded values stored as a list of `(column id, value)` pairs.
    Dense(Vec<(ColumnId, Value)>),
    /// Decoded values stored in a [`SparseValues`] container.
    Sparse(SparseValues),
}
65
66impl CompositeValues {
67    /// Extends the composite values with the given values.
68    ///
69    /// Append-only: `values` must not contain a column id already present in
70    /// the composite; otherwise the existing entry would shadow the new one on
71    /// `SparseValues` lookup.
72    pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
73        match self {
74            CompositeValues::Dense(dense_values) => {
75                for (column_id, value) in values {
76                    dense_values.push((*column_id, value.clone()));
77                }
78            }
79            CompositeValues::Sparse(sprase_value) => {
80                for (column_id, value) in values {
81                    sprase_value.insert(*column_id, value.clone());
82                }
83            }
84        }
85    }
86}
87
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
    /// Consumes the composite and returns the inner [`SparseValues`].
    ///
    /// # Panics
    /// Panics if the composite is not the `Sparse` variant.
    pub fn into_sparse(self) -> SparseValues {
        if let CompositeValues::Sparse(sparse) = self {
            sparse
        } else {
            panic!("CompositeValues is not sparse")
        }
    }

    /// Consumes the composite and returns its values without the column ids,
    /// in their stored order.
    ///
    /// # Panics
    /// Panics if the composite is not the `Dense` variant.
    pub fn into_dense(self) -> Vec<Value> {
        if let CompositeValues::Dense(pairs) = self {
            pairs.into_iter().map(|pair| pair.1).collect()
        } else {
            panic!("CompositeValues is not dense")
        }
    }
}
104
/// Codec that encodes primary key values to bytes and decodes them back.
///
/// Codecs are handed out as `Arc<dyn PrimaryKeyCodec>` by
/// [`build_primary_key_codec`] and [`build_primary_key_codec_with_fields`].
pub trait PrimaryKeyCodec: Send + Sync + Debug {
    /// Encodes a key value to bytes.
    ///
    /// The encoded bytes are written into `buffer`.
    // NOTE(review): presumably appends to `buffer` rather than clearing it
    // first — confirm against the implementations.
    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;

    /// Encodes owned `(column id, value)` pairs to bytes, writing into `buffer`.
    fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;

    /// Encodes borrowed `(column id, value ref)` pairs to bytes, writing into
    /// `buffer`.
    ///
    /// Counterpart of [`encode_values`](Self::encode_values) that takes
    /// [`ValueRef`]s instead of owned [`Value`]s.
    fn encode_value_refs(
        &self,
        values: &[(ColumnId, ValueRef)],
        buffer: &mut Vec<u8>,
    ) -> Result<()>;

    /// Returns the number of fields in the primary key.
    // NOTE(review): `None` presumably means the codec has no fixed field
    // count — confirm against the implementations.
    fn num_fields(&self) -> Option<usize>;

    /// Returns a primary key filter built from the given region metadata and
    /// simple filter evaluators.
    // NOTE(review): `skip_partition_column` presumably makes the filter ignore
    // predicates on the partition column — confirm against the implementations.
    fn primary_key_filter(
        &self,
        metadata: &RegionMetadataRef,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
        skip_partition_column: bool,
    ) -> Box<dyn PrimaryKeyFilter>;

    /// Returns the estimated size of the primary key.
    ///
    /// The default implementation returns `None`.
    fn estimated_size(&self) -> Option<usize> {
        None
    }

    /// Returns the encoding type of the primary key.
    fn encoding(&self) -> PrimaryKeyEncoding;

    /// Decodes the primary key from the given bytes.
    ///
    /// Returns a [`CompositeValues`] that follows the primary key ordering.
    fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;

    /// Decode the leftmost value from bytes.
    // NOTE(review): `Ok(None)` presumably means there is no leftmost value to
    // decode — confirm against the implementations.
    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
146
147/// Builds a primary key codec from region metadata.
148pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
149    let fields = region_metadata.primary_key_columns().map(|col| {
150        (
151            col.column_id,
152            SortField::new(col.column_schema.data_type.clone()),
153        )
154    });
155    build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
156}
157
158/// Builds a primary key codec from region metadata.
159pub fn build_primary_key_codec_with_fields(
160    encoding: PrimaryKeyEncoding,
161    fields: impl Iterator<Item = (ColumnId, SortField)>,
162) -> Arc<dyn PrimaryKeyCodec> {
163    match encoding {
164        PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
165        PrimaryKeyEncoding::Sparse => {
166            Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
167        }
168    }
169}