1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::vectors::UInt32Vector;
21use store_api::storage::TimeSeriesRowSelector;
22
23use crate::cache::{
24 selector_result_cache_hit, selector_result_cache_miss, CacheStrategy, SelectorResultKey,
25 SelectorResultValue,
26};
27use crate::error::Result;
28use crate::read::{Batch, BatchReader, BoxedBatchReader};
29use crate::sst::file::FileId;
30use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
31
32pub(crate) struct LastRowReader {
41 reader: BoxedBatchReader,
43 selector: LastRowSelector,
45}
46
47impl LastRowReader {
48 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
50 Self {
51 reader,
52 selector: LastRowSelector::default(),
53 }
54 }
55
56 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
58 while let Some(batch) = self.reader.next_batch().await? {
59 if let Some(yielded) = self.selector.on_next(batch) {
60 return Ok(Some(yielded));
61 }
62 }
63 Ok(self.selector.finish())
64 }
65}
66
67#[async_trait]
68impl BatchReader for LastRowReader {
69 async fn next_batch(&mut self) -> Result<Option<Batch>> {
70 self.next_last_row().await
71 }
72}
73
74pub(crate) enum RowGroupLastRowCachedReader {
79 Hit(LastRowCacheReader),
81 Miss(RowGroupLastRowReader),
83}
84
85impl RowGroupLastRowCachedReader {
86 pub(crate) fn new(
87 file_id: FileId,
88 row_group_idx: usize,
89 cache_strategy: CacheStrategy,
90 row_group_reader: RowGroupReader,
91 ) -> Self {
92 let key = SelectorResultKey {
93 file_id,
94 row_group_idx,
95 selector: TimeSeriesRowSelector::LastRow,
96 };
97
98 if let Some(value) = cache_strategy.get_selector_result(&key) {
99 let schema_matches =
100 value.projection == row_group_reader.read_format().projection_indices();
101 if schema_matches {
102 Self::new_hit(value)
104 } else {
105 Self::new_miss(key, row_group_reader, cache_strategy)
106 }
107 } else {
108 Self::new_miss(key, row_group_reader, cache_strategy)
109 }
110 }
111
112 pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
114 match self {
115 RowGroupLastRowCachedReader::Hit(_) => None,
116 RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
117 }
118 }
119
120 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
122 selector_result_cache_hit();
123 Self::Hit(LastRowCacheReader { value, idx: 0 })
124 }
125
126 fn new_miss(
128 key: SelectorResultKey,
129 row_group_reader: RowGroupReader,
130 cache_strategy: CacheStrategy,
131 ) -> Self {
132 selector_result_cache_miss();
133 Self::Miss(RowGroupLastRowReader::new(
134 key,
135 row_group_reader,
136 cache_strategy,
137 ))
138 }
139}
140
141#[async_trait]
142impl BatchReader for RowGroupLastRowCachedReader {
143 async fn next_batch(&mut self) -> Result<Option<Batch>> {
144 match self {
145 RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
146 RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
147 }
148 }
149}
150
151pub(crate) struct LastRowCacheReader {
153 value: Arc<SelectorResultValue>,
154 idx: usize,
155}
156
157impl LastRowCacheReader {
158 async fn next_batch(&mut self) -> Result<Option<Batch>> {
160 if self.idx < self.value.result.len() {
161 let res = Ok(Some(self.value.result[self.idx].clone()));
162 self.idx += 1;
163 res
164 } else {
165 Ok(None)
166 }
167 }
168}
169
170pub(crate) struct RowGroupLastRowReader {
171 key: SelectorResultKey,
172 reader: RowGroupReader,
173 selector: LastRowSelector,
174 yielded_batches: Vec<Batch>,
175 cache_strategy: CacheStrategy,
176 take_index: UInt32Vector,
178}
179
180impl RowGroupLastRowReader {
181 fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
182 Self {
183 key,
184 reader,
185 selector: LastRowSelector::default(),
186 yielded_batches: vec![],
187 cache_strategy,
188 take_index: UInt32Vector::from_vec(vec![0]),
189 }
190 }
191
192 async fn next_batch(&mut self) -> Result<Option<Batch>> {
193 while let Some(batch) = self.reader.next_batch().await? {
194 if let Some(yielded) = self.selector.on_next(batch) {
195 push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
196 return Ok(Some(yielded));
197 }
198 }
199 let last_batch = if let Some(last_batch) = self.selector.finish() {
200 push_yielded_batches(
201 last_batch.clone(),
202 &self.take_index,
203 &mut self.yielded_batches,
204 )?;
205 Some(last_batch)
206 } else {
207 None
208 };
209
210 self.maybe_update_cache();
212 Ok(last_batch)
213 }
214
215 fn maybe_update_cache(&mut self) {
217 if self.yielded_batches.is_empty() {
218 return;
220 }
221 let value = Arc::new(SelectorResultValue {
222 result: std::mem::take(&mut self.yielded_batches),
223 projection: self.reader.read_format().projection_indices().to_vec(),
224 });
225 self.cache_strategy.put_selector_result(self.key, value);
226 }
227
228 fn metrics(&self) -> &ReaderMetrics {
229 self.reader.metrics()
230 }
231}
232
233fn push_yielded_batches(
235 mut batch: Batch,
236 take_index: &UInt32Vector,
237 yielded_batches: &mut Vec<Batch>,
238) -> Result<()> {
239 assert_eq!(1, batch.num_rows());
240 batch.take_in_place(take_index)?;
241 yielded_batches.push(batch);
242
243 Ok(())
244}
245
246#[derive(Default)]
248pub struct LastRowSelector {
249 last_batch: Option<Batch>,
250}
251
252impl LastRowSelector {
253 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
255 if let Some(last) = &self.last_batch {
256 if last.primary_key() == batch.primary_key() {
257 self.last_batch = Some(batch);
259 None
260 } else {
261 debug_assert!(!last.is_empty());
264 let last_row = last.slice(last.num_rows() - 1, 1);
265 self.last_batch = Some(batch);
266 Some(last_row)
267 }
268 } else {
269 self.last_batch = Some(batch);
270 None
271 }
272 }
273
274 pub fn finish(&mut self) -> Option<Batch> {
276 if let Some(last) = self.last_batch.take() {
277 let last_row = last.slice(last.num_rows() - 1, 1);
279 return Some(last_row);
280 }
281 None
282 }
283}
284
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    /// Feeds `input` through a [`LastRowReader`] and checks it yields exactly `expected`.
    async fn assert_last_rows(input: &[Batch], expected: &[Batch]) {
        let mut reader = LastRowReader::new(Box::new(VecBatchReader::new(input)));
        check_reader_result(&mut reader, expected).await;
    }

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two rows of one key: only the second survives.
        let two_rows = new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        );
        let expected = new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22]);
        assert_last_rows(&[two_rows], &[expected]).await;

        // A single row is returned unchanged.
        let one_row = new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]);
        assert_last_rows(&[one_row.clone()], &[one_row]).await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        // One row per key: the last row of k1 spans two batches.
        let expected = [
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
            new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
        ];
        assert_last_rows(&input, &expected).await;
    }
}