1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::vectors::UInt32Vector;
21use store_api::storage::{FileId, TimeSeriesRowSelector};
22
23use crate::cache::{
24 CacheStrategy, SelectorResultKey, SelectorResultValue, selector_result_cache_hit,
25 selector_result_cache_miss,
26};
27use crate::error::Result;
28use crate::read::{Batch, BatchReader, BoxedBatchReader};
29use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
30
31pub(crate) struct LastRowReader {
40 reader: BoxedBatchReader,
42 selector: LastRowSelector,
44}
45
46impl LastRowReader {
47 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
49 Self {
50 reader,
51 selector: LastRowSelector::default(),
52 }
53 }
54
55 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
57 while let Some(batch) = self.reader.next_batch().await? {
58 if let Some(yielded) = self.selector.on_next(batch) {
59 return Ok(Some(yielded));
60 }
61 }
62 Ok(self.selector.finish())
63 }
64}
65
66#[async_trait]
67impl BatchReader for LastRowReader {
68 async fn next_batch(&mut self) -> Result<Option<Batch>> {
69 self.next_last_row().await
70 }
71}
72
73pub(crate) enum RowGroupLastRowCachedReader {
78 Hit(LastRowCacheReader),
80 Miss(RowGroupLastRowReader),
82}
83
84impl RowGroupLastRowCachedReader {
85 pub(crate) fn new(
86 file_id: FileId,
87 row_group_idx: usize,
88 cache_strategy: CacheStrategy,
89 row_group_reader: RowGroupReader,
90 ) -> Self {
91 let key = SelectorResultKey {
92 file_id,
93 row_group_idx,
94 selector: TimeSeriesRowSelector::LastRow,
95 };
96
97 if let Some(value) = cache_strategy.get_selector_result(&key) {
98 let schema_matches =
99 value.projection == row_group_reader.read_format().projection_indices();
100 if schema_matches {
101 Self::new_hit(value)
103 } else {
104 Self::new_miss(key, row_group_reader, cache_strategy)
105 }
106 } else {
107 Self::new_miss(key, row_group_reader, cache_strategy)
108 }
109 }
110
111 pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
113 match self {
114 RowGroupLastRowCachedReader::Hit(_) => None,
115 RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
116 }
117 }
118
119 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
121 selector_result_cache_hit();
122 Self::Hit(LastRowCacheReader { value, idx: 0 })
123 }
124
125 fn new_miss(
127 key: SelectorResultKey,
128 row_group_reader: RowGroupReader,
129 cache_strategy: CacheStrategy,
130 ) -> Self {
131 selector_result_cache_miss();
132 Self::Miss(RowGroupLastRowReader::new(
133 key,
134 row_group_reader,
135 cache_strategy,
136 ))
137 }
138}
139
140#[async_trait]
141impl BatchReader for RowGroupLastRowCachedReader {
142 async fn next_batch(&mut self) -> Result<Option<Batch>> {
143 match self {
144 RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
145 RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
146 }
147 }
148}
149
150pub(crate) struct LastRowCacheReader {
152 value: Arc<SelectorResultValue>,
153 idx: usize,
154}
155
156impl LastRowCacheReader {
157 async fn next_batch(&mut self) -> Result<Option<Batch>> {
159 if self.idx < self.value.result.len() {
160 let res = Ok(Some(self.value.result[self.idx].clone()));
161 self.idx += 1;
162 res
163 } else {
164 Ok(None)
165 }
166 }
167}
168
169pub(crate) struct RowGroupLastRowReader {
170 key: SelectorResultKey,
171 reader: RowGroupReader,
172 selector: LastRowSelector,
173 yielded_batches: Vec<Batch>,
174 cache_strategy: CacheStrategy,
175 take_index: UInt32Vector,
177}
178
179impl RowGroupLastRowReader {
180 fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
181 Self {
182 key,
183 reader,
184 selector: LastRowSelector::default(),
185 yielded_batches: vec![],
186 cache_strategy,
187 take_index: UInt32Vector::from_vec(vec![0]),
188 }
189 }
190
191 async fn next_batch(&mut self) -> Result<Option<Batch>> {
192 while let Some(batch) = self.reader.next_batch().await? {
193 if let Some(yielded) = self.selector.on_next(batch) {
194 push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
195 return Ok(Some(yielded));
196 }
197 }
198 let last_batch = if let Some(last_batch) = self.selector.finish() {
199 push_yielded_batches(
200 last_batch.clone(),
201 &self.take_index,
202 &mut self.yielded_batches,
203 )?;
204 Some(last_batch)
205 } else {
206 None
207 };
208
209 self.maybe_update_cache();
211 Ok(last_batch)
212 }
213
214 fn maybe_update_cache(&mut self) {
216 if self.yielded_batches.is_empty() {
217 return;
219 }
220 let value = Arc::new(SelectorResultValue {
221 result: std::mem::take(&mut self.yielded_batches),
222 projection: self.reader.read_format().projection_indices().to_vec(),
223 });
224 self.cache_strategy.put_selector_result(self.key, value);
225 }
226
227 fn metrics(&self) -> &ReaderMetrics {
228 self.reader.metrics()
229 }
230}
231
232fn push_yielded_batches(
234 mut batch: Batch,
235 take_index: &UInt32Vector,
236 yielded_batches: &mut Vec<Batch>,
237) -> Result<()> {
238 assert_eq!(1, batch.num_rows());
239 batch.take_in_place(take_index)?;
240 yielded_batches.push(batch);
241
242 Ok(())
243}
244
245#[derive(Default)]
247pub struct LastRowSelector {
248 last_batch: Option<Batch>,
249}
250
251impl LastRowSelector {
252 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
254 if let Some(last) = &self.last_batch {
255 if last.primary_key() == batch.primary_key() {
256 self.last_batch = Some(batch);
258 None
259 } else {
260 debug_assert!(!last.is_empty());
263 let last_row = last.slice(last.num_rows() - 1, 1);
264 self.last_batch = Some(batch);
265 Some(last_row)
266 }
267 } else {
268 self.last_batch = Some(batch);
269 None
270 }
271 }
272
273 pub fn finish(&mut self) -> Option<Batch> {
275 if let Some(last) = self.last_batch.take() {
276 let last_row = last.slice(last.num_rows() - 1, 1);
278 return Some(last_row);
279 }
280 None
281 }
282}
283
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    /// Feeds `input` through a [`LastRowReader`] and asserts it yields exactly `expected`.
    async fn assert_last_rows(input: &[Batch], expected: &[Batch]) {
        let source = VecBatchReader::new(input);
        let mut reader = LastRowReader::new(Box::new(source));
        check_reader_result(&mut reader, expected).await;
    }

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two rows of one series: only the last row is kept.
        assert_last_rows(
            &[new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            )],
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // A series with a single row comes out unchanged.
        assert_last_rows(
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // `k1` spans two consecutive batches and `k2` follows in a third one.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let expected = [
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
            new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
        ];
        assert_last_rows(&input, &expected).await;
    }
}