1mod dense;
16mod sparse;
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use datatypes::value::{Value, ValueRef};
22pub use dense::{DensePrimaryKeyCodec, SortField};
23pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::{RegionMetadata, RegionMetadataRef};
26use store_api::storage::ColumnId;
27
28use crate::error::Result;
29use crate::memtable::key_values::KeyValue;
30
31pub trait PrimaryKeyCodecExt {
33 fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
37 where
38 I: Iterator<Item = ValueRef<'a>>,
39 {
40 let mut buffer = Vec::new();
41 self.encode_to_vec(row, &mut buffer)?;
42 Ok(buffer)
43 }
44
45 fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
49 where
50 I: Iterator<Item = ValueRef<'a>>;
51}
52
53pub trait PrimaryKeyFilter: Send + Sync {
54 fn matches(&mut self, pk: &[u8]) -> bool;
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum CompositeValues {
61 Dense(Vec<(ColumnId, Value)>),
62 Sparse(SparseValues),
63}
64
65impl CompositeValues {
66 pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
68 match self {
69 CompositeValues::Dense(dense_values) => {
70 for (column_id, value) in values {
71 dense_values.push((*column_id, value.clone()));
72 }
73 }
74 CompositeValues::Sparse(sprase_value) => {
75 for (column_id, value) in values {
76 sprase_value.insert(*column_id, value.clone());
77 }
78 }
79 }
80 }
81}
82
83#[cfg(test)]
84impl CompositeValues {
85 pub fn into_sparse(self) -> SparseValues {
86 match self {
87 CompositeValues::Sparse(v) => v,
88 _ => panic!("CompositeValues is not sparse"),
89 }
90 }
91
92 pub fn into_dense(self) -> Vec<Value> {
93 match self {
94 CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
95 _ => panic!("CompositeValues is not dense"),
96 }
97 }
98}
99
100pub trait PrimaryKeyCodec: Send + Sync + Debug {
101 fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
103
104 fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
106
107 fn encode_value_refs(
109 &self,
110 values: &[(ColumnId, ValueRef)],
111 buffer: &mut Vec<u8>,
112 ) -> Result<()>;
113
114 fn num_fields(&self) -> Option<usize>;
116
117 fn primary_key_filter(
119 &self,
120 metadata: &RegionMetadataRef,
121 filters: Arc<Vec<SimpleFilterEvaluator>>,
122 ) -> Box<dyn PrimaryKeyFilter>;
123
124 fn estimated_size(&self) -> Option<usize> {
126 None
127 }
128
129 fn encoding(&self) -> PrimaryKeyEncoding;
131
132 fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
136
137 fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
139}
140
141pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
143 let fields = region_metadata.primary_key_columns().map(|col| {
144 (
145 col.column_id,
146 SortField::new(col.column_schema.data_type.clone()),
147 )
148 });
149 build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
150}
151
152pub fn build_primary_key_codec_with_fields(
154 encoding: PrimaryKeyEncoding,
155 fields: impl Iterator<Item = (ColumnId, SortField)>,
156) -> Arc<dyn PrimaryKeyCodec> {
157 match encoding {
158 PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
159 PrimaryKeyEncoding::Sparse => {
160 Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
161 }
162 }
163}