1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use snafu::ResultExt;
25use store_api::storage::{FileId, TimeSeriesRowSelector};
26
27use crate::cache::{
28 CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
29 selector_result_cache_hit, selector_result_cache_miss,
30};
31use crate::error::{ComputeArrowSnafu, Result};
32use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
33use crate::read::{Batch, BatchReader, BoxedBatchReader};
34use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
35use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
36use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
37use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
38
/// Reader that reduces its input to the last row of each time series.
pub(crate) struct LastRowReader {
    /// Inner reader supplying the batches to reduce. NOTE(review): assumes
    /// batches of the same primary key arrive consecutively — confirm with
    /// the upstream reader's ordering contract.
    reader: BoxedBatchReader,
    /// Selector tracking the series currently being scanned.
    selector: LastRowSelector,
}
53
54impl LastRowReader {
55 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
57 Self {
58 reader,
59 selector: LastRowSelector::default(),
60 }
61 }
62
63 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
65 while let Some(batch) = self.reader.next_batch().await? {
66 if let Some(yielded) = self.selector.on_next(batch) {
67 return Ok(Some(yielded));
68 }
69 }
70 Ok(self.selector.finish())
71 }
72}
73
#[async_trait]
impl BatchReader for LastRowReader {
    /// Delegates to [`LastRowReader::next_last_row`] so the reader can be
    /// used anywhere a `BatchReader` is expected.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
80
/// Reader for a row group's last rows, backed by the selector result cache.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Serves batches directly from a cached selector result.
    Hit(LastRowCacheReader),
    /// Computes last rows from the row group and populates the cache.
    Miss(RowGroupLastRowReader),
}
91
92impl RowGroupLastRowCachedReader {
93 pub(crate) fn new(
94 file_id: FileId,
95 row_group_idx: usize,
96 cache_strategy: CacheStrategy,
97 row_group_reader: RowGroupReader,
98 ) -> Self {
99 let key = SelectorResultKey {
100 file_id,
101 row_group_idx,
102 selector: TimeSeriesRowSelector::LastRow,
103 };
104
105 if let Some(value) = cache_strategy.get_selector_result(&key) {
106 let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
107 let schema_matches =
108 value.projection == row_group_reader.read_format().projection_indices();
109 if is_primary_key && schema_matches {
110 Self::new_hit(value)
112 } else {
113 Self::new_miss(key, row_group_reader, cache_strategy)
114 }
115 } else {
116 Self::new_miss(key, row_group_reader, cache_strategy)
117 }
118 }
119
120 pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
122 match self {
123 RowGroupLastRowCachedReader::Hit(_) => None,
124 RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
125 }
126 }
127
128 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
130 selector_result_cache_hit();
131 Self::Hit(LastRowCacheReader { value, idx: 0 })
132 }
133
134 fn new_miss(
136 key: SelectorResultKey,
137 row_group_reader: RowGroupReader,
138 cache_strategy: CacheStrategy,
139 ) -> Self {
140 selector_result_cache_miss();
141 Self::Miss(RowGroupLastRowReader::new(
142 key,
143 row_group_reader,
144 cache_strategy,
145 ))
146 }
147}
148
#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    /// Dispatches to the hit or miss reader variant.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
        }
    }
}
158
/// Iterates batches stored in a cached selector result.
pub(crate) struct LastRowCacheReader {
    /// Shared cached value holding the per-series last-row batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next batch to return.
    idx: usize,
}
164
165impl LastRowCacheReader {
166 async fn next_batch(&mut self) -> Result<Option<Batch>> {
168 let batches = match &self.value.result {
169 SelectorResult::PrimaryKey(batches) => batches,
170 SelectorResult::Flat(_) => unreachable!(),
171 };
172 if self.idx < batches.len() {
173 let res = Ok(Some(batches[self.idx].clone()));
174 self.idx += 1;
175 res
176 } else {
177 Ok(None)
178 }
179 }
180}
181
/// Reader that computes last rows for a row group and populates the selector
/// result cache once the row group is exhausted.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: RowGroupReader,
    /// Selector picking the last row per time series.
    selector: LastRowSelector,
    /// Batches yielded so far, kept to populate the cache on completion.
    yielded_batches: Vec<Batch>,
    /// Cache to update when the row group finishes.
    cache_strategy: CacheStrategy,
    /// Index vector `[0]`; every yielded batch holds exactly one row, so
    /// taking row 0 is used to shrink it in place before caching.
    take_index: UInt32Vector,
}
191
192impl RowGroupLastRowReader {
193 fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
194 Self {
195 key,
196 reader,
197 selector: LastRowSelector::default(),
198 yielded_batches: vec![],
199 cache_strategy,
200 take_index: UInt32Vector::from_vec(vec![0]),
201 }
202 }
203
204 async fn next_batch(&mut self) -> Result<Option<Batch>> {
205 while let Some(batch) = self.reader.next_batch().await? {
206 if let Some(yielded) = self.selector.on_next(batch) {
207 push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
208 return Ok(Some(yielded));
209 }
210 }
211 let last_batch = if let Some(last_batch) = self.selector.finish() {
212 push_yielded_batches(
213 last_batch.clone(),
214 &self.take_index,
215 &mut self.yielded_batches,
216 )?;
217 Some(last_batch)
218 } else {
219 None
220 };
221
222 self.maybe_update_cache();
224 Ok(last_batch)
225 }
226
227 fn maybe_update_cache(&mut self) {
229 if self.yielded_batches.is_empty() {
230 return;
232 }
233 let value = Arc::new(SelectorResultValue::new(
234 std::mem::take(&mut self.yielded_batches),
235 self.reader.read_format().projection_indices().to_vec(),
236 ));
237 self.cache_strategy.put_selector_result(self.key, value);
238 }
239
240 fn metrics(&self) -> &ReaderMetrics {
241 self.reader.metrics()
242 }
243}
244
/// Shrinks `batch` to its single row in place and stores it in
/// `yielded_batches` so it can later be written to the selector cache.
///
/// # Panics
/// Asserts that `batch` holds exactly one row.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    assert_eq!(1, batch.num_rows());
    // Taking row 0 of a one-row batch presumably compacts its internal
    // buffers before long-term caching — TODO confirm `take_in_place` intent.
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
257
/// Stateful selector that emits the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent batch of the series currently being scanned.
    last_batch: Option<Batch>,
}
263
264impl LastRowSelector {
265 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
267 if let Some(last) = &self.last_batch {
268 if last.primary_key() == batch.primary_key() {
269 self.last_batch = Some(batch);
271 None
272 } else {
273 debug_assert!(!last.is_empty());
276 let last_row = last.slice(last.num_rows() - 1, 1);
277 self.last_batch = Some(batch);
278 Some(last_row)
279 }
280 } else {
281 self.last_batch = Some(batch);
282 None
283 }
284 }
285
286 pub fn finish(&mut self) -> Option<Batch> {
288 if let Some(last) = self.last_batch.take() {
289 let last_row = last.slice(last.num_rows() - 1, 1);
291 return Some(last_row);
292 }
293 None
294 }
295}
296
/// Flat-format counterpart of [`RowGroupLastRowCachedReader`].
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Serves record batches from a cached selector result.
    Hit(FlatLastRowCacheReader),
    /// Computes last rows from the row group and populates the cache.
    Miss(FlatRowGroupLastRowReader),
}
306
307impl FlatRowGroupLastRowCachedReader {
308 pub(crate) fn new(
309 file_id: FileId,
310 row_group_idx: usize,
311 cache_strategy: CacheStrategy,
312 projection: &[usize],
313 reader: FlatRowGroupReader,
314 ) -> Self {
315 let key = SelectorResultKey {
316 file_id,
317 row_group_idx,
318 selector: TimeSeriesRowSelector::LastRow,
319 };
320
321 if let Some(value) = cache_strategy.get_selector_result(&key) {
322 let is_flat = matches!(&value.result, SelectorResult::Flat(_));
323 let schema_matches = value.projection == projection;
324 if is_flat && schema_matches {
325 Self::new_hit(value)
326 } else {
327 Self::new_miss(key, projection, reader, cache_strategy)
328 }
329 } else {
330 Self::new_miss(key, projection, reader, cache_strategy)
331 }
332 }
333
334 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
336 match self {
337 FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
338 FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
339 }
340 }
341
342 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
343 selector_result_cache_hit();
344 Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
345 }
346
347 fn new_miss(
348 key: SelectorResultKey,
349 projection: &[usize],
350 reader: FlatRowGroupReader,
351 cache_strategy: CacheStrategy,
352 ) -> Self {
353 selector_result_cache_miss();
354 Self::Miss(FlatRowGroupLastRowReader::new(
355 key,
356 projection.to_vec(),
357 reader,
358 cache_strategy,
359 ))
360 }
361}
362
/// Iterates flat record batches stored in a cached selector result.
pub(crate) struct FlatLastRowCacheReader {
    /// Shared cached value holding the flat last-row batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next batch to return.
    idx: usize,
}
368
369impl FlatLastRowCacheReader {
370 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
371 let batches = match &self.value.result {
372 SelectorResult::Flat(batches) => batches,
373 SelectorResult::PrimaryKey(_) => unreachable!(),
374 };
375 if self.idx < batches.len() {
376 let res = Ok(Some(batches[self.idx].clone()));
377 self.idx += 1;
378 res
379 } else {
380 Ok(None)
381 }
382 }
383}
384
/// Accumulates record batches until roughly [`DEFAULT_READ_BATCH_SIZE`] rows
/// are buffered, then they can be concatenated into one output batch.
pub(crate) struct BatchBuffer {
    /// Buffered batches awaiting concatenation.
    batches: Vec<RecordBatch>,
    /// Total row count across `batches`.
    num_rows: usize,
}
390
391impl BatchBuffer {
392 fn new() -> Self {
393 Self {
394 batches: Vec::new(),
395 num_rows: 0,
396 }
397 }
398
399 fn is_full(&self) -> bool {
401 self.num_rows >= DEFAULT_READ_BATCH_SIZE
402 }
403
404 fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
406 for batch in batches {
407 self.num_rows += batch.num_rows();
408 }
409 self.batches.extend_from_slice(batches);
410 }
411
412 fn is_empty(&self) -> bool {
414 self.batches.is_empty()
415 }
416
417 fn concat(&mut self) -> Result<RecordBatch> {
419 debug_assert!(!self.batches.is_empty());
420 let schema = self.batches[0].schema();
421 let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
422 self.batches.clear();
423 self.num_rows = 0;
424 Ok(merged)
425 }
426}
427
/// Reader that computes last rows for a flat-format row group and populates
/// the selector result cache once the row group is exhausted.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    /// Underlying flat row group reader.
    reader: FlatRowGroupReader,
    /// Selector keeping the last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Batches yielded so far, kept to populate the cache on completion.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to update when the row group finishes.
    cache_strategy: CacheStrategy,
    /// Projection indices recorded alongside the cached result.
    projection: Vec<usize>,
    /// Output slices buffered until they reach a full read batch.
    pending: BatchBuffer,
}
439
440impl FlatRowGroupLastRowReader {
441 fn new(
442 key: SelectorResultKey,
443 projection: Vec<usize>,
444 reader: FlatRowGroupReader,
445 cache_strategy: CacheStrategy,
446 ) -> Self {
447 Self {
448 key,
449 reader,
450 selector: FlatLastTimestampSelector::default(),
451 yielded_batches: vec![],
452 cache_strategy,
453 projection,
454 pending: BatchBuffer::new(),
455 }
456 }
457
458 fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
460 if self.pending.is_empty() {
461 return Ok(None);
462 }
463 let merged = self.pending.concat()?;
464 self.yielded_batches.push(merged.clone());
465 Ok(Some(merged))
466 }
467
468 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
469 if self.pending.is_full() {
470 return self.flush_pending();
471 }
472
473 while let Some(batch) = self.reader.next_batch()? {
474 self.selector.on_next(batch, &mut self.pending)?;
475 if self.pending.is_full() {
476 return self.flush_pending();
477 }
478 }
479
480 self.selector.finish(&mut self.pending)?;
482 if !self.pending.is_empty() {
483 let result = self.flush_pending();
484 self.maybe_update_cache();
486 return result;
487 }
488
489 self.maybe_update_cache();
491 Ok(None)
492 }
493
494 fn maybe_update_cache(&mut self) {
495 if self.yielded_batches.is_empty() {
496 return;
497 }
498 let batches = std::mem::take(&mut self.yielded_batches);
499 let value = Arc::new(SelectorResultValue::new_flat(
500 batches,
501 self.projection.clone(),
502 ));
503 self.cache_strategy.put_selector_result(self.key, value);
504 }
505}
506
/// Selector over flat-format batches that keeps, for each primary key, the
/// row(s) carrying that key's greatest observed timestamp.
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State for the primary key currently being scanned, if any.
    current_key: Option<LastKeyState>,
}
516
/// Accumulated candidate slices for a single primary key.
#[derive(Debug)]
struct LastKeyState {
    /// Raw primary key bytes.
    key: Vec<u8>,
    /// Greatest timestamp observed so far for this key.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
523
impl LastKeyState {
    /// Creates the state for a newly encountered primary key.
    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
        Self {
            key,
            last_timestamp,
            slices: vec![first_slice],
        }
    }
}
533
impl FlatLastTimestampSelector {
    /// Feeds one flat-format batch into the selector.
    ///
    /// Splits the batch into per-primary-key ranges, keeps the slice(s)
    /// carrying each key's last timestamp, and flushes a key's slices to
    /// `output_buffer` once a different key is encountered.
    ///
    /// NOTE(review): assumes rows of one primary key are contiguous and that
    /// each key range ends with the key's greatest timestamp — confirm
    /// against the row-group reader's ordering guarantees.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        // Each window [offsets[i], offsets[i + 1]) is a run of rows sharing
        // one primary key.
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            let range_last_ts = ts_values[range_end - 1];
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continues across ranges/batches: keep only the
                    // slices with the greatest last timestamp seen so far.
                    if range_last_ts > state.last_timestamp {
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        state.slices.push(range_slice);
                    }
                }
                Some(_) => {
                    // Key changed: the previous key is complete, flush it.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key seen by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Flushes the in-progress key, if any. One-shot: the state is taken, so
    /// repeated calls after the first are no-ops.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the current key's slices into `output_buffer` and clears state.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
612
613fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
615 let pk_dict = batch
616 .column(pk_col_idx)
617 .as_any()
618 .downcast_ref::<PrimaryKeyArray>()
619 .unwrap();
620 let key = pk_dict.keys().value(index);
621 let binary_values = pk_dict
622 .values()
623 .as_any()
624 .downcast_ref::<BinaryArray>()
625 .unwrap();
626 binary_values.value(key as usize)
627}
628
/// Returns the index of the first row in `[range_start, range_end)` whose
/// timestamp equals the range's final timestamp `ts_values[range_end - 1]`,
/// scanning only the contiguous run at the end of the range.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // Count the trailing rows that share the final timestamp.
    let trailing = ts_values[range_start..range_end]
        .iter()
        .rev()
        .take_while(|&&ts| ts == last_ts)
        .count();
    range_end - trailing
}
641
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    // A single-batch series yields only the batch's final row.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // A one-row batch is returned as-is.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    // Series spanning several batches still yield exactly one row per key.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Builds a flat-format batch matching [test_flat_schema].
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    /// Schema: field, time index, dictionary primary key, sequence, op type.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Runs `batches` through `selector` and returns every (primary key,
    /// timestamp) pair emitted, including rows produced by `finish`.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Appends each row of `batch` as a (primary key, timestamp) pair.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    // A key continuing into a later batch keeps its later timestamp.
    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    // Ties on the last timestamp keep every tied row.
    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    // A strictly greater timestamp discards previously buffered tied rows.
    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    // `finish` drains its state, so a second call produces nothing.
    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}