// mito2/read/last_row.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use futures::{Stream, TryStreamExt};
25use snafu::ResultExt;
26use store_api::storage::{FileId, TimeSeriesRowSelector};
27
28use crate::cache::{
29    CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
30    selector_result_cache_hit, selector_result_cache_miss,
31};
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
34use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
35use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
36use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
37use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
38use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
39
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
/// - sorted,
/// - all deleted rows have been filtered,
/// - not empty.
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focuses on time series (the same key).
pub(crate) struct LastRowReader {
    /// Inner reader.
    reader: BoxedBatchReader,
    /// Selector that keeps the pending last batch of the current key.
    selector: LastRowSelector,
}
54
55impl LastRowReader {
56    /// Creates a new `LastRowReader`.
57    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
58        Self {
59            reader,
60            selector: LastRowSelector::default(),
61        }
62    }
63
64    /// Returns the last row of the next key.
65    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
66        while let Some(batch) = self.reader.next_batch().await? {
67            if let Some(yielded) = self.selector.on_next(batch) {
68                return Ok(Some(yielded));
69            }
70        }
71        Ok(self.selector.finish())
72    }
73}
74
#[async_trait]
impl BatchReader for LastRowReader {
    // Delegates to `next_last_row`: each returned batch holds the single last row
    // of one time series.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
81
/// Cached last row reader for a specific row group.
/// If the last rows for the current row group are already cached, this reader returns
/// the cached value.
/// If the cache misses, [RowGroupLastRowReader] reads last rows from the row group and
/// updates the cache upon finish.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from the cached value.
    Hit(LastRowCacheReader),
    /// Cache miss, reads from the row group reader and updates the cache.
    Miss(RowGroupLastRowReader),
}
92
93impl RowGroupLastRowCachedReader {
94    pub(crate) fn new(
95        file_id: FileId,
96        row_group_idx: usize,
97        cache_strategy: CacheStrategy,
98        row_group_reader: RowGroupReader,
99    ) -> Self {
100        let key = SelectorResultKey {
101            file_id,
102            row_group_idx,
103            selector: TimeSeriesRowSelector::LastRow,
104        };
105
106        if let Some(value) = cache_strategy.get_selector_result(&key) {
107            let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
108            let schema_matches =
109                value.projection == row_group_reader.read_format().projection_indices();
110            if is_primary_key && schema_matches {
111                // Format and schema match, use cache batches.
112                Self::new_hit(value)
113            } else {
114                Self::new_miss(key, row_group_reader, cache_strategy)
115            }
116        } else {
117            Self::new_miss(key, row_group_reader, cache_strategy)
118        }
119    }
120
121    /// Gets the underlying reader metrics if uncached.
122    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
123        match self {
124            RowGroupLastRowCachedReader::Hit(_) => None,
125            RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
126        }
127    }
128
129    /// Creates new Hit variant and updates metrics.
130    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
131        selector_result_cache_hit();
132        Self::Hit(LastRowCacheReader { value, idx: 0 })
133    }
134
135    /// Creates new Miss variant and updates metrics.
136    fn new_miss(
137        key: SelectorResultKey,
138        row_group_reader: RowGroupReader,
139        cache_strategy: CacheStrategy,
140    ) -> Self {
141        selector_result_cache_miss();
142        Self::Miss(RowGroupLastRowReader::new(
143            key,
144            row_group_reader,
145            cache_strategy,
146        ))
147    }
148}
149
150#[async_trait]
151impl BatchReader for RowGroupLastRowCachedReader {
152    async fn next_batch(&mut self) -> Result<Option<Batch>> {
153        match self {
154            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
155            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
156        }
157    }
158}
159
/// Last row reader that returns the cached last rows for a row group.
pub(crate) struct LastRowCacheReader {
    /// Cached selector result; expected to hold `SelectorResult::PrimaryKey` batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
165
166impl LastRowCacheReader {
167    /// Iterates cached last rows.
168    async fn next_batch(&mut self) -> Result<Option<Batch>> {
169        let batches = match &self.value.result {
170            SelectorResult::PrimaryKey(batches) => batches,
171            SelectorResult::Flat(_) => unreachable!(),
172        };
173        if self.idx < batches.len() {
174            let res = Ok(Some(batches[self.idx].clone()));
175            self.idx += 1;
176            res
177        } else {
178            Ok(None)
179        }
180    }
181}
182
/// Reader that selects last rows from a row group and caches the result when done.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key for the selector result.
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: RowGroupReader,
    /// Selector that keeps the last row of each key.
    selector: LastRowSelector,
    /// Last rows yielded so far; cached once the row group is exhausted.
    yielded_batches: Vec<Batch>,
    /// Cache to update on finish.
    cache_strategy: CacheStrategy,
    /// Index buffer to take a new batch from the last row.
    take_index: UInt32Vector,
}
192
193impl RowGroupLastRowReader {
194    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
195        Self {
196            key,
197            reader,
198            selector: LastRowSelector::default(),
199            yielded_batches: vec![],
200            cache_strategy,
201            take_index: UInt32Vector::from_vec(vec![0]),
202        }
203    }
204
205    async fn next_batch(&mut self) -> Result<Option<Batch>> {
206        while let Some(batch) = self.reader.next_batch().await? {
207            if let Some(yielded) = self.selector.on_next(batch) {
208                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
209                return Ok(Some(yielded));
210            }
211        }
212        let last_batch = if let Some(last_batch) = self.selector.finish() {
213            push_yielded_batches(
214                last_batch.clone(),
215                &self.take_index,
216                &mut self.yielded_batches,
217            )?;
218            Some(last_batch)
219        } else {
220            None
221        };
222
223        // All last rows in row group are yielded, update cache.
224        self.maybe_update_cache();
225        Ok(last_batch)
226    }
227
228    /// Updates row group's last row cache if cache manager is present.
229    fn maybe_update_cache(&mut self) {
230        if self.yielded_batches.is_empty() {
231            // we always expect that row groups yields batches.
232            return;
233        }
234        let value = Arc::new(SelectorResultValue::new(
235            std::mem::take(&mut self.yielded_batches),
236            self.reader.read_format().projection_indices().to_vec(),
237        ));
238        self.cache_strategy.put_selector_result(self.key, value);
239    }
240
241    fn metrics(&self) -> &ReaderMetrics {
242        self.reader.metrics()
243    }
244}
245
/// Push the last row into `yielded_batches`.
///
/// `batch` must contain exactly one row. `take_in_place` with the `[0]` index buffer
/// rebuilds the single-row batch; NOTE(review): presumably this re-materializes the row
/// so the cached copy does not pin the source batch's memory — confirm `take_in_place`
/// semantics.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    assert_eq!(1, batch.num_rows());
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
258
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    /// Pending batch of the key currently being scanned; its last row is emitted
    /// when a different key arrives or when the input finishes.
    last_batch: Option<Batch>,
}
264
265impl LastRowSelector {
266    /// Handles next batch. Return the yielding batch if present.
267    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
268        if let Some(last) = &self.last_batch {
269            if last.primary_key() == batch.primary_key() {
270                // Same key, update last batch.
271                self.last_batch = Some(batch);
272                None
273            } else {
274                // Different key, return the last row in `last` and update `last_batch` by
275                // current batch.
276                debug_assert!(!last.is_empty());
277                let last_row = last.slice(last.num_rows() - 1, 1);
278                self.last_batch = Some(batch);
279                Some(last_row)
280            }
281        } else {
282            self.last_batch = Some(batch);
283            None
284        }
285    }
286
287    /// Finishes the selector and returns the pending batch if any.
288    pub fn finish(&mut self) -> Option<Batch> {
289        if let Some(last) = self.last_batch.take() {
290            // This is the last key.
291            let last_row = last.slice(last.num_rows() - 1, 1);
292            return Some(last_row);
293        }
294        None
295    }
296}
297
/// Cached last row reader for a flat format row group.
/// If the last rows are already cached (as flat `RecordBatch`), returns cached values.
/// Otherwise, reads from the row group, selects last rows, and updates the cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from the cached value.
    Hit(FlatLastRowCacheReader),
    /// Cache miss, reads from the row group reader and updates the cache.
    Miss(FlatRowGroupLastRowReader),
}
307
308impl FlatRowGroupLastRowCachedReader {
309    pub(crate) fn new(
310        file_id: FileId,
311        row_group_idx: usize,
312        cache_strategy: CacheStrategy,
313        projection: &[usize],
314        reader: FlatRowGroupReader,
315    ) -> Self {
316        let key = SelectorResultKey {
317            file_id,
318            row_group_idx,
319            selector: TimeSeriesRowSelector::LastRow,
320        };
321
322        if let Some(value) = cache_strategy.get_selector_result(&key) {
323            let is_flat = matches!(&value.result, SelectorResult::Flat(_));
324            let schema_matches = value.projection == projection;
325            if is_flat && schema_matches {
326                Self::new_hit(value)
327            } else {
328                Self::new_miss(key, projection, reader, cache_strategy)
329            }
330        } else {
331            Self::new_miss(key, projection, reader, cache_strategy)
332        }
333    }
334
335    /// Returns the next RecordBatch.
336    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
337        match self {
338            FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
339            FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
340        }
341    }
342
343    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
344        selector_result_cache_hit();
345        Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
346    }
347
348    fn new_miss(
349        key: SelectorResultKey,
350        projection: &[usize],
351        reader: FlatRowGroupReader,
352        cache_strategy: CacheStrategy,
353    ) -> Self {
354        selector_result_cache_miss();
355        Self::Miss(FlatRowGroupLastRowReader::new(
356            key,
357            projection.to_vec(),
358            reader,
359            cache_strategy,
360        ))
361    }
362}
363
/// Iterates over cached flat last rows.
pub(crate) struct FlatLastRowCacheReader {
    /// Cached selector result; expected to hold `SelectorResult::Flat` batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
369
370impl FlatLastRowCacheReader {
371    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
372        let batches = match &self.value.result {
373            SelectorResult::Flat(batches) => batches,
374            SelectorResult::PrimaryKey(_) => unreachable!(),
375        };
376        if self.idx < batches.len() {
377            let res = Ok(Some(batches[self.idx].clone()));
378            self.idx += 1;
379            res
380        } else {
381            Ok(None)
382        }
383    }
384}
385
/// Buffer that accumulates small `RecordBatch`es and tracks total row count.
pub(crate) struct BatchBuffer {
    /// Buffered batches, concatenated once enough rows are collected.
    batches: Vec<RecordBatch>,
    /// Total number of rows across `batches`.
    num_rows: usize,
}
391
392impl BatchBuffer {
393    fn new() -> Self {
394        Self {
395            batches: Vec::new(),
396            num_rows: 0,
397        }
398    }
399
400    /// Returns true if total buffered rows reaches `DEFAULT_READ_BATCH_SIZE`.
401    fn is_full(&self) -> bool {
402        self.num_rows >= DEFAULT_READ_BATCH_SIZE
403    }
404
405    /// Extends the buffer from a slice of batches.
406    fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
407        for batch in batches {
408            self.num_rows += batch.num_rows();
409        }
410        self.batches.extend_from_slice(batches);
411    }
412
413    /// Returns true if the buffer has no batches.
414    fn is_empty(&self) -> bool {
415        self.batches.is_empty()
416    }
417
418    /// Concatenates all buffered batches into one, resets the buffer, and returns the result.
419    fn concat(&mut self) -> Result<RecordBatch> {
420        debug_assert!(!self.batches.is_empty());
421        let schema = self.batches[0].schema();
422        let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
423        self.batches.clear();
424        self.num_rows = 0;
425        Ok(merged)
426    }
427}
428
/// Reads last rows from a flat format row group and caches the results.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key for the selector result.
    key: SelectorResultKey,
    /// Underlying flat row group reader.
    reader: FlatRowGroupReader,
    /// Selector that keeps last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Result batches produced so far; cached once the row group is exhausted.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to update on finish.
    cache_strategy: CacheStrategy,
    /// Projection indices stored with the cached value so lookups can validate it.
    projection: Vec<usize>,
    /// Accumulates small selector-output batches before concatenating.
    pending: BatchBuffer,
}
440
441impl FlatRowGroupLastRowReader {
442    fn new(
443        key: SelectorResultKey,
444        projection: Vec<usize>,
445        reader: FlatRowGroupReader,
446        cache_strategy: CacheStrategy,
447    ) -> Self {
448        Self {
449            key,
450            reader,
451            selector: FlatLastTimestampSelector::default(),
452            yielded_batches: vec![],
453            cache_strategy,
454            projection,
455            pending: BatchBuffer::new(),
456        }
457    }
458
459    /// Concatenates pending batches and records the result in `yielded_batches`.
460    fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
461        if self.pending.is_empty() {
462            return Ok(None);
463        }
464        let merged = self.pending.concat()?;
465        self.yielded_batches.push(merged.clone());
466        Ok(Some(merged))
467    }
468
469    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
470        if self.pending.is_full() {
471            return self.flush_pending();
472        }
473
474        while let Some(batch) = self.reader.next_batch()? {
475            self.selector.on_next(batch, &mut self.pending)?;
476            if self.pending.is_full() {
477                return self.flush_pending();
478            }
479        }
480
481        // Reader exhausted — flush remaining selector state.
482        self.selector.finish(&mut self.pending)?;
483        if !self.pending.is_empty() {
484            let result = self.flush_pending();
485            // All last rows in row group are yielded, update cache.
486            self.maybe_update_cache();
487            return result;
488        }
489
490        // All last rows in row group are yielded, update cache.
491        self.maybe_update_cache();
492        Ok(None)
493    }
494
495    fn maybe_update_cache(&mut self) {
496        if self.yielded_batches.is_empty() {
497            return;
498        }
499        let batches = std::mem::take(&mut self.yielded_batches);
500        let value = Arc::new(SelectorResultValue::new_flat(
501            batches,
502            self.projection.clone(),
503        ));
504        self.cache_strategy.put_selector_result(self.key, value);
505    }
506}
507
/// Selects the last-timestamp rows per primary key from flat `RecordBatch`.
///
/// Assumes that input batches are sorted by primary key then by timestamp,
/// and contain only PUT operations (no DELETE).
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State for the currently in-progress primary key; flushed when a different
    /// key arrives or the selector finishes.
    current_key: Option<LastKeyState>,
}
517
/// Accumulated state for one primary key while scanning.
#[derive(Debug)]
struct LastKeyState {
    /// Primary key bytes.
    key: Vec<u8>,
    /// Maximum timestamp observed so far for this key.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
524
525impl LastKeyState {
526    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
527        Self {
528            key,
529            last_timestamp,
530            slices: vec![first_slice],
531        }
532    }
533}
534
impl FlatLastTimestampSelector {
    /// Processes the next batch and appends completed-key results into `output_buffer`.
    ///
    /// The batch is split into consecutive ranges of equal primary key. For each range,
    /// only the rows carrying the range's maximum (last) timestamp are kept. A key's
    /// rows are flushed to `output_buffer` once a different key is observed; the rows
    /// of the batch's final key stay buffered in `current_key` because the key may
    /// continue in the next batch.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            // `[range_start, range_end)` covers the rows of one primary key.
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            let range_last_ts = ts_values[range_end - 1];
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            // Slice holding only the rows with the range's last timestamp.
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Key continues from a previous range/batch.
                    if range_last_ts > state.last_timestamp {
                        // A larger timestamp supersedes everything collected so far.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Same last timestamp: keep both slices.
                        state.slices.push(range_slice);
                    }
                    // A smaller timestamp cannot be the last row; the slice is dropped.
                }
                Some(_) => {
                    // New key: flush the previous key's rows and start tracking this one.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key seen by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Finishes the selector and appends remaining results into `output_buffer`.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the in-progress key's slices into `output_buffer` and clears the state.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
613
/// Reader that keeps only the last row of each time series from a flat RecordBatch stream.
/// Assumes input is sorted, deduped, and contains no delete operations.
pub(crate) struct FlatLastRowReader {
    /// Input stream of flat record batches.
    stream: BoxedRecordBatchStream,
    /// Selector that keeps last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Buffers selector output until enough rows are collected.
    pending: BatchBuffer,
}
621
622impl FlatLastRowReader {
623    /// Creates a new `FlatLastRowReader`.
624    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
625        Self {
626            stream,
627            selector: FlatLastTimestampSelector::default(),
628            pending: BatchBuffer::new(),
629        }
630    }
631
632    /// Converts the reader into a stream of RecordBatches.
633    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
634        async_stream::try_stream! {
635            while let Some(batch) = self.stream.try_next().await? {
636                self.selector.on_next(batch, &mut self.pending)?;
637                if self.pending.is_full() {
638                    yield self.pending.concat()?;
639                }
640            }
641            self.selector.finish(&mut self.pending)?;
642            if !self.pending.is_empty() {
643                yield self.pending.concat()?;
644            }
645        }
646    }
647}
648
649/// Gets the primary key bytes at `index` from the primary key dictionary column.
650fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
651    let pk_dict = batch
652        .column(pk_col_idx)
653        .as_any()
654        .downcast_ref::<PrimaryKeyArray>()
655        .unwrap();
656    let key = pk_dict.keys().value(index);
657    let binary_values = pk_dict
658        .values()
659        .as_any()
660        .downcast_ref::<BinaryArray>()
661        .unwrap();
662    binary_values.value(key as usize)
663}
664
/// Finds the start index of the run of rows sharing the last (maximum) timestamp
/// within `[range_start, range_end)`.
///
/// The range must be non-empty. Timestamps within one primary key range are sorted
/// ascending, so the rows with the maximum timestamp form a suffix of the range.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    let mut start = range_end - 1;
    // Extend the run leftwards while the previous row carries the same timestamp.
    for i in (range_start..range_end - 1).rev() {
        if ts_values[i] != last_ts {
            break;
        }
        start = i;
    }
    start
}
677
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // Only one row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // k1 spans two batches; k2 follows in its own batch.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Helper to build a flat format RecordBatch for testing.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            // field0 column
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            // ts column (time index)
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            // __primary_key column (dictionary(uint32, binary))
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            // __sequence column
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            // __op_type column
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    /// Arrow schema matching the flat-format column layout used by the selector.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Collects all rows from the selector across all result batches.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            // Keep the buffer's row count consistent after draining batches directly.
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Extracts (primary_key, timestamp) pairs from a result batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1 has two rows with the same last timestamp (3).
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1's last timestamp (3) spans two batches.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        // Rows at timestamp 3 are superseded once timestamp 4 arrives for the same key.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        // Feed one batch: completed keys can be emitted before EOF.
        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // Simulate EOF by calling finish().
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // A second finish after EOF should not yield any more rows.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}
976}