mod dense;
mod sparse;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
/// Extension trait for encoding a row, given as an iterator of [`ValueRef`]s,
/// into bytes.
pub trait PrimaryKeyCodecExt {
    /// Encodes `row` into a freshly allocated byte vector.
    ///
    /// This is a convenience wrapper: it creates an empty `Vec<u8>`, hands it to
    /// [`encode_to_vec`](Self::encode_to_vec) and returns it on success.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`encode_to_vec`](Self::encode_to_vec) reports.
    fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut encoded = Vec::new();
        // On success hand the filled buffer to the caller; errors pass through untouched.
        self.encode_to_vec(row, &mut encoded).map(|()| encoded)
    }

    /// Encodes `row` into the caller-provided `buffer`.
    ///
    /// NOTE(review): whether implementors append to `buffer` or overwrite its
    /// contents is not visible from this trait — confirm against the implementations.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>;
}
/// A predicate evaluated against primary key bytes.
///
/// Implementations must be `Send + Sync`. Boxed instances of this trait are
/// returned by [`PrimaryKeyCodec::primary_key_filter`].
pub trait PrimaryKeyFilter: Send + Sync {
/// Returns `true` if the primary key bytes `pk` match this filter.
///
/// Takes `&mut self`, so an implementation is allowed to update internal state
/// while evaluating a key.
///
/// NOTE(review): `pk` is presumably in the encoding of the codec that created
/// this filter — confirm against the implementations.
fn matches(&mut self, pk: &[u8]) -> bool;
}
/// Primary key values in one of two layouts.
///
/// This is the type returned by [`PrimaryKeyCodec::decode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
/// A list of `(column id, value)` pairs.
Dense(Vec<(ColumnId, Value)>),
/// Values held in a [`SparseValues`] container.
Sparse(SparseValues),
}
impl CompositeValues {
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
match self {
CompositeValues::Dense(dense_values) => {
for (column_id, value) in values {
dense_values.push((*column_id, value.clone()));
}
}
CompositeValues::Sparse(sprase_value) => {
for (column_id, value) in values {
sprase_value.insert(*column_id, value.clone());
}
}
}
}
}
/// Test-only helpers for unwrapping a [`CompositeValues`] into its payload.
#[cfg(test)]
impl CompositeValues {
    /// Returns the inner [`SparseValues`].
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the `Sparse` variant.
    pub fn into_sparse(self) -> SparseValues {
        if let CompositeValues::Sparse(sparse) = self {
            sparse
        } else {
            panic!("CompositeValues is not sparse")
        }
    }

    /// Returns the values of the `Dense` variant in order, dropping the column ids.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the `Dense` variant.
    pub fn into_dense(self) -> Vec<Value> {
        if let CompositeValues::Dense(pairs) = self {
            pairs.into_iter().map(|pair| pair.1).collect()
        } else {
            panic!("CompositeValues is not dense")
        }
    }
}
/// Interface for encoding and decoding primary keys.
///
/// The trait is used as a trait object: [`build_primary_key_codec`] and
/// [`build_primary_key_codec_with_fields`] return an `Arc<dyn PrimaryKeyCodec>`.
/// Implementations must be `Send + Sync + Debug`.
pub trait PrimaryKeyCodec: Send + Sync + Debug {
/// Encodes the given [`KeyValue`] into `buffer`.
///
/// NOTE(review): whether `buffer` is appended to or overwritten is decided by
/// the implementor — confirm before reusing a non-empty buffer.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes owned `(column id, value)` pairs into `buffer`.
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes borrowed `(column id, value ref)` pairs into `buffer`.
///
/// Counterpart of [`encode_values`](Self::encode_values) that takes
/// [`ValueRef`]s instead of owned [`Value`]s.
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()>;
/// Returns the number of fields of the primary key, if available.
///
/// NOTE(review): `None` presumably means the codec has no fixed field count —
/// confirm against the implementations.
fn num_fields(&self) -> Option<usize>;
/// Builds a boxed [`PrimaryKeyFilter`] from the region `metadata` and the
/// shared list of `filters`.
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter>;
/// Returns an estimated size, if the codec can provide one.
///
/// The default implementation returns `None`.
///
/// NOTE(review): presumably the size in bytes of an encoded primary key — confirm.
fn estimated_size(&self) -> Option<usize> {
None
}
/// Returns the [`PrimaryKeyEncoding`] of this codec.
fn encoding(&self) -> PrimaryKeyEncoding;
/// Decodes `bytes` into a [`CompositeValues`].
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
/// Decodes only the leftmost value from `bytes`.
///
/// NOTE(review): `Ok(None)` presumably means there is no leftmost value to
/// decode — confirm against the implementations.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
/// Builds the primary key codec for a region.
///
/// Every primary key column of `region_metadata` is turned into a
/// `(column id, SortField)` pair, where the [`SortField`] is created from the
/// column's data type. The pairs and the region's `primary_key_encoding` are then
/// forwarded to [`build_primary_key_codec_with_fields`].
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
    let encoding = region_metadata.primary_key_encoding;
    let sort_fields = region_metadata.primary_key_columns().map(|column| {
        let sort_field = SortField::new(column.column_schema.data_type.clone());
        (column.column_id, sort_field)
    });
    build_primary_key_codec_with_fields(encoding, sort_fields)
}
/// Builds a primary key codec for `encoding` from `(column id, SortField)` pairs.
///
/// `PrimaryKeyEncoding::Dense` yields a [`DensePrimaryKeyCodec`] and
/// `PrimaryKeyEncoding::Sparse` yields a [`SparsePrimaryKeyCodec`]. In both cases
/// the `fields` iterator is collected and passed to the codec's `with_fields`
/// constructor.
pub fn build_primary_key_codec_with_fields(
    encoding: PrimaryKeyEncoding,
    fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
    match encoding {
        PrimaryKeyEncoding::Dense => {
            let dense_codec = DensePrimaryKeyCodec::with_fields(fields.collect());
            Arc::new(dense_codec)
        }
        PrimaryKeyEncoding::Sparse => {
            let sparse_codec = SparsePrimaryKeyCodec::with_fields(fields.collect());
            Arc::new(sparse_codec)
        }
    }
}