mito_codec/
row_converter.rs1pub mod dense;
16pub mod sparse;
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use common_recordbatch::filter::SimpleFilterEvaluator;
22use datatypes::value::{Value, ValueRef};
23pub use dense::{DensePrimaryKeyCodec, SortField};
24pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparseOffsetsCache, SparsePrimaryKeyCodec, SparseValues};
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::{RegionMetadata, RegionMetadataRef};
27use store_api::storage::ColumnId;
28
29use crate::error::Result;
30use crate::key_values::KeyValue;
31
32pub trait PrimaryKeyCodecExt {
34 fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
38 where
39 I: Iterator<Item = ValueRef<'a>>,
40 {
41 let mut buffer = Vec::new();
42 self.encode_to_vec(row, &mut buffer)?;
43 Ok(buffer)
44 }
45
46 fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
50 where
51 I: Iterator<Item = ValueRef<'a>>;
52}
53
54pub trait PrimaryKeyFilter: Send + Sync {
55 fn matches(&mut self, pk: &[u8]) -> Result<bool>;
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum CompositeValues {
62 Dense(Vec<(ColumnId, Value)>),
63 Sparse(SparseValues),
64}
65
66impl CompositeValues {
67 pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
73 match self {
74 CompositeValues::Dense(dense_values) => {
75 for (column_id, value) in values {
76 dense_values.push((*column_id, value.clone()));
77 }
78 }
79 CompositeValues::Sparse(sprase_value) => {
80 for (column_id, value) in values {
81 sprase_value.insert(*column_id, value.clone());
82 }
83 }
84 }
85 }
86}
87
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
    /// Unwraps the sparse representation.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not [`CompositeValues::Sparse`].
    pub fn into_sparse(self) -> SparseValues {
        let CompositeValues::Sparse(sparse) = self else {
            panic!("CompositeValues is not sparse");
        };
        sparse
    }

    /// Unwraps the dense representation, dropping the column ids and keeping only the
    /// values in their original order.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not [`CompositeValues::Dense`].
    pub fn into_dense(self) -> Vec<Value> {
        let CompositeValues::Dense(pairs) = self else {
            panic!("CompositeValues is not dense");
        };
        pairs.into_iter().map(|(_, value)| value).collect()
    }
}
104
105pub trait PrimaryKeyCodec: Send + Sync + Debug {
106 fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
108
109 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
111
112 fn encode_value_refs(
114 &self,
115 values: &[(ColumnId, ValueRef)],
116 buffer: &mut Vec<u8>,
117 ) -> Result<()>;
118
119 fn num_fields(&self) -> Option<usize>;
121
122 fn primary_key_filter(
124 &self,
125 metadata: &RegionMetadataRef,
126 filters: Arc<Vec<SimpleFilterEvaluator>>,
127 skip_partition_column: bool,
128 ) -> Box<dyn PrimaryKeyFilter>;
129
130 fn estimated_size(&self) -> Option<usize> {
132 None
133 }
134
135 fn encoding(&self) -> PrimaryKeyEncoding;
137
138 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
142
143 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
145}
146
147pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
149 let fields = region_metadata.primary_key_columns().map(|col| {
150 (
151 col.column_id,
152 SortField::new(col.column_schema.data_type.clone()),
153 )
154 });
155 build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
156}
157
158pub fn build_primary_key_codec_with_fields(
160 encoding: PrimaryKeyEncoding,
161 fields: impl Iterator<Item = (ColumnId, SortField)>,
162) -> Arc<dyn PrimaryKeyCodec> {
163 match encoding {
164 PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
165 PrimaryKeyEncoding::Sparse => {
166 Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
167 }
168 }
169}