mito_codec/
row_converter.rs1pub mod dense;
16pub mod sparse;
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use common_recordbatch::filter::SimpleFilterEvaluator;
22use datatypes::value::{Value, ValueRef};
23pub use dense::{DensePrimaryKeyCodec, SortField};
24pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::{RegionMetadata, RegionMetadataRef};
27use store_api::storage::ColumnId;
28
29use crate::error::Result;
30use crate::key_values::KeyValue;
31
32pub trait PrimaryKeyCodecExt {
34 fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
38 where
39 I: Iterator<Item = ValueRef<'a>>,
40 {
41 let mut buffer = Vec::new();
42 self.encode_to_vec(row, &mut buffer)?;
43 Ok(buffer)
44 }
45
46 fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
50 where
51 I: Iterator<Item = ValueRef<'a>>;
52}
53
54pub trait PrimaryKeyFilter: Send + Sync {
55 fn matches(&mut self, pk: &[u8]) -> bool;
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum CompositeValues {
62 Dense(Vec<(ColumnId, Value)>),
63 Sparse(SparseValues),
64}
65
66impl CompositeValues {
67 pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
69 match self {
70 CompositeValues::Dense(dense_values) => {
71 for (column_id, value) in values {
72 dense_values.push((*column_id, value.clone()));
73 }
74 }
75 CompositeValues::Sparse(sprase_value) => {
76 for (column_id, value) in values {
77 sprase_value.insert(*column_id, value.clone());
78 }
79 }
80 }
81 }
82}
83
84#[cfg(any(test, feature = "testing"))]
85impl CompositeValues {
86 pub fn into_sparse(self) -> SparseValues {
87 match self {
88 CompositeValues::Sparse(v) => v,
89 _ => panic!("CompositeValues is not sparse"),
90 }
91 }
92
93 pub fn into_dense(self) -> Vec<Value> {
94 match self {
95 CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
96 _ => panic!("CompositeValues is not dense"),
97 }
98 }
99}
100
101pub trait PrimaryKeyCodec: Send + Sync + Debug {
102 fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
104
105 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
107
108 fn encode_value_refs(
110 &self,
111 values: &[(ColumnId, ValueRef)],
112 buffer: &mut Vec<u8>,
113 ) -> Result<()>;
114
115 fn num_fields(&self) -> Option<usize>;
117
118 fn primary_key_filter(
120 &self,
121 metadata: &RegionMetadataRef,
122 filters: Arc<Vec<SimpleFilterEvaluator>>,
123 ) -> Box<dyn PrimaryKeyFilter>;
124
125 fn estimated_size(&self) -> Option<usize> {
127 None
128 }
129
130 fn encoding(&self) -> PrimaryKeyEncoding;
132
133 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
137
138 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
140}
141
142pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
144 let fields = region_metadata.primary_key_columns().map(|col| {
145 (
146 col.column_id,
147 SortField::new(col.column_schema.data_type.clone()),
148 )
149 });
150 build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
151}
152
153pub fn build_primary_key_codec_with_fields(
155 encoding: PrimaryKeyEncoding,
156 fields: impl Iterator<Item = (ColumnId, SortField)>,
157) -> Arc<dyn PrimaryKeyCodec> {
158 match encoding {
159 PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
160 PrimaryKeyEncoding::Sparse => {
161 Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
162 }
163 }
164}