1use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use futures::{Stream, TryStreamExt};
25use snafu::ResultExt;
26use store_api::storage::{FileId, TimeSeriesRowSelector};
27
28use crate::cache::{
29 CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
30 selector_result_cache_hit, selector_result_cache_miss,
31};
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
34use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
35use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
36use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
37use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
38use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
39
/// Reader that wraps another [BatchReader] and yields only the last row of
/// each time series.
#[allow(dead_code)]
pub(crate) struct LastRowReader {
    /// Inner reader; batches of one series are expected to arrive
    /// consecutively (the selector flushes on a primary key change).
    reader: BoxedBatchReader,
    /// Stateful selector tracking the latest batch of the current series.
    selector: LastRowSelector,
}
55
56#[allow(dead_code)]
57impl LastRowReader {
58 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
60 Self {
61 reader,
62 selector: LastRowSelector::default(),
63 }
64 }
65
66 pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
68 while let Some(batch) = self.reader.next_batch().await? {
69 if let Some(yielded) = self.selector.on_next(batch) {
70 return Ok(Some(yielded));
71 }
72 }
73 Ok(self.selector.finish())
74 }
75}
76
#[async_trait]
impl BatchReader for LastRowReader {
    // Delegates to `next_last_row` so the last-row reader can be used
    // anywhere a plain `BatchReader` is expected.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
83
/// Last-row reader for one row group, backed by the selector result cache.
pub(crate) enum RowGroupLastRowCachedReader {
    /// A compatible result for this row group was found in the cache.
    Hit(LastRowCacheReader),
    /// Not cached; reads the row group and populates the cache on completion.
    Miss(RowGroupLastRowReader),
}
94
95impl RowGroupLastRowCachedReader {
96 pub(crate) fn new(
97 file_id: FileId,
98 row_group_idx: usize,
99 cache_strategy: CacheStrategy,
100 row_group_reader: RowGroupReader,
101 ) -> Self {
102 let key = SelectorResultKey {
103 file_id,
104 row_group_idx,
105 selector: TimeSeriesRowSelector::LastRow,
106 };
107
108 if let Some(value) = cache_strategy.get_selector_result(&key) {
109 let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
110 let schema_matches =
111 value.projection == row_group_reader.read_format().projection_indices();
112 if is_primary_key && schema_matches {
113 Self::new_hit(value)
115 } else {
116 Self::new_miss(key, row_group_reader, cache_strategy)
117 }
118 } else {
119 Self::new_miss(key, row_group_reader, cache_strategy)
120 }
121 }
122
123 pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
125 match self {
126 RowGroupLastRowCachedReader::Hit(_) => None,
127 RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
128 }
129 }
130
131 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
133 selector_result_cache_hit();
134 Self::Hit(LastRowCacheReader { value, idx: 0 })
135 }
136
137 fn new_miss(
139 key: SelectorResultKey,
140 row_group_reader: RowGroupReader,
141 cache_strategy: CacheStrategy,
142 ) -> Self {
143 selector_result_cache_miss();
144 Self::Miss(RowGroupLastRowReader::new(
145 key,
146 row_group_reader,
147 cache_strategy,
148 ))
149 }
150}
151
#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    // Dispatches to the cache-hit or cache-miss variant.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
        }
    }
}
161
/// Reader that replays cached last-row batches one by one.
pub(crate) struct LastRowCacheReader {
    /// Cached selector result (primary key format batches).
    value: Arc<SelectorResultValue>,
    /// Index of the next batch to return.
    idx: usize,
}
167
168impl LastRowCacheReader {
169 async fn next_batch(&mut self) -> Result<Option<Batch>> {
171 let batches = match &self.value.result {
172 SelectorResult::PrimaryKey(batches) => batches,
173 SelectorResult::Flat(_) => unreachable!(),
174 };
175 if self.idx < batches.len() {
176 let res = Ok(Some(batches[self.idx].clone()));
177 self.idx += 1;
178 res
179 } else {
180 Ok(None)
181 }
182 }
183}
184
/// Reader that computes last rows for a row group and caches the result once
/// the whole row group has been read.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    reader: RowGroupReader,
    selector: LastRowSelector,
    /// Single-row batches yielded so far; written to the cache on completion.
    yielded_batches: Vec<Batch>,
    cache_strategy: CacheStrategy,
    /// Take index `[0]`; every yielded batch has exactly one row (asserted in
    /// `push_yielded_batches`).
    take_index: UInt32Vector,
}
194
195impl RowGroupLastRowReader {
196 fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
197 Self {
198 key,
199 reader,
200 selector: LastRowSelector::default(),
201 yielded_batches: vec![],
202 cache_strategy,
203 take_index: UInt32Vector::from_vec(vec![0]),
204 }
205 }
206
207 async fn next_batch(&mut self) -> Result<Option<Batch>> {
208 while let Some(batch) = self.reader.next_batch().await? {
209 if let Some(yielded) = self.selector.on_next(batch) {
210 push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
211 return Ok(Some(yielded));
212 }
213 }
214 let last_batch = if let Some(last_batch) = self.selector.finish() {
215 push_yielded_batches(
216 last_batch.clone(),
217 &self.take_index,
218 &mut self.yielded_batches,
219 )?;
220 Some(last_batch)
221 } else {
222 None
223 };
224
225 self.maybe_update_cache();
227 Ok(last_batch)
228 }
229
230 fn maybe_update_cache(&mut self) {
232 if self.yielded_batches.is_empty() {
233 return;
235 }
236 let value = Arc::new(SelectorResultValue::new(
237 std::mem::take(&mut self.yielded_batches),
238 self.reader.read_format().projection_indices().to_vec(),
239 ));
240 self.cache_strategy.put_selector_result(self.key, value);
241 }
242
243 fn metrics(&self) -> &ReaderMetrics {
244 self.reader.metrics()
245 }
246}
247
248fn push_yielded_batches(
250 mut batch: Batch,
251 take_index: &UInt32Vector,
252 yielded_batches: &mut Vec<Batch>,
253) -> Result<()> {
254 assert_eq!(1, batch.num_rows());
255 batch.take_in_place(take_index)?;
256 yielded_batches.push(batch);
257
258 Ok(())
259}
260
/// Stateful selector that buffers the latest batch of the current series and
/// emits the series' last row once the next series begins.
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent batch of the series currently being scanned.
    last_batch: Option<Batch>,
}
266
267impl LastRowSelector {
268 pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
270 if let Some(last) = &self.last_batch {
271 if last.primary_key() == batch.primary_key() {
272 self.last_batch = Some(batch);
274 None
275 } else {
276 debug_assert!(!last.is_empty());
279 let last_row = last.slice(last.num_rows() - 1, 1);
280 self.last_batch = Some(batch);
281 Some(last_row)
282 }
283 } else {
284 self.last_batch = Some(batch);
285 None
286 }
287 }
288
289 pub fn finish(&mut self) -> Option<Batch> {
291 if let Some(last) = self.last_batch.take() {
292 let last_row = last.slice(last.num_rows() - 1, 1);
294 return Some(last_row);
295 }
296 None
297 }
298}
299
/// Last-row reader for the flat format, backed by the selector result cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// A compatible flat result was found in the cache.
    Hit(FlatLastRowCacheReader),
    /// Not cached; computes last rows and caches them on completion.
    Miss(FlatRowGroupLastRowReader),
}
309
310impl FlatRowGroupLastRowCachedReader {
311 pub(crate) fn new(
312 file_id: FileId,
313 row_group_idx: usize,
314 cache_strategy: CacheStrategy,
315 projection: &[usize],
316 reader: FlatRowGroupReader,
317 ) -> Self {
318 let key = SelectorResultKey {
319 file_id,
320 row_group_idx,
321 selector: TimeSeriesRowSelector::LastRow,
322 };
323
324 if let Some(value) = cache_strategy.get_selector_result(&key) {
325 let is_flat = matches!(&value.result, SelectorResult::Flat(_));
326 let schema_matches = value.projection == projection;
327 if is_flat && schema_matches {
328 Self::new_hit(value)
329 } else {
330 Self::new_miss(key, projection, reader, cache_strategy)
331 }
332 } else {
333 Self::new_miss(key, projection, reader, cache_strategy)
334 }
335 }
336
337 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
339 match self {
340 FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
341 FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
342 }
343 }
344
345 fn new_hit(value: Arc<SelectorResultValue>) -> Self {
346 selector_result_cache_hit();
347 Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
348 }
349
350 fn new_miss(
351 key: SelectorResultKey,
352 projection: &[usize],
353 reader: FlatRowGroupReader,
354 cache_strategy: CacheStrategy,
355 ) -> Self {
356 selector_result_cache_miss();
357 Self::Miss(FlatRowGroupLastRowReader::new(
358 key,
359 projection.to_vec(),
360 reader,
361 cache_strategy,
362 ))
363 }
364}
365
/// Reader that replays cached flat-format last-row record batches.
pub(crate) struct FlatLastRowCacheReader {
    /// Cached selector result (flat format record batches).
    value: Arc<SelectorResultValue>,
    /// Index of the next batch to return.
    idx: usize,
}
371
372impl FlatLastRowCacheReader {
373 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
374 let batches = match &self.value.result {
375 SelectorResult::Flat(batches) => batches,
376 SelectorResult::PrimaryKey(_) => unreachable!(),
377 };
378 if self.idx < batches.len() {
379 let res = Ok(Some(batches[self.idx].clone()));
380 self.idx += 1;
381 res
382 } else {
383 Ok(None)
384 }
385 }
386}
387
/// Buffer that accumulates record batches until they hold at least
/// [DEFAULT_READ_BATCH_SIZE] rows and can be merged into one output batch.
pub(crate) struct BatchBuffer {
    /// Buffered batches; merged and cleared by `concat()`.
    batches: Vec<RecordBatch>,
    /// Total number of rows currently buffered.
    num_rows: usize,
}
393
394impl BatchBuffer {
395 fn new() -> Self {
396 Self {
397 batches: Vec::new(),
398 num_rows: 0,
399 }
400 }
401
402 fn is_full(&self) -> bool {
404 self.num_rows >= DEFAULT_READ_BATCH_SIZE
405 }
406
407 fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
409 for batch in batches {
410 self.num_rows += batch.num_rows();
411 }
412 self.batches.extend_from_slice(batches);
413 }
414
415 fn is_empty(&self) -> bool {
417 self.batches.is_empty()
418 }
419
420 fn concat(&mut self) -> Result<RecordBatch> {
422 debug_assert!(!self.batches.is_empty());
423 let schema = self.batches[0].schema();
424 let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
425 self.batches.clear();
426 self.num_rows = 0;
427 Ok(merged)
428 }
429}
430
/// Flat-format reader that computes last rows for a row group and caches the
/// result once the whole row group has been consumed.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    reader: FlatRowGroupReader,
    selector: FlatLastTimestampSelector,
    /// Batches yielded so far; written to the cache when the reader finishes.
    yielded_batches: Vec<RecordBatch>,
    cache_strategy: CacheStrategy,
    /// Projection indices stored alongside the cached result for validation.
    projection: Vec<usize>,
    /// Selected row slices waiting to reach the output batch size.
    pending: BatchBuffer,
}
442
impl FlatRowGroupLastRowReader {
    fn new(
        key: SelectorResultKey,
        projection: Vec<usize>,
        reader: FlatRowGroupReader,
        cache_strategy: CacheStrategy,
    ) -> Self {
        Self {
            key,
            reader,
            selector: FlatLastTimestampSelector::default(),
            yielded_batches: vec![],
            cache_strategy,
            projection,
            pending: BatchBuffer::new(),
        }
    }

    // Concatenates the pending buffer into one batch, records it for caching
    // and returns it. Returns `Ok(None)` when nothing is pending.
    fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
        if self.pending.is_empty() {
            return Ok(None);
        }
        let merged = self.pending.concat()?;
        // Remember every yielded batch so the full result can be cached.
        self.yielded_batches.push(merged.clone());
        Ok(Some(merged))
    }

    // Returns the next batch of last rows, updating the cache once the row
    // group is fully consumed.
    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        // Flush rows left over from a previous call before reading more.
        if self.pending.is_full() {
            return self.flush_pending();
        }

        while let Some(batch) = self.reader.next_batch().await? {
            self.selector.on_next(batch, &mut self.pending)?;
            // Emit as soon as enough selected rows have accumulated.
            if self.pending.is_full() {
                return self.flush_pending();
            }
        }

        // Reader exhausted: drain the final key from the selector (finish is
        // a no-op on later calls because the selector state is taken).
        self.selector.finish(&mut self.pending)?;
        if !self.pending.is_empty() {
            let result = self.flush_pending();
            // Cache after the last flush so the cached result is complete.
            self.maybe_update_cache();
            return result;
        }

        self.maybe_update_cache();
        Ok(None)
    }

    // Writes the collected batches to the cache; no-op when nothing was
    // yielded (also makes repeated calls after completion harmless).
    fn maybe_update_cache(&mut self) {
        if self.yielded_batches.is_empty() {
            return;
        }
        let batches = std::mem::take(&mut self.yielded_batches);
        let value = Arc::new(SelectorResultValue::new_flat(
            batches,
            self.projection.clone(),
        ));
        self.cache_strategy.put_selector_result(self.key, value);
    }
}
509
/// Selector for flat record batches that keeps, per primary key, the rows
/// carrying the maximum timestamp seen so far.
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State of the primary key currently being scanned, if any.
    current_key: Option<LastKeyState>,
}
519
/// Per-key scan state: the largest timestamp seen and the row slices that
/// carry it.
#[derive(Debug)]
struct LastKeyState {
    /// Primary key bytes this state belongs to.
    key: Vec<u8>,
    /// Largest timestamp observed for the key so far.
    last_timestamp: i64,
    /// Slices whose rows all have `last_timestamp`; may span input batches
    /// when the last timestamp is duplicated.
    slices: Vec<RecordBatch>,
}
526
impl LastKeyState {
    /// Creates the state for a new key with its first candidate slice.
    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
        Self {
            key,
            last_timestamp,
            slices: vec![first_slice],
        }
    }
}
536
impl FlatLastTimestampSelector {
    // Processes one input batch, appending finished keys' last-timestamp rows
    // to `output_buffer`.
    //
    // NOTE(review): assumes rows of one primary key arrive consecutively
    // (across batches too) and that timestamps within a key are ascending —
    // the tests in this file exercise exactly that shape; confirm with the
    // upstream reader contract.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        // Offsets delimit runs of rows sharing the same primary key.
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            // Last timestamp of the run, taken from its final row.
            let range_last_ts = ts_values[range_end - 1];
            // Start of the trailing run of rows that share the last
            // timestamp; all of them are candidate "last rows".
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continues from an earlier run/batch.
                    if range_last_ts > state.last_timestamp {
                        // Strictly newer timestamp supersedes old candidates.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Tie on the last timestamp: keep all tied rows.
                        state.slices.push(range_slice);
                    }
                }
                Some(_) => {
                    // Key changed: the previous key is finished, flush it.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key seen by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    // Flushes the final key's state. Safe to call more than once; later calls
    // are no-ops because the state is taken on the first call.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    // Moves the current key's selected slices into `output_buffer`.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
615
/// Last-row reader over a stream of flat record batches (no caching).
pub(crate) struct FlatLastRowReader {
    /// Source stream of flat record batches.
    stream: BoxedRecordBatchStream,
    selector: FlatLastTimestampSelector,
    /// Selected rows waiting to reach the output batch size.
    pending: BatchBuffer,
}
623
impl FlatLastRowReader {
    /// Creates a reader that yields only last rows from `stream`.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        Self {
            stream,
            selector: FlatLastTimestampSelector::default(),
            pending: BatchBuffer::new(),
        }
    }

    /// Converts the reader into a stream that yields batches of last rows.
    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        async_stream::try_stream! {
            while let Some(batch) = self.stream.try_next().await? {
                self.selector.on_next(batch, &mut self.pending)?;
                // Emit once enough selected rows have accumulated.
                if self.pending.is_full() {
                    yield self.pending.concat()?;
                }
            }
            // Source exhausted: flush the final key and any remaining rows.
            self.selector.finish(&mut self.pending)?;
            if !self.pending.is_empty() {
                yield self.pending.concat()?;
            }
        }
    }
}
650
651fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
653 let pk_dict = batch
654 .column(pk_col_idx)
655 .as_any()
656 .downcast_ref::<PrimaryKeyArray>()
657 .unwrap();
658 let key = pk_dict.keys().value(index);
659 let binary_values = pk_dict
660 .values()
661 .as_any()
662 .downcast_ref::<BinaryArray>()
663 .unwrap();
664 binary_values.value(key as usize)
665}
666
/// Returns the smallest index in `[range_start, range_end)` whose timestamp
/// equals the range's last timestamp, i.e. the start of the trailing run of
/// duplicate timestamps. Requires `range_start < range_end`.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // The element right before the trailing run is the last one that differs
    // from `last_ts`; the run starts just after it (or at `range_start` when
    // the whole range shares the same timestamp).
    ts_values[range_start..range_end]
        .iter()
        .rposition(|&ts| ts != last_ts)
        .map(|before_run| range_start + before_run + 1)
        .unwrap_or(range_start)
}
679
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two rows of the same series: only the last row is returned.
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // A single-row batch is returned unchanged.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // `k1` spans two batches; its last row comes from the second batch.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    // Builds a flat-format record batch matching `test_flat_schema`.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            {
                // Dictionary-encoded primary key column.
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    // Flat schema layout: field, time index, primary key, sequence, op type.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    // Runs `batches` through `selector` and returns the selected
    // `(primary key, timestamp)` pairs in output order.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    // Decodes `(primary key, timestamp)` pairs from a flat record batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps() {
        // Ties on the last timestamp keep all tied rows.
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        // A later, strictly larger timestamp discards earlier tied candidates.
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // First finish flushes the pending key...
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // ...and a second finish produces nothing.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}
978}