1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use futures::{Stream, TryStreamExt};
25use snafu::ResultExt;
26use store_api::storage::{FileId, TimeSeriesRowSelector};
27
28use crate::cache::{
29 CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
30 selector_result_cache_hit, selector_result_cache_miss,
31};
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
34use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
35use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
36use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
37use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
38use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
39
/// Reader that reduces the batches of an inner [BatchReader] to at most one
/// row per primary key: the last row of each key.
pub(crate) struct LastRowReader {
    /// Inner reader supplying batches; assumed grouped by primary key — TODO confirm with callers.
    reader: BoxedBatchReader,
    /// Streaming selector that buffers the current key's latest batch.
    selector: LastRowSelector,
}
54
55impl LastRowReader {
56 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
58 Self {
59 reader,
60 selector: LastRowSelector::default(),
61 }
62 }
63
64 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
66 while let Some(batch) = self.reader.next_batch().await? {
67 if let Some(yielded) = self.selector.on_next(batch) {
68 return Ok(Some(yielded));
69 }
70 }
71 Ok(self.selector.finish())
72 }
73}
74
#[async_trait]
impl BatchReader for LastRowReader {
    // Delegates straight to `next_last_row`, so consumers can use this reader
    // anywhere a `BatchReader` is expected.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
81
/// Last-row reader for a single row group that may be served from the
/// selector-result cache.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Cache hit: replays previously computed last-row batches.
    Hit(LastRowCacheReader),
    /// Cache miss: computes last rows from the row group and populates the cache.
    Miss(RowGroupLastRowReader),
}
92
93impl RowGroupLastRowCachedReader {
94 pub(crate) fn new(
95 file_id: FileId,
96 row_group_idx: usize,
97 cache_strategy: CacheStrategy,
98 row_group_reader: RowGroupReader,
99 ) -> Self {
100 let key = SelectorResultKey {
101 file_id,
102 row_group_idx,
103 selector: TimeSeriesRowSelector::LastRow,
104 };
105
106 if let Some(value) = cache_strategy.get_selector_result(&key) {
107 let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
108 let schema_matches =
109 value.projection == row_group_reader.read_format().projection_indices();
110 if is_primary_key && schema_matches {
111 Self::new_hit(value)
113 } else {
114 Self::new_miss(key, row_group_reader, cache_strategy)
115 }
116 } else {
117 Self::new_miss(key, row_group_reader, cache_strategy)
118 }
119 }
120
121 pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
123 match self {
124 RowGroupLastRowCachedReader::Hit(_) => None,
125 RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
126 }
127 }
128
129 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
131 selector_result_cache_hit();
132 Self::Hit(LastRowCacheReader { value, idx: 0 })
133 }
134
135 fn new_miss(
137 key: SelectorResultKey,
138 row_group_reader: RowGroupReader,
139 cache_strategy: CacheStrategy,
140 ) -> Self {
141 selector_result_cache_miss();
142 Self::Miss(RowGroupLastRowReader::new(
143 key,
144 row_group_reader,
145 cache_strategy,
146 ))
147 }
148}
149
#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    // Dispatches to whichever variant (cache replay or live computation) this
    // reader was constructed as.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
        }
    }
}
159
/// Replays cached last-row batches (primary-key format) one batch per call.
pub(crate) struct LastRowCacheReader {
    /// Shared cached selector result; must hold `SelectorResult::PrimaryKey`.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
165
166impl LastRowCacheReader {
167 async fn next_batch(&mut self) -> Result<Option<Batch>> {
169 let batches = match &self.value.result {
170 SelectorResult::PrimaryKey(batches) => batches,
171 SelectorResult::Flat(_) => unreachable!(),
172 };
173 if self.idx < batches.len() {
174 let res = Ok(Some(batches[self.idx].clone()));
175 self.idx += 1;
176 res
177 } else {
178 Ok(None)
179 }
180 }
181}
182
/// Computes last rows for one row group on a cache miss and stores the result
/// back into the cache when the row group is exhausted.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key under which the computed result is stored.
    key: SelectorResultKey,
    /// Reader over the row group's batches.
    reader: RowGroupReader,
    /// Streaming last-row selector.
    selector: LastRowSelector,
    /// All yielded single-row batches, kept to populate the cache at the end.
    yielded_batches: Vec<Batch>,
    /// Cache to update once the row group is fully read.
    cache_strategy: CacheStrategy,
    /// Constant take index `[0]` reused to compact each yielded one-row batch
    /// via `take_in_place` — presumably to detach it from the source buffers
    /// before caching; confirm `take_in_place` semantics.
    take_index: UInt32Vector,
}
192
193impl RowGroupLastRowReader {
194 fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
195 Self {
196 key,
197 reader,
198 selector: LastRowSelector::default(),
199 yielded_batches: vec![],
200 cache_strategy,
201 take_index: UInt32Vector::from_vec(vec![0]),
202 }
203 }
204
205 async fn next_batch(&mut self) -> Result<Option<Batch>> {
206 while let Some(batch) = self.reader.next_batch().await? {
207 if let Some(yielded) = self.selector.on_next(batch) {
208 push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
209 return Ok(Some(yielded));
210 }
211 }
212 let last_batch = if let Some(last_batch) = self.selector.finish() {
213 push_yielded_batches(
214 last_batch.clone(),
215 &self.take_index,
216 &mut self.yielded_batches,
217 )?;
218 Some(last_batch)
219 } else {
220 None
221 };
222
223 self.maybe_update_cache();
225 Ok(last_batch)
226 }
227
228 fn maybe_update_cache(&mut self) {
230 if self.yielded_batches.is_empty() {
231 return;
233 }
234 let value = Arc::new(SelectorResultValue::new(
235 std::mem::take(&mut self.yielded_batches),
236 self.reader.read_format().projection_indices().to_vec(),
237 ));
238 self.cache_strategy.put_selector_result(self.key, value);
239 }
240
241 fn metrics(&self) -> &ReaderMetrics {
242 self.reader.metrics()
243 }
244}
245
/// Compacts a single-row `batch` with `take_index` (always `[0]`) and appends
/// it to `yielded_batches` for later caching.
///
/// NOTE(review): the take of row 0 on a one-row batch looks like a no-op
/// selection whose purpose is presumably to re-materialize the row so the
/// cached batch does not pin the original (larger) buffers — confirm against
/// `Batch::take_in_place`.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    // Callers only pass batches produced by `LastRowSelector`, which are 1 row.
    assert_eq!(1, batch.num_rows());
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
258
/// Streaming selector that keeps only the latest batch seen for the current
/// primary key and emits that key's last row when the key changes.
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent batch of the key currently being tracked; `None` initially
    /// and after `finish`.
    last_batch: Option<Batch>,
}
264
265impl LastRowSelector {
266 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
268 if let Some(last) = &self.last_batch {
269 if last.primary_key() == batch.primary_key() {
270 self.last_batch = Some(batch);
272 None
273 } else {
274 debug_assert!(!last.is_empty());
277 let last_row = last.slice(last.num_rows() - 1, 1);
278 self.last_batch = Some(batch);
279 Some(last_row)
280 }
281 } else {
282 self.last_batch = Some(batch);
283 None
284 }
285 }
286
287 pub fn finish(&mut self) -> Option<Batch> {
289 if let Some(last) = self.last_batch.take() {
290 let last_row = last.slice(last.num_rows() - 1, 1);
292 return Some(last_row);
293 }
294 None
295 }
296}
297
/// Flat-format counterpart of [RowGroupLastRowCachedReader]: serves last rows
/// for a row group from cache when possible.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit: replays cached flat record batches.
    Hit(FlatLastRowCacheReader),
    /// Cache miss: computes last rows from the row group and fills the cache.
    Miss(FlatRowGroupLastRowReader),
}
307
308impl FlatRowGroupLastRowCachedReader {
309 pub(crate) fn new(
310 file_id: FileId,
311 row_group_idx: usize,
312 cache_strategy: CacheStrategy,
313 projection: &[usize],
314 reader: FlatRowGroupReader,
315 ) -> Self {
316 let key = SelectorResultKey {
317 file_id,
318 row_group_idx,
319 selector: TimeSeriesRowSelector::LastRow,
320 };
321
322 if let Some(value) = cache_strategy.get_selector_result(&key) {
323 let is_flat = matches!(&value.result, SelectorResult::Flat(_));
324 let schema_matches = value.projection == projection;
325 if is_flat && schema_matches {
326 Self::new_hit(value)
327 } else {
328 Self::new_miss(key, projection, reader, cache_strategy)
329 }
330 } else {
331 Self::new_miss(key, projection, reader, cache_strategy)
332 }
333 }
334
335 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
337 match self {
338 FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
339 FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
340 }
341 }
342
343 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
344 selector_result_cache_hit();
345 Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
346 }
347
348 fn new_miss(
349 key: SelectorResultKey,
350 projection: &[usize],
351 reader: FlatRowGroupReader,
352 cache_strategy: CacheStrategy,
353 ) -> Self {
354 selector_result_cache_miss();
355 Self::Miss(FlatRowGroupLastRowReader::new(
356 key,
357 projection.to_vec(),
358 reader,
359 cache_strategy,
360 ))
361 }
362}
363
/// Replays cached flat last-row record batches one batch per call.
pub(crate) struct FlatLastRowCacheReader {
    /// Shared cached selector result; must hold `SelectorResult::Flat`.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
369
370impl FlatLastRowCacheReader {
371 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
372 let batches = match &self.value.result {
373 SelectorResult::Flat(batches) => batches,
374 SelectorResult::PrimaryKey(_) => unreachable!(),
375 };
376 if self.idx < batches.len() {
377 let res = Ok(Some(batches[self.idx].clone()));
378 self.idx += 1;
379 res
380 } else {
381 Ok(None)
382 }
383 }
384}
385
/// Accumulates small record-batch slices until enough rows are buffered to
/// emit one merged output batch.
pub(crate) struct BatchBuffer {
    /// Buffered slices, concatenated on flush.
    batches: Vec<RecordBatch>,
    /// Total rows across `batches`; compared against `DEFAULT_READ_BATCH_SIZE`.
    num_rows: usize,
}
391
392impl BatchBuffer {
393 fn new() -> Self {
394 Self {
395 batches: Vec::new(),
396 num_rows: 0,
397 }
398 }
399
400 fn is_full(&self) -> bool {
402 self.num_rows >= DEFAULT_READ_BATCH_SIZE
403 }
404
405 fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
407 for batch in batches {
408 self.num_rows += batch.num_rows();
409 }
410 self.batches.extend_from_slice(batches);
411 }
412
413 fn is_empty(&self) -> bool {
415 self.batches.is_empty()
416 }
417
418 fn concat(&mut self) -> Result<RecordBatch> {
420 debug_assert!(!self.batches.is_empty());
421 let schema = self.batches[0].schema();
422 let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
423 self.batches.clear();
424 self.num_rows = 0;
425 Ok(merged)
426 }
427}
428
/// Computes flat-format last rows for one row group on a cache miss and writes
/// the result back into the cache once the row group is exhausted.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key under which the computed result is stored.
    key: SelectorResultKey,
    /// Reader over the row group's flat record batches.
    reader: FlatRowGroupReader,
    /// Streaming per-key last-timestamp selector.
    selector: FlatLastTimestampSelector,
    /// Every flushed output batch, kept to populate the cache at the end.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to update once the row group is fully read.
    cache_strategy: CacheStrategy,
    /// Projection stored alongside the cached result for later compatibility checks.
    projection: Vec<usize>,
    /// Buffer of selected row slices awaiting concatenation into output batches.
    pending: BatchBuffer,
}
440
441impl FlatRowGroupLastRowReader {
442 fn new(
443 key: SelectorResultKey,
444 projection: Vec<usize>,
445 reader: FlatRowGroupReader,
446 cache_strategy: CacheStrategy,
447 ) -> Self {
448 Self {
449 key,
450 reader,
451 selector: FlatLastTimestampSelector::default(),
452 yielded_batches: vec![],
453 cache_strategy,
454 projection,
455 pending: BatchBuffer::new(),
456 }
457 }
458
459 fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
461 if self.pending.is_empty() {
462 return Ok(None);
463 }
464 let merged = self.pending.concat()?;
465 self.yielded_batches.push(merged.clone());
466 Ok(Some(merged))
467 }
468
469 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
470 if self.pending.is_full() {
471 return self.flush_pending();
472 }
473
474 while let Some(batch) = self.reader.next_batch()? {
475 self.selector.on_next(batch, &mut self.pending)?;
476 if self.pending.is_full() {
477 return self.flush_pending();
478 }
479 }
480
481 self.selector.finish(&mut self.pending)?;
483 if !self.pending.is_empty() {
484 let result = self.flush_pending();
485 self.maybe_update_cache();
487 return result;
488 }
489
490 self.maybe_update_cache();
492 Ok(None)
493 }
494
495 fn maybe_update_cache(&mut self) {
496 if self.yielded_batches.is_empty() {
497 return;
498 }
499 let batches = std::mem::take(&mut self.yielded_batches);
500 let value = Arc::new(SelectorResultValue::new_flat(
501 batches,
502 self.projection.clone(),
503 ));
504 self.cache_strategy.put_selector_result(self.key, value);
505 }
506}
507
/// Streaming selector over flat record batches that keeps, for each primary
/// key, the rows carrying that key's maximum timestamp.
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State of the key currently being tracked; `None` before the first row
    /// and after a flush.
    current_key: Option<LastKeyState>,
}
517
/// Tracking state for one primary key while its rows are being scanned.
#[derive(Debug)]
struct LastKeyState {
    /// The primary key bytes being tracked.
    key: Vec<u8>,
    /// Largest timestamp seen so far for this key.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
524
impl LastKeyState {
    /// Starts tracking `key` with its first candidate slice at `last_timestamp`.
    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
        Self {
            key,
            last_timestamp,
            slices: vec![first_slice],
        }
    }
}
534
impl FlatLastTimestampSelector {
    /// Feeds one flat batch. For each contiguous primary-key run inside the
    /// batch, extracts the rows carrying the run's last timestamp and merges
    /// them into the per-key tracking state; finished keys are flushed to
    /// `output_buffer`.
    ///
    /// Assumes rows are grouped by primary key and that timestamps within a
    /// key run are ordered so the run's last row has its maximum timestamp —
    /// TODO confirm against the SST sort order.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Flat format places the primary key and time index at fixed positions
        // relative to the column count.
        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        // Offsets delimit contiguous runs of identical primary keys.
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            let range_last_ts = ts_values[range_end - 1];
            // Rows at the tail of the run that all share the last timestamp.
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continuing across batches/runs: keep only the
                    // slices with the overall maximum timestamp.
                    if range_last_ts > state.last_timestamp {
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        state.slices.push(range_slice);
                    }
                    // range_last_ts < state.last_timestamp: older rows, dropped.
                }
                Some(_) => {
                    // Key changed: flush the finished key, then track the new one.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key seen.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Flushes the final tracked key. Safe to call more than once; subsequent
    /// calls are no-ops because the state was taken.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the tracked key's selected slices into `output_buffer`.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
613
/// Applies last-row selection over an arbitrary flat record-batch stream
/// (no per-row-group caching).
pub(crate) struct FlatLastRowReader {
    /// Source stream of flat record batches.
    stream: BoxedRecordBatchStream,
    /// Streaming per-key last-timestamp selector.
    selector: FlatLastTimestampSelector,
    /// Buffer of selected slices awaiting concatenation into output batches.
    pending: BatchBuffer,
}
621
impl FlatLastRowReader {
    /// Wraps `stream` with a fresh selector and output buffer.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        Self {
            stream,
            selector: FlatLastTimestampSelector::default(),
            pending: BatchBuffer::new(),
        }
    }

    /// Consumes the reader, producing a stream of merged last-row batches.
    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        async_stream::try_stream! {
            // Feed every input batch through the selector, emitting whenever
            // enough selected rows are buffered.
            while let Some(batch) = self.stream.try_next().await? {
                self.selector.on_next(batch, &mut self.pending)?;
                if self.pending.is_full() {
                    yield self.pending.concat()?;
                }
            }
            // Input exhausted: flush the final key and any remaining rows.
            self.selector.finish(&mut self.pending)?;
            if !self.pending.is_empty() {
                yield self.pending.concat()?;
            }
        }
    }
}
648
649fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
651 let pk_dict = batch
652 .column(pk_col_idx)
653 .as_any()
654 .downcast_ref::<PrimaryKeyArray>()
655 .unwrap();
656 let key = pk_dict.keys().value(index);
657 let binary_values = pk_dict
658 .values()
659 .as_any()
660 .downcast_ref::<BinaryArray>()
661 .unwrap();
662 binary_values.value(key as usize)
663}
664
/// Returns the index of the first row in `[range_start, range_end)` whose
/// timestamp equals the range's last timestamp, i.e. the start of the trailing
/// run of rows sharing `ts_values[range_end - 1]`.
///
/// Requires `range_start < range_end` (checked by `debug_assert`).
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // Position just after the last element that differs from `last_ts`;
    // if every element matches, the run starts at `range_start`.
    ts_values[range_start..range_end]
        .iter()
        .rposition(|&ts| ts != last_ts)
        .map(|pos| range_start + pos + 1)
        .unwrap_or(range_start)
}
677
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    // Single batch per key: only the final row of each batch should be kept.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // A one-row batch is returned unchanged.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    // A key spanning multiple batches must yield only its overall last row.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    // Builds a flat-format RecordBatch: field0, ts, __primary_key, __sequence, __op_type.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    // Schema matching the flat format column layout used by new_flat_batch.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    // Drives the selector over `batches` and flattens every emitted row into
    // (primary_key, timestamp) pairs.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    // Decodes (primary_key, timestamp) pairs from a flat batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    // Keys crossing batch boundaries still resolve to a single last row each.
    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    // Ties on the last timestamp keep every tied row.
    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    // A later, larger timestamp discards every earlier tied slice.
    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    // finish() flushes once; a second call must not emit anything.
    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}
976}