use std::sync::Arc;
use async_trait::async_trait;
use datatypes::vectors::UInt32Vector;
use store_api::storage::TimeSeriesRowSelector;
use crate::cache::{
selector_result_cache_hit, selector_result_cache_miss, CacheStrategy, SelectorResultKey,
SelectorResultValue,
};
use crate::error::Result;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::sst::file::FileId;
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
/// Reader that keeps only the last row of each time series produced by the
/// wrapped reader.
///
/// Only adjacent batches are compared by primary key, so each time series is
/// expected to arrive contiguously (presumably the input is sorted by primary
/// key — confirm with callers). When the key changes, or the input ends, the
/// final row of the previous series is emitted as a single-row batch.
pub(crate) struct LastRowReader {
    /// Source of the batches to reduce.
    reader: BoxedBatchReader,
    /// Holds the pending batch of the series currently being read.
    selector: LastRowSelector,
}

impl LastRowReader {
    /// Wraps `reader` so that only the last row of every time series is returned.
    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
        let selector = LastRowSelector::default();
        Self { reader, selector }
    }

    /// Returns the next single-row batch, or `None` once the wrapped reader is
    /// exhausted and the pending series (if any) has been flushed.
    ///
    /// # Errors
    /// Propagates any error returned by the wrapped reader.
    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
        loop {
            match self.reader.next_batch().await? {
                Some(batch) => {
                    let yielded = self.selector.on_next(batch);
                    if yielded.is_some() {
                        return Ok(yielded);
                    }
                }
                // Input exhausted: emit whatever the selector is still holding.
                None => return Ok(self.selector.finish()),
            }
        }
    }
}

#[async_trait]
impl BatchReader for LastRowReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
/// Last-row reader for one row group that consults the selector result cache
/// before reading the row group itself.
pub(crate) enum RowGroupLastRowCachedReader {
    /// A cached result with a matching projection was found; replay it.
    Hit(LastRowCacheReader),
    /// No usable cached result; read the row group (and possibly cache it).
    Miss(RowGroupLastRowReader),
}

impl RowGroupLastRowCachedReader {
    /// Builds a last-row reader for row group `row_group_idx` of `file_id`.
    ///
    /// A cached last-row result is reused only when its stored projection equals
    /// the projection of `row_group_reader`. Otherwise the row group is read
    /// through `row_group_reader`.
    pub(crate) fn new(
        file_id: FileId,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        row_group_reader: RowGroupReader,
    ) -> Self {
        let key = SelectorResultKey {
            file_id,
            row_group_idx,
            selector: TimeSeriesRowSelector::LastRow,
        };
        match cache_strategy.get_selector_result(&key) {
            // A cached entry built with a different projection cannot be reused.
            Some(value)
                if value.projection == row_group_reader.read_format().projection_indices() =>
            {
                Self::new_hit(value)
            }
            _ => Self::new_miss(key, row_group_reader, cache_strategy),
        }
    }

    /// Returns the row group read metrics.
    ///
    /// On a cache hit nothing is read from the row group, so there are no
    /// metrics to report.
    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
        match self {
            Self::Hit(_) => None,
            Self::Miss(miss) => Some(miss.metrics()),
        }
    }

    /// Records a selector cache hit and replays `value`.
    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
        selector_result_cache_hit();
        let replay = LastRowCacheReader { value, idx: 0 };
        Self::Hit(replay)
    }

    /// Records a selector cache miss and reads the row group directly.
    fn new_miss(
        key: SelectorResultKey,
        row_group_reader: RowGroupReader,
        cache_strategy: CacheStrategy,
    ) -> Self {
        selector_result_cache_miss();
        let reader = RowGroupLastRowReader::new(key, row_group_reader, cache_strategy);
        Self::Miss(reader)
    }
}

#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Self::Hit(hit) => hit.next_batch().await,
            Self::Miss(miss) => miss.next_batch().await,
        }
    }
}
/// Replays a last-row result previously stored in the selector result cache.
pub(crate) struct LastRowCacheReader {
    /// Cached batches, returned in stored order.
    value: Arc<SelectorResultValue>,
    /// Index of the next batch in `value.result` to hand out.
    idx: usize,
}

impl LastRowCacheReader {
    /// Returns a clone of the next cached batch, or `None` once all cached
    /// batches have been returned.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let next = self.value.result.get(self.idx).cloned();
        // Advance only while there is something left, as the cursor must stay
        // clamped at the end once the cache is drained.
        if next.is_some() {
            self.idx += 1;
        }
        Ok(next)
    }
}
/// Last-row reader over a row group that records every batch it yields.
///
/// When the row group is read to the end, the yielded batches are stored in the
/// selector result cache.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key identifying this file, row group and selector.
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: RowGroupReader,
    /// Picks the last row of each time series.
    selector: LastRowSelector,
    /// Compacted copies of the batches yielded so far.
    yielded_batches: Vec<Batch>,
    /// Cache that receives the result once the row group is exhausted.
    cache_strategy: CacheStrategy,
    /// Reusable `[0]` index used to `take` the single row of a yielded batch.
    take_index: UInt32Vector,
}

impl RowGroupLastRowReader {
    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
        let take_index = UInt32Vector::from_vec(vec![0]);
        Self {
            key,
            reader,
            selector: LastRowSelector::default(),
            yielded_batches: Vec::new(),
            cache_strategy,
            take_index,
        }
    }

    /// Returns the next single-row batch, recording it for the cache.
    ///
    /// The cache is only updated after the underlying reader returns `None`. An
    /// error or an early stop by the caller leaves the cache untouched.
    ///
    /// # Errors
    /// Propagates errors from the row group reader and from compacting a
    /// yielded batch.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.reader.next_batch().await? {
            match self.selector.on_next(batch) {
                Some(yielded) => {
                    push_yielded_batches(
                        yielded.clone(),
                        &self.take_index,
                        &mut self.yielded_batches,
                    )?;
                    return Ok(Some(yielded));
                }
                None => continue,
            }
        }

        // Input exhausted: flush the final series, then publish the result.
        let last_batch = self.selector.finish();
        if let Some(batch) = &last_batch {
            push_yielded_batches(batch.clone(), &self.take_index, &mut self.yielded_batches)?;
        }
        self.maybe_update_cache();
        Ok(last_batch)
    }

    /// Moves the recorded batches into the selector result cache.
    ///
    /// This does nothing if no batch was recorded.
    fn maybe_update_cache(&mut self) {
        if !self.yielded_batches.is_empty() {
            let result = std::mem::take(&mut self.yielded_batches);
            let projection = self.reader.read_format().projection_indices().to_vec();
            let value = Arc::new(SelectorResultValue { result, projection });
            self.cache_strategy.put_selector_result(self.key, value);
        }
    }

    /// Returns the metrics of the underlying row group reader.
    fn metrics(&self) -> &ReaderMetrics {
        self.reader.metrics()
    }
}
/// Compacts the single-row `batch` with `take_index` and appends it to
/// `yielded_batches`.
///
/// NOTE(review): the `take` presumably copies the row out of the (possibly
/// sliced) source arrays, so the cached batch does not retain them. Confirm
/// this against `Batch::take_in_place`.
///
/// # Panics
/// Panics if `batch` does not contain exactly one row.
///
/// # Errors
/// Returns the error from `Batch::take_in_place` if the take fails. In that
/// case nothing is appended.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    let num_rows = batch.num_rows();
    assert_eq!(1, num_rows);
    batch
        .take_in_place(take_index)
        .map(|_| yielded_batches.push(batch))
}
/// Selects the last row of each time series from a stream of batches.
///
/// Only adjacent batches are compared by primary key, so every batch of a time
/// series is expected to arrive contiguously (presumably the input is sorted by
/// primary key — confirm with callers).
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent non-empty batch of the time series currently being read.
    last_batch: Option<Batch>,
}

impl LastRowSelector {
    /// Handles the next input batch.
    ///
    /// Returns the last row of the previous time series, as a single-row batch,
    /// when `batch` starts a new series. Otherwise returns `None`.
    ///
    /// Empty batches are ignored because they contain no row that could be the
    /// last one. Storing an empty batch would discard the pending row of the
    /// current series, and the later `num_rows() - 1` would underflow.
    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
        if batch.is_empty() {
            return None;
        }
        let yielded = match &self.last_batch {
            // A different primary key closes the pending series.
            Some(last) if last.primary_key() != batch.primary_key() => Some(Self::last_row(last)),
            // Same series (or nothing pending): just remember the newer batch.
            _ => None,
        };
        self.last_batch = Some(batch);
        yielded
    }

    /// Finishes the selector and returns the last row of the pending series,
    /// if any.
    pub fn finish(&mut self) -> Option<Batch> {
        self.last_batch.take().map(|last| Self::last_row(&last))
    }

    /// Slices out the final row of `batch`.
    ///
    /// `on_next` never stores an empty batch, so `batch` is non-empty here.
    fn last_row(batch: &Batch) -> Batch {
        debug_assert!(!batch.is_empty());
        batch.slice(batch.num_rows() - 1, 1)
    }
}
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    /// Feeds `input` through a [`LastRowReader`] and checks that it yields
    /// exactly `expected`.
    async fn assert_last_rows(input: &[Batch], expected: &[Batch]) {
        let source = VecBatchReader::new(input);
        let mut reader = LastRowReader::new(Box::new(source));
        check_reader_result(&mut reader, expected).await;
    }

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // One series with two rows: only the final row is kept.
        assert_last_rows(
            &[new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            )],
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // A single-row input comes back as-is.
        assert_last_rows(
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // `k1` spans two batches and `k2` has one; each yields its final row.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let expected = [
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
            new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
        ];
        assert_last_rows(&input, &expected).await;
    }
}