1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use futures::{Stream, TryStreamExt};
24use snafu::ResultExt;
25use store_api::storage::{FileId, TimeSeriesRowSelector};
26
27use crate::cache::{
28 CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
29 selector_result_cache_hit, selector_result_cache_miss,
30};
31use crate::error::{ComputeArrowSnafu, Result};
32use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
33use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
34use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
35use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
36use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
37use crate::sst::parquet::read_columns::ParquetReadColumns;
38use crate::sst::parquet::reader::FlatRowGroupReader;
39
/// Reader that yields only the last row of each time series (primary key).
///
/// Assumes batches of the same primary key arrive contiguously from the inner
/// reader — the selector treats a key change as the end of a series.
#[allow(dead_code)]
pub(crate) struct LastRowReader {
    /// Inner reader producing the raw batches.
    reader: BoxedBatchReader,
    /// Stateful selector buffering the tail batch of the current series.
    selector: LastRowSelector,
}
55
56#[allow(dead_code)]
57impl LastRowReader {
58 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
60 Self {
61 reader,
62 selector: LastRowSelector::default(),
63 }
64 }
65
66 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
68 while let Some(batch) = self.reader.next_batch().await? {
69 if let Some(yielded) = self.selector.on_next(batch) {
70 return Ok(Some(yielded));
71 }
72 }
73 Ok(self.selector.finish())
74 }
75}
76
#[async_trait]
impl BatchReader for LastRowReader {
    /// Each batch returned by this reader is the last row of one series.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
83
/// Stateful selector that buffers batches and emits the last row per primary key.
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent batch of the series currently being tracked; `None` before
    /// the first batch and after [LastRowSelector::finish].
    last_batch: Option<Batch>,
}
89
90impl LastRowSelector {
91 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
93 if let Some(last) = &self.last_batch {
94 if last.primary_key() == batch.primary_key() {
95 self.last_batch = Some(batch);
97 None
98 } else {
99 debug_assert!(!last.is_empty());
102 let last_row = last.slice(last.num_rows() - 1, 1);
103 self.last_batch = Some(batch);
104 Some(last_row)
105 }
106 } else {
107 self.last_batch = Some(batch);
108 None
109 }
110 }
111
112 pub fn finish(&mut self) -> Option<Batch> {
114 if let Some(last) = self.last_batch.take() {
115 let last_row = last.slice(last.num_rows() - 1, 1);
117 return Some(last_row);
118 }
119 None
120 }
121}
122
/// Last-row reader for one parquet row group that consults the selector-result
/// cache before falling back to computing the result.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit: replays previously computed batches.
    Hit(FlatLastRowCacheReader),
    /// Cache miss: computes last rows from the row group and fills the cache.
    Miss(FlatRowGroupLastRowReader),
}
132
133impl FlatRowGroupLastRowCachedReader {
134 pub(crate) fn new(
135 file_id: FileId,
136 row_group_idx: usize,
137 cache_strategy: CacheStrategy,
138 read_cols: &ParquetReadColumns,
139 reader: FlatRowGroupReader,
140 ) -> Self {
141 let key = SelectorResultKey {
142 file_id,
143 row_group_idx,
144 selector: TimeSeriesRowSelector::LastRow,
145 };
146
147 if let Some(value) = cache_strategy.get_selector_result(&key) {
148 let is_flat = matches!(&value.result, SelectorResult::Flat(_));
149 let schema_matches = value.read_cols == *read_cols;
150 if is_flat && schema_matches {
151 Self::new_hit(value)
152 } else {
153 Self::new_miss(key, read_cols, reader, cache_strategy)
154 }
155 } else {
156 Self::new_miss(key, read_cols, reader, cache_strategy)
157 }
158 }
159
160 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
162 match self {
163 FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
164 FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
165 }
166 }
167
168 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
169 selector_result_cache_hit();
170 Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
171 }
172
173 fn new_miss(
174 key: SelectorResultKey,
175 read_cols: &ParquetReadColumns,
176 reader: FlatRowGroupReader,
177 cache_strategy: CacheStrategy,
178 ) -> Self {
179 selector_result_cache_miss();
180 Self::Miss(FlatRowGroupLastRowReader::new(
181 key,
182 read_cols.clone(),
183 reader,
184 cache_strategy,
185 ))
186 }
187}
188
/// Reader that replays cached last-row batches of one row group.
pub(crate) struct FlatLastRowCacheReader {
    /// Cached selector result; holds flat batches (checked before construction).
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
194
195impl FlatLastRowCacheReader {
196 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
197 let batches = match &self.value.result {
198 SelectorResult::Flat(batches) => batches,
199 SelectorResult::PrimaryKey(_) => unreachable!(),
200 };
201 if self.idx < batches.len() {
202 let res = Ok(Some(batches[self.idx].clone()));
203 self.idx += 1;
204 res
205 } else {
206 Ok(None)
207 }
208 }
209}
210
/// Buffer that accumulates record batches until enough rows are collected to
/// emit one merged output batch.
pub(crate) struct BatchBuffer {
    /// Buffered batches, concatenated on flush.
    batches: Vec<RecordBatch>,
    /// Total number of rows across `batches`.
    num_rows: usize,
}
216
217impl BatchBuffer {
218 fn new() -> Self {
219 Self {
220 batches: Vec::new(),
221 num_rows: 0,
222 }
223 }
224
225 fn is_full(&self) -> bool {
227 self.num_rows >= DEFAULT_READ_BATCH_SIZE
228 }
229
230 fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
232 for batch in batches {
233 self.num_rows += batch.num_rows();
234 }
235 self.batches.extend_from_slice(batches);
236 }
237
238 fn is_empty(&self) -> bool {
240 self.batches.is_empty()
241 }
242
243 fn concat(&mut self) -> Result<RecordBatch> {
245 debug_assert!(!self.batches.is_empty());
246 let schema = self.batches[0].schema();
247 let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
248 self.batches.clear();
249 self.num_rows = 0;
250 Ok(merged)
251 }
252}
253
/// Reader computing last rows for one row group on a cache miss, storing every
/// yielded batch into the cache once the row group is exhausted.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key identifying (file, row group, selector).
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: FlatRowGroupReader,
    /// Selector keeping rows carrying the last timestamp of each primary key.
    selector: FlatLastTimestampSelector,
    /// Batches already returned to the caller; moved into the cache at the end.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to populate after the row group is fully read.
    cache_strategy: CacheStrategy,
    /// Columns read from the parquet file, stored alongside the cached result.
    read_cols: ParquetReadColumns,
    /// Rows selected but not yet returned.
    pending: BatchBuffer,
}
265
266impl FlatRowGroupLastRowReader {
267 fn new(
268 key: SelectorResultKey,
269 read_cols: ParquetReadColumns,
270 reader: FlatRowGroupReader,
271 cache_strategy: CacheStrategy,
272 ) -> Self {
273 Self {
274 key,
275 reader,
276 selector: FlatLastTimestampSelector::default(),
277 yielded_batches: vec![],
278 cache_strategy,
279 read_cols,
280 pending: BatchBuffer::new(),
281 }
282 }
283
284 fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
286 if self.pending.is_empty() {
287 return Ok(None);
288 }
289 let merged = self.pending.concat()?;
290 self.yielded_batches.push(merged.clone());
291 Ok(Some(merged))
292 }
293
294 async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
295 if self.pending.is_full() {
296 return self.flush_pending();
297 }
298
299 while let Some(batch) = self.reader.next_batch().await? {
300 self.selector.on_next(batch, &mut self.pending)?;
301 if self.pending.is_full() {
302 return self.flush_pending();
303 }
304 }
305
306 self.selector.finish(&mut self.pending)?;
308 if !self.pending.is_empty() {
309 let result = self.flush_pending();
310 self.maybe_update_cache();
312 return result;
313 }
314
315 self.maybe_update_cache();
317 Ok(None)
318 }
319
320 fn maybe_update_cache(&mut self) {
321 if self.yielded_batches.is_empty() {
322 return;
323 }
324 let batches = std::mem::take(&mut self.yielded_batches);
325 let value = Arc::new(SelectorResultValue::new_flat(
326 batches,
327 self.read_cols.clone(),
328 ));
329 self.cache_strategy.put_selector_result(self.key, value);
330 }
331}
332
/// Stateful selector over flat-format batches that keeps, for each primary
/// key, only the rows carrying that key's last (maximal) timestamp.
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State of the key currently being scanned; `None` before the first
    /// batch and after a flush.
    current_key: Option<LastKeyState>,
}
342
/// Bookkeeping for the primary key currently being scanned.
#[derive(Debug)]
struct LastKeyState {
    /// Primary key bytes.
    key: Vec<u8>,
    /// Largest timestamp observed so far for this key.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
349
350impl LastKeyState {
351 fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
352 Self {
353 key,
354 last_timestamp,
355 slices: vec![first_slice],
356 }
357 }
358}
359
impl FlatLastTimestampSelector {
    /// Processes one flat-format batch, appending finished last-row slices to
    /// `output_buffer`.
    ///
    /// Rows with equal primary keys are expected to be contiguous within and
    /// across batches (the selector treats a key change as the end of that
    /// key). A key that spans batch boundaries stays buffered in
    /// `current_key` until a different key appears or
    /// [finish](Self::finish) is called.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        // Nothing to do for empty batches.
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        // `offsets` delimits runs of identical primary keys: run i covers rows
        // [offsets[i], offsets[i + 1]).
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            // Last timestamp of this run, and the first row index carrying it.
            let range_last_ts = ts_values[range_end - 1];
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            // Rows of this run that share the run's last timestamp.
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continuing from a previous run/batch.
                    if range_last_ts > state.last_timestamp {
                        // Strictly newer last timestamp supersedes everything buffered.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Tie: keep all rows sharing the same last timestamp.
                        state.slices.push(range_slice);
                    }
                    // An older last timestamp leaves the buffered rows unchanged.
                }
                Some(_) => {
                    // A different key starts: flush the finished key first.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // Very first key seen by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Flushes the in-progress key, if any. Safe to call repeatedly; later
    /// calls are no-ops until new batches arrive.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the buffered slices of the current key into `output_buffer`.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
438
/// Reader applying per-series last-row selection to a stream of flat-format
/// record batches (no caching involved).
pub(crate) struct FlatLastRowReader {
    /// Input stream of flat-format record batches.
    stream: BoxedRecordBatchStream,
    /// Selector keeping rows carrying the last timestamp of each primary key.
    selector: FlatLastTimestampSelector,
    /// Selected rows awaiting emission.
    pending: BatchBuffer,
}
446
impl FlatLastRowReader {
    /// Creates a reader applying last-row selection over `stream`.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        Self {
            stream,
            selector: FlatLastTimestampSelector::default(),
            pending: BatchBuffer::new(),
        }
    }

    /// Converts the reader into a stream of merged last-row batches.
    ///
    /// Selected rows are buffered until the buffer is full (see
    /// `BatchBuffer::is_full`), then concatenated into one batch and yielded.
    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        async_stream::try_stream! {
            while let Some(batch) = self.stream.try_next().await? {
                self.selector.on_next(batch, &mut self.pending)?;
                if self.pending.is_full() {
                    yield self.pending.concat()?;
                }
            }
            // Input exhausted: flush the final key and any remaining rows.
            self.selector.finish(&mut self.pending)?;
            if !self.pending.is_empty() {
                yield self.pending.concat()?;
            }
        }
    }
}
473
474fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
476 let pk_dict = batch
477 .column(pk_col_idx)
478 .as_any()
479 .downcast_ref::<PrimaryKeyArray>()
480 .unwrap();
481 let key = pk_dict.keys().value(index);
482 let binary_values = pk_dict
483 .values()
484 .as_any()
485 .downcast_ref::<BinaryArray>()
486 .unwrap();
487 binary_values.value(key as usize)
488}
489
/// Returns the index of the first row within `[range_start, range_end)` whose
/// timestamp equals the range's last timestamp.
///
/// Rows sharing the last timestamp form a suffix of the range; this locates
/// where that suffix begins.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // Find the last position (relative to `range_start`) whose timestamp
    // differs from `last_ts`; the run of equal timestamps starts right after.
    match ts_values[range_start..range_end]
        .iter()
        .rposition(|&ts| ts != last_ts)
    {
        Some(pos) => range_start + pos + 1,
        None => range_start,
    }
}
502
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    /// A single batch with one key yields only that key's last row; a
    /// single-row batch is returned unchanged.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // One-row series: the row itself is the last row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    /// A key spanning multiple batches yields one last row per key.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Builds a flat-format batch with field, timestamp, dictionary-encoded
    /// primary key, sequence and op-type columns (matching [test_flat_schema]).
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            {
                // Primary key column is dictionary-encoded binary.
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            // Sequence and op-type columns use constant placeholder values.
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    /// Schema for the flat test batches built by [new_flat_batch].
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Drives `selector` over `batches` (draining the output buffer after each
    /// step and at finish) and collects emitted (primary key, timestamp) pairs.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            // Keep the buffer's row count consistent after draining manually.
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Appends each row of `batch` as a (primary key bytes, timestamp) pair.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    /// One key in one batch: only the row with the max timestamp survives.
    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    /// Multiple keys in one batch: one last row per key, in key order.
    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    /// A key crossing batch boundaries keeps only its overall last timestamp.
    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    /// Several rows tied on the last timestamp are all kept.
    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    /// Ties on the last timestamp are kept even across batch boundaries.
    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    /// A later, strictly higher timestamp discards the buffered tied rows.
    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    /// `finish()` flushes once; a second call without new input emits nothing.
    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        // k1 is flushed when k2 starts; k2 stays buffered in the selector.
        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // First finish emits the buffered k2 row.
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // Second finish is a no-op.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}
801}