// mito2/src/read/last_row.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use futures::{Stream, TryStreamExt};
24use snafu::ResultExt;
25use store_api::storage::{FileId, TimeSeriesRowSelector};
26
27use crate::cache::{
28    CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
29    selector_result_cache_hit, selector_result_cache_miss,
30};
31use crate::error::{ComputeArrowSnafu, Result};
32use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
33use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
34use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
35use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
36use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
37use crate::sst::parquet::read_columns::ParquetReadColumns;
38use crate::sst::parquet::reader::FlatRowGroupReader;
39
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
/// - sorted
/// - all deleted rows have been filtered.
/// - not empty
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focuses on time series (the same key).
#[allow(dead_code)]
pub(crate) struct LastRowReader {
    /// Inner reader.
    reader: BoxedBatchReader,
    /// Selector that tracks the batch of the in-progress key and yields
    /// only the last row of each time series.
    selector: LastRowSelector,
}
55
56#[allow(dead_code)]
57impl LastRowReader {
58    /// Creates a new `LastRowReader`.
59    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
60        Self {
61            reader,
62            selector: LastRowSelector::default(),
63        }
64    }
65
66    /// Returns the last row of the next key.
67    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
68        while let Some(batch) = self.reader.next_batch().await? {
69            if let Some(yielded) = self.selector.on_next(batch) {
70                return Ok(Some(yielded));
71            }
72        }
73        Ok(self.selector.finish())
74    }
75}
76
#[async_trait]
impl BatchReader for LastRowReader {
    /// Delegates to [`LastRowReader::next_last_row`]: each returned batch
    /// contains the last row of one time series.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
83
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    // Last batch seen for the in-progress primary key; its final row is
    // yielded once a different key arrives or the input ends.
    last_batch: Option<Batch>,
}
89
90impl LastRowSelector {
91    /// Handles next batch. Return the yielding batch if present.
92    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
93        if let Some(last) = &self.last_batch {
94            if last.primary_key() == batch.primary_key() {
95                // Same key, update last batch.
96                self.last_batch = Some(batch);
97                None
98            } else {
99                // Different key, return the last row in `last` and update `last_batch` by
100                // current batch.
101                debug_assert!(!last.is_empty());
102                let last_row = last.slice(last.num_rows() - 1, 1);
103                self.last_batch = Some(batch);
104                Some(last_row)
105            }
106        } else {
107            self.last_batch = Some(batch);
108            None
109        }
110    }
111
112    /// Finishes the selector and returns the pending batch if any.
113    pub fn finish(&mut self) -> Option<Batch> {
114        if let Some(last) = self.last_batch.take() {
115            // This is the last key.
116            let last_row = last.slice(last.num_rows() - 1, 1);
117            return Some(last_row);
118        }
119        None
120    }
121}
122
/// Cached last row reader for flat format row group.
/// If the last rows are already cached (as flat `RecordBatch`), returns cached values.
/// Otherwise, reads from the row group, selects last rows, and updates the cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from cached value.
    Hit(FlatLastRowCacheReader),
    /// Cache miss, reads from row group reader and updates cache.
    Miss(FlatRowGroupLastRowReader),
}
132
133impl FlatRowGroupLastRowCachedReader {
134    pub(crate) fn new(
135        file_id: FileId,
136        row_group_idx: usize,
137        cache_strategy: CacheStrategy,
138        read_cols: &ParquetReadColumns,
139        reader: FlatRowGroupReader,
140    ) -> Self {
141        let key = SelectorResultKey {
142            file_id,
143            row_group_idx,
144            selector: TimeSeriesRowSelector::LastRow,
145        };
146
147        if let Some(value) = cache_strategy.get_selector_result(&key) {
148            let is_flat = matches!(&value.result, SelectorResult::Flat(_));
149            let schema_matches = value.read_cols == *read_cols;
150            if is_flat && schema_matches {
151                Self::new_hit(value)
152            } else {
153                Self::new_miss(key, read_cols, reader, cache_strategy)
154            }
155        } else {
156            Self::new_miss(key, read_cols, reader, cache_strategy)
157        }
158    }
159
160    /// Returns the next RecordBatch.
161    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
162        match self {
163            FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
164            FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
165        }
166    }
167
168    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
169        selector_result_cache_hit();
170        Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
171    }
172
173    fn new_miss(
174        key: SelectorResultKey,
175        read_cols: &ParquetReadColumns,
176        reader: FlatRowGroupReader,
177        cache_strategy: CacheStrategy,
178    ) -> Self {
179        selector_result_cache_miss();
180        Self::Miss(FlatRowGroupLastRowReader::new(
181            key,
182            read_cols.clone(),
183            reader,
184            cache_strategy,
185        ))
186    }
187}
188
/// Iterates over cached flat last rows.
pub(crate) struct FlatLastRowCacheReader {
    // Cached selector result holding the flat batches to replay.
    value: Arc<SelectorResultValue>,
    // Index of the next cached batch to return.
    idx: usize,
}
194
195impl FlatLastRowCacheReader {
196    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
197        let batches = match &self.value.result {
198            SelectorResult::Flat(batches) => batches,
199            SelectorResult::PrimaryKey(_) => unreachable!(),
200        };
201        if self.idx < batches.len() {
202            let res = Ok(Some(batches[self.idx].clone()));
203            self.idx += 1;
204            res
205        } else {
206            Ok(None)
207        }
208    }
209}
210
/// Buffer that accumulates small `RecordBatch`es and tracks total row count.
pub(crate) struct BatchBuffer {
    // Buffered batches, concatenated on demand by `concat()`.
    batches: Vec<RecordBatch>,
    // Total rows across `batches`; kept in sync by `extend_from_slice()`.
    num_rows: usize,
}
216
217impl BatchBuffer {
218    fn new() -> Self {
219        Self {
220            batches: Vec::new(),
221            num_rows: 0,
222        }
223    }
224
225    /// Returns true if total buffered rows reaches `DEFAULT_READ_BATCH_SIZE`.
226    fn is_full(&self) -> bool {
227        self.num_rows >= DEFAULT_READ_BATCH_SIZE
228    }
229
230    /// Extends the buffer from a slice of batches.
231    fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
232        for batch in batches {
233            self.num_rows += batch.num_rows();
234        }
235        self.batches.extend_from_slice(batches);
236    }
237
238    /// Returns true if the buffer has no batches.
239    fn is_empty(&self) -> bool {
240        self.batches.is_empty()
241    }
242
243    /// Concatenates all buffered batches into one, resets the buffer, and returns the result.
244    fn concat(&mut self) -> Result<RecordBatch> {
245        debug_assert!(!self.batches.is_empty());
246        let schema = self.batches[0].schema();
247        let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
248        self.batches.clear();
249        self.num_rows = 0;
250        Ok(merged)
251    }
252}
253
/// Reads last rows from a flat format row group and caches the results.
pub(crate) struct FlatRowGroupLastRowReader {
    // Cache key under which results are stored once the row group is consumed.
    key: SelectorResultKey,
    // Source reader for the row group.
    reader: FlatRowGroupReader,
    // Per-key last-timestamp selection state machine.
    selector: FlatLastTimestampSelector,
    // Every batch returned to the caller, kept for the cache update.
    yielded_batches: Vec<RecordBatch>,
    // Cache to update after the whole row group has been read.
    cache_strategy: CacheStrategy,
    // Projected columns; stored alongside the cached value for validation.
    read_cols: ParquetReadColumns,
    /// Accumulates small selector-output batches before concatenating.
    pending: BatchBuffer,
}
265
impl FlatRowGroupLastRowReader {
    /// Creates a reader that will populate the cache under `key` once the
    /// row group is fully consumed.
    fn new(
        key: SelectorResultKey,
        read_cols: ParquetReadColumns,
        reader: FlatRowGroupReader,
        cache_strategy: CacheStrategy,
    ) -> Self {
        Self {
            key,
            reader,
            selector: FlatLastTimestampSelector::default(),
            yielded_batches: vec![],
            cache_strategy,
            read_cols,
            pending: BatchBuffer::new(),
        }
    }

    /// Concatenates pending batches and records the result in `yielded_batches`.
    ///
    /// Returns `Ok(None)` when nothing is pending; the merged batch is also
    /// cloned into `yielded_batches` so it can later be written to the cache.
    fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
        if self.pending.is_empty() {
            return Ok(None);
        }
        let merged = self.pending.concat()?;
        self.yielded_batches.push(merged.clone());
        Ok(Some(merged))
    }

    /// Returns the next batch of last rows, reading from the row group as needed.
    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        // A previous call may have left a full buffer behind; drain it first.
        if self.pending.is_full() {
            return self.flush_pending();
        }

        while let Some(batch) = self.reader.next_batch().await? {
            self.selector.on_next(batch, &mut self.pending)?;
            if self.pending.is_full() {
                return self.flush_pending();
            }
        }

        // Reader exhausted — flush remaining selector state.
        self.selector.finish(&mut self.pending)?;
        if !self.pending.is_empty() {
            let result = self.flush_pending();
            // All last rows in row group are yielded, update cache.
            self.maybe_update_cache();
            return result;
        }

        // All last rows in row group are yielded, update cache.
        self.maybe_update_cache();
        Ok(None)
    }

    /// Puts the yielded batches into the cache if any were produced.
    ///
    /// Takes `yielded_batches`, so a second call after exhaustion is a no-op.
    fn maybe_update_cache(&mut self) {
        if self.yielded_batches.is_empty() {
            return;
        }
        let batches = std::mem::take(&mut self.yielded_batches);
        let value = Arc::new(SelectorResultValue::new_flat(
            batches,
            self.read_cols.clone(),
        ));
        self.cache_strategy.put_selector_result(self.key, value);
    }
}
332
/// Selects the last-timestamp rows per primary key from flat `RecordBatch`.
///
/// Assumes that input batches are sorted by primary key then by timestamp,
/// and contain only PUT operations (no DELETE).
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State for the currently in-progress primary key.
    current_key: Option<LastKeyState>,
}
342
/// Accumulated selection state for one primary key that may span batches.
#[derive(Debug)]
struct LastKeyState {
    // Primary key bytes this state belongs to.
    key: Vec<u8>,
    // Largest timestamp seen so far for this key.
    last_timestamp: i64,
    // Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
349
350impl LastKeyState {
351    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
352        Self {
353            key,
354            last_timestamp,
355            slices: vec![first_slice],
356        }
357    }
358}
359
impl FlatLastTimestampSelector {
    /// Processes the next batch and appends completed-key results into `output_buffer`.
    ///
    /// The batch is split into per-primary-key ranges; for each range only the
    /// rows carrying the range's maximum (last) timestamp are kept. A key's
    /// result is emitted once a different key is seen (or in [`Self::finish`]),
    /// since a key may span multiple batches.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Internal column positions are derived from the total column count
        // in the flat format layout.
        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        // Boundaries of consecutive runs of the same primary key.
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            // Input is sorted by timestamp within a key, so the last row of a
            // range holds the range's maximum timestamp.
            let range_last_ts = ts_values[range_end - 1];
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continues from an earlier range/batch.
                    if range_last_ts > state.last_timestamp {
                        // Strictly newer timestamp supersedes accumulated slices.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Ties keep accumulating; older timestamps are dropped.
                        state.slices.push(range_slice);
                    }
                }
                Some(_) => {
                    // Key changed: emit the finished key, then track the new one.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // Very first key observed by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Finishes the selector and appends remaining results into `output_buffer`.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the slices of the in-progress key (if any) into `output_buffer`.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
438
/// Reader that keeps only the last row of each time series from a flat RecordBatch stream.
/// Assumes input is sorted, deduped, and contains no delete operations.
pub(crate) struct FlatLastRowReader {
    // Source stream of flat record batches.
    stream: BoxedRecordBatchStream,
    // Per-key last-timestamp selection state machine.
    selector: FlatLastTimestampSelector,
    // Accumulates selector output until it reaches the read batch size.
    pending: BatchBuffer,
}
446
impl FlatLastRowReader {
    /// Creates a new `FlatLastRowReader`.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        Self {
            stream,
            selector: FlatLastTimestampSelector::default(),
            pending: BatchBuffer::new(),
        }
    }

    /// Converts the reader into a stream of RecordBatches.
    ///
    /// Yields a concatenated batch whenever the pending buffer reaches the
    /// read batch size, then flushes whatever remains once the input ends.
    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        async_stream::try_stream! {
            while let Some(batch) = self.stream.try_next().await? {
                self.selector.on_next(batch, &mut self.pending)?;
                if self.pending.is_full() {
                    yield self.pending.concat()?;
                }
            }
            // Input exhausted: drain the selector, then emit the leftovers.
            self.selector.finish(&mut self.pending)?;
            if !self.pending.is_empty() {
                yield self.pending.concat()?;
            }
        }
    }
}
473
474/// Gets the primary key bytes at `index` from the primary key dictionary column.
475fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
476    let pk_dict = batch
477        .column(pk_col_idx)
478        .as_any()
479        .downcast_ref::<PrimaryKeyArray>()
480        .unwrap();
481    let key = pk_dict.keys().value(index);
482    let binary_values = pk_dict
483        .values()
484        .as_any()
485        .downcast_ref::<BinaryArray>()
486        .unwrap();
487    binary_values.value(key as usize)
488}
489
/// Finds the start index of rows sharing the last (maximum) timestamp
/// within the range `[range_start, range_end)`.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let range = &ts_values[range_start..range_end];
    let last_ts = range[range.len() - 1];
    // One past the rightmost element that differs from `last_ts`; when every
    // element matches, the whole range shares the last timestamp.
    range
        .iter()
        .rposition(|&ts| ts != last_ts)
        .map_or(range_start, |pos| range_start + pos + 1)
}
502
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    // A single batch with one key yields only the batch's final row.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // Only one row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    // Keys spanning multiple batches still yield one last row per key.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Helper to build a flat format RecordBatch for testing.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        // Column order must match `test_flat_schema()`.
        let columns: Vec<ArrayRef> = vec![
            // field0 column
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            // ts column (time index)
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            // __primary_key column (dictionary(uint32, binary))
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            // __sequence column
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            // __op_type column
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    // Schema of the flat format batches built by `new_flat_batch`.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Collects all rows from the selector across all result batches.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            // Drain results after every batch so the buffer never overflows.
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Extracts (primary_key, timestamp) pairs from a result batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    // A key whose rows continue into the next batch must keep only the
    // overall last timestamp, not the per-batch one.
    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1 has two rows with the same last timestamp (3).
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1's last timestamp (3) spans two batches.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    // Accumulated tied slices must be discarded when a strictly newer
    // timestamp for the same key arrives later.
    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    // `finish()` must drain its state: a second call yields nothing.
    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        // Feed one batch: completed keys can be emitted before EOF.
        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // Simulate EOF by calling finish().
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // A second finish after EOF should not yield any more rows.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}
801}