use std::collections::HashMap;
use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
#[derive(Debug)]
pub struct KeyValues {
pub(crate) mutation: Mutation,
helper: SparseReadRowHelper,
primary_key_encoding: PrimaryKeyEncoding,
}
impl KeyValues {
pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
let rows = mutation.rows.as_ref()?;
let primary_key_encoding =
infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
Some(KeyValues {
mutation,
helper,
primary_key_encoding,
})
}
pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
let rows = self.mutation.rows.as_ref().unwrap();
let schema = &rows.schema;
rows.rows.iter().enumerate().map(|(idx, row)| {
KeyValue {
row,
schema,
helper: &self.helper,
sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
primary_key_encoding: self.primary_key_encoding,
}
})
}
pub fn num_rows(&self) -> usize {
self.mutation.rows.as_ref().unwrap().rows.len()
}
pub fn is_empty(&self) -> bool {
self.mutation.rows.is_none()
}
pub fn max_sequence(&self) -> SequenceNumber {
let mut sequence = self.mutation.sequence;
let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
sequence += num_rows;
if num_rows > 0 {
sequence -= 1;
}
sequence
}
}
#[derive(Debug)]
pub struct KeyValuesRef<'a> {
mutation: &'a Mutation,
helper: SparseReadRowHelper,
primary_key_encoding: PrimaryKeyEncoding,
}
impl<'a> KeyValuesRef<'a> {
pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
let rows = mutation.rows.as_ref()?;
let primary_key_encoding =
infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
Some(KeyValuesRef {
mutation,
helper,
primary_key_encoding,
})
}
pub fn iter(&self) -> impl Iterator<Item = KeyValue> {
let rows = self.mutation.rows.as_ref().unwrap();
let schema = &rows.schema;
rows.rows.iter().enumerate().map(|(idx, row)| {
KeyValue {
row,
schema,
helper: &self.helper,
sequence: self.mutation.sequence + idx as u64, op_type: OpType::try_from(self.mutation.op_type).unwrap(),
primary_key_encoding: self.primary_key_encoding,
}
})
}
pub fn num_rows(&self) -> usize {
self.mutation.rows.as_ref().unwrap().rows.len()
}
}
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
row: &'a Row,
schema: &'a Vec<ColumnSchema>,
helper: &'a SparseReadRowHelper,
sequence: SequenceNumber,
op_type: OpType,
primary_key_encoding: PrimaryKeyEncoding,
}
impl KeyValue<'_> {
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
self.primary_key_encoding
}
pub fn partition_key(&self) -> u32 {
if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
let Some(primary_key) = self.primary_keys().next() else {
return 0;
};
let key = primary_key.as_binary().unwrap().unwrap();
let mut deserializer = Deserializer::new(key);
deserializer.advance(COLUMN_ID_ENCODE_SIZE);
let field = SortField::new(ConcreteDataType::uint32_datatype());
let table_id = field.deserialize(&mut deserializer).unwrap();
table_id.as_value_ref().as_u32().unwrap().unwrap()
} else {
let Some(value) = self.primary_keys().next() else {
return 0;
};
value.as_u32().unwrap().unwrap()
}
}
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[..self.helper.num_primary_key_column]
.iter()
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
),
None => ValueRef::Null,
})
}
pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[self.helper.num_primary_key_column + 1..]
.iter()
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
),
None => ValueRef::Null,
})
}
pub fn timestamp(&self) -> ValueRef {
let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
api::helper::pb_value_to_value_ref(
&self.row.values[index],
&self.schema[index].datatype_extension,
)
}
pub fn num_primary_keys(&self) -> usize {
self.helper.num_primary_key_column
}
pub fn num_fields(&self) -> usize {
self.helper.indices.len() - self.helper.num_primary_key_column - 1
}
pub fn sequence(&self) -> SequenceNumber {
self.sequence
}
pub fn op_type(&self) -> OpType {
self.op_type
}
}
#[derive(Debug)]
struct SparseReadRowHelper {
indices: Vec<Option<usize>>,
num_primary_key_column: usize,
}
impl SparseReadRowHelper {
fn new(
metadata: &RegionMetadata,
rows: &Rows,
primary_key_encoding: PrimaryKeyEncoding,
) -> SparseReadRowHelper {
if primary_key_encoding == PrimaryKeyEncoding::Sparse {
let indices = rows
.schema
.iter()
.enumerate()
.map(|(index, _)| Some(index))
.collect();
return SparseReadRowHelper {
indices,
num_primary_key_column: 1,
};
}
let name_to_index: HashMap<_, _> = rows
.schema
.iter()
.enumerate()
.map(|(index, col)| (&col.column_name, index))
.collect();
let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
for pk_column_id in &metadata.primary_key {
let column = metadata.column_by_id(*pk_column_id).unwrap();
let index = name_to_index.get(&column.column_schema.name);
indices.push(index.copied());
}
let ts_index = name_to_index
.get(&metadata.time_index_column().column_schema.name)
.unwrap();
indices.push(Some(*ts_index));
for column in metadata.field_columns() {
let index = name_to_index.get(&column.column_schema.name);
indices.push(index.copied());
}
SparseReadRowHelper {
indices,
num_primary_key_column: metadata.primary_key.len(),
}
}
}
#[cfg(test)]
mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::test_util::i64_value;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;
fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
TestRegionMetadataBuilder::default()
.ts_name(TS_NAME)
.num_tags(num_tags)
.num_fields(num_fields)
.build()
}
fn new_rows(column_names: &[&str], num_rows: usize) -> Rows {
let mut rows = Vec::with_capacity(num_rows);
for _ in 0..num_rows {
let values: Vec<_> = (0..column_names.len())
.map(|idx| i64_value(idx as i64))
.collect();
rows.push(Row { values });
}
let schema = column_names
.iter()
.map(|column_name| {
let datatype = if *column_name == TS_NAME {
ColumnDataType::TimestampMillisecond as i32
} else {
ColumnDataType::Int64 as i32
};
let semantic_type = if column_name.starts_with('k') {
SemanticType::Tag as i32
} else if column_name.starts_with('v') {
SemanticType::Field as i32
} else {
SemanticType::Timestamp as i32
};
v1::ColumnSchema {
column_name: column_name.to_string(),
datatype,
semantic_type,
..Default::default()
}
})
.collect();
Rows { rows, schema }
}
fn new_mutation(column_names: &[&str], num_rows: usize) -> Mutation {
let rows = new_rows(column_names, num_rows);
Mutation {
op_type: OpType::Put as i32,
sequence: START_SEQ,
rows: Some(rows),
write_hint: None,
}
}
fn check_key_values(
kvs: &KeyValues,
num_rows: usize,
keys: &[Option<i64>],
ts: i64,
values: &[Option<i64>],
) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
let expect_ts = ValueRef::Int64(ts);
for kv in kvs.iter() {
assert_eq!(expect_seq, kv.sequence());
expect_seq += 1;
assert_eq!(OpType::Put, kv.op_type);
assert_eq!(keys.len(), kv.num_primary_keys());
assert_eq!(values.len(), kv.num_fields());
assert_eq!(expect_ts, kv.timestamp());
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
let actual_keys: Vec<_> = kv.primary_keys().collect();
assert_eq!(expect_keys, actual_keys);
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();
let actual_values: Vec<_> = kv.fields().collect();
assert_eq!(expect_values, actual_values);
}
}
#[test]
fn test_empty_key_values() {
let meta = new_region_metadata(1, 1);
let mutation = Mutation {
op_type: OpType::Put as i32,
sequence: 100,
rows: None,
write_hint: None,
};
let kvs = KeyValues::new(&meta, mutation);
assert!(kvs.is_none());
}
#[test]
fn test_ts_only() {
let meta = new_region_metadata(0, 0);
let mutation = new_mutation(&["ts"], 2);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 2, &[], 0, &[]);
}
#[test]
fn test_no_field() {
let meta = new_region_metadata(2, 0);
let mutation = new_mutation(&["k1", "ts", "k0"], 3);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
}
#[test]
fn test_no_tag() {
let meta = new_region_metadata(0, 2);
let mutation = new_mutation(&["v1", "v0", "ts"], 3);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
}
#[test]
fn test_tag_field() {
let meta = new_region_metadata(2, 2);
let mutation = new_mutation(&["k0", "v0", "ts", "k1", "v1"], 3);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
}
#[test]
fn test_sparse_field() {
let meta = new_region_metadata(2, 2);
let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
}
#[test]
fn test_sparse_tag_field() {
let meta = new_region_metadata(2, 2);
let mutation = new_mutation(&["k0", "v0", "ts"], 3);
let kvs = KeyValues::new(&meta, mutation).unwrap();
check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
}
}