Skip to main content

mito2/read/
last_row.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use futures::{Stream, TryStreamExt};
25use snafu::ResultExt;
26use store_api::storage::{FileId, TimeSeriesRowSelector};
27
28use crate::cache::{
29    CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
30    selector_result_cache_hit, selector_result_cache_miss,
31};
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
34use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
35use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
36use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
37use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
38use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
39
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
/// - sorted
/// - all deleted rows has been filtered.
/// - not empty
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focus on time series (the same key).
#[allow(dead_code)]
pub(crate) struct LastRowReader {
    /// Inner reader.
    reader: BoxedBatchReader,
    /// Selector that tracks the pending batch of the current key and yields
    /// its last row when the key changes or the input ends.
    selector: LastRowSelector,
}
55
56#[allow(dead_code)]
57impl LastRowReader {
58    /// Creates a new `LastRowReader`.
59    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
60        Self {
61            reader,
62            selector: LastRowSelector::default(),
63        }
64    }
65
66    /// Returns the last row of the next key.
67    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
68        while let Some(batch) = self.reader.next_batch().await? {
69            if let Some(yielded) = self.selector.on_next(batch) {
70                return Ok(Some(yielded));
71            }
72        }
73        Ok(self.selector.finish())
74    }
75}
76
#[async_trait]
impl BatchReader for LastRowReader {
    /// Delegates to [`LastRowReader::next_last_row`] so the reader can be used
    /// wherever a generic [`BatchReader`] is expected.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
83
/// Cached last row reader for a specific row group.
/// If the last rows for the current row group are already cached, this reader returns the
/// cached value. If the cache misses, [RowGroupLastRowReader] reads last rows from the row
/// group and updates the cache upon finish.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from the cached value.
    Hit(LastRowCacheReader),
    /// Cache miss, reads from the row group reader and updates the cache.
    Miss(RowGroupLastRowReader),
}
94
95impl RowGroupLastRowCachedReader {
96    pub(crate) fn new(
97        file_id: FileId,
98        row_group_idx: usize,
99        cache_strategy: CacheStrategy,
100        row_group_reader: RowGroupReader,
101    ) -> Self {
102        let key = SelectorResultKey {
103            file_id,
104            row_group_idx,
105            selector: TimeSeriesRowSelector::LastRow,
106        };
107
108        if let Some(value) = cache_strategy.get_selector_result(&key) {
109            let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
110            let schema_matches =
111                value.projection == row_group_reader.read_format().projection_indices();
112            if is_primary_key && schema_matches {
113                // Format and schema match, use cache batches.
114                Self::new_hit(value)
115            } else {
116                Self::new_miss(key, row_group_reader, cache_strategy)
117            }
118        } else {
119            Self::new_miss(key, row_group_reader, cache_strategy)
120        }
121    }
122
123    /// Gets the underlying reader metrics if uncached.
124    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
125        match self {
126            RowGroupLastRowCachedReader::Hit(_) => None,
127            RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
128        }
129    }
130
131    /// Creates new Hit variant and updates metrics.
132    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
133        selector_result_cache_hit();
134        Self::Hit(LastRowCacheReader { value, idx: 0 })
135    }
136
137    /// Creates new Miss variant and updates metrics.
138    fn new_miss(
139        key: SelectorResultKey,
140        row_group_reader: RowGroupReader,
141        cache_strategy: CacheStrategy,
142    ) -> Self {
143        selector_result_cache_miss();
144        Self::Miss(RowGroupLastRowReader::new(
145            key,
146            row_group_reader,
147            cache_strategy,
148        ))
149    }
150}
151
#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    /// Dispatches to the cached (hit) or reading (miss) variant.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
        }
    }
}
161
/// Last row reader that returns the cached last rows for a row group.
pub(crate) struct LastRowCacheReader {
    /// Cached selector result; expected to hold primary key format batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
167
168impl LastRowCacheReader {
169    /// Iterates cached last rows.
170    async fn next_batch(&mut self) -> Result<Option<Batch>> {
171        let batches = match &self.value.result {
172            SelectorResult::PrimaryKey(batches) => batches,
173            SelectorResult::Flat(_) => unreachable!(),
174        };
175        if self.idx < batches.len() {
176            let res = Ok(Some(batches[self.idx].clone()));
177            self.idx += 1;
178            res
179        } else {
180            Ok(None)
181        }
182    }
183}
184
/// Reads last rows from a row group and updates the selector-result cache on completion.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key for this file / row group / selector combination.
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: RowGroupReader,
    /// Selector keeping the last row per time series.
    selector: LastRowSelector,
    /// Batches already returned to the caller, kept to populate the cache.
    yielded_batches: Vec<Batch>,
    /// Cache to update once the row group is exhausted.
    cache_strategy: CacheStrategy,
    /// Index buffer to take a new batch from the last row.
    take_index: UInt32Vector,
}
194
195impl RowGroupLastRowReader {
196    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
197        Self {
198            key,
199            reader,
200            selector: LastRowSelector::default(),
201            yielded_batches: vec![],
202            cache_strategy,
203            take_index: UInt32Vector::from_vec(vec![0]),
204        }
205    }
206
207    async fn next_batch(&mut self) -> Result<Option<Batch>> {
208        while let Some(batch) = self.reader.next_batch().await? {
209            if let Some(yielded) = self.selector.on_next(batch) {
210                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
211                return Ok(Some(yielded));
212            }
213        }
214        let last_batch = if let Some(last_batch) = self.selector.finish() {
215            push_yielded_batches(
216                last_batch.clone(),
217                &self.take_index,
218                &mut self.yielded_batches,
219            )?;
220            Some(last_batch)
221        } else {
222            None
223        };
224
225        // All last rows in row group are yielded, update cache.
226        self.maybe_update_cache();
227        Ok(last_batch)
228    }
229
230    /// Updates row group's last row cache if cache manager is present.
231    fn maybe_update_cache(&mut self) {
232        if self.yielded_batches.is_empty() {
233            // we always expect that row groups yields batches.
234            return;
235        }
236        let value = Arc::new(SelectorResultValue::new(
237            std::mem::take(&mut self.yielded_batches),
238            self.reader.read_format().projection_indices().to_vec(),
239        ));
240        self.cache_strategy.put_selector_result(self.key, value);
241    }
242
243    fn metrics(&self) -> &ReaderMetrics {
244        self.reader.metrics()
245    }
246}
247
/// Push last row into `yielded_batches`.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    // The selector always yields single-row batches.
    assert_eq!(1, batch.num_rows());
    // Re-take row 0 into a fresh batch; presumably this detaches the row from
    // the original (larger) backing arrays before caching — TODO confirm.
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
260
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    /// Pending batch of the key currently being scanned; its final row is
    /// emitted once a new key arrives or the input ends.
    last_batch: Option<Batch>,
}
266
267impl LastRowSelector {
268    /// Handles next batch. Return the yielding batch if present.
269    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
270        if let Some(last) = &self.last_batch {
271            if last.primary_key() == batch.primary_key() {
272                // Same key, update last batch.
273                self.last_batch = Some(batch);
274                None
275            } else {
276                // Different key, return the last row in `last` and update `last_batch` by
277                // current batch.
278                debug_assert!(!last.is_empty());
279                let last_row = last.slice(last.num_rows() - 1, 1);
280                self.last_batch = Some(batch);
281                Some(last_row)
282            }
283        } else {
284            self.last_batch = Some(batch);
285            None
286        }
287    }
288
289    /// Finishes the selector and returns the pending batch if any.
290    pub fn finish(&mut self) -> Option<Batch> {
291        if let Some(last) = self.last_batch.take() {
292            // This is the last key.
293            let last_row = last.slice(last.num_rows() - 1, 1);
294            return Some(last_row);
295        }
296        None
297    }
298}
299
/// Cached last row reader for a flat format row group.
/// If the last rows are already cached (as flat `RecordBatch`), returns cached values.
/// Otherwise, reads from the row group, selects last rows, and updates the cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from the cached value.
    Hit(FlatLastRowCacheReader),
    /// Cache miss, reads from the row group reader and updates the cache.
    Miss(FlatRowGroupLastRowReader),
}
309
310impl FlatRowGroupLastRowCachedReader {
311    pub(crate) fn new(
312        file_id: FileId,
313        row_group_idx: usize,
314        cache_strategy: CacheStrategy,
315        projection: &[usize],
316        reader: FlatRowGroupReader,
317    ) -> Self {
318        let key = SelectorResultKey {
319            file_id,
320            row_group_idx,
321            selector: TimeSeriesRowSelector::LastRow,
322        };
323
324        if let Some(value) = cache_strategy.get_selector_result(&key) {
325            let is_flat = matches!(&value.result, SelectorResult::Flat(_));
326            let schema_matches = value.projection == projection;
327            if is_flat && schema_matches {
328                Self::new_hit(value)
329            } else {
330                Self::new_miss(key, projection, reader, cache_strategy)
331            }
332        } else {
333            Self::new_miss(key, projection, reader, cache_strategy)
334        }
335    }
336
337    /// Returns the next RecordBatch.
338    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
339        match self {
340            FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
341            FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
342        }
343    }
344
345    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
346        selector_result_cache_hit();
347        Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
348    }
349
350    fn new_miss(
351        key: SelectorResultKey,
352        projection: &[usize],
353        reader: FlatRowGroupReader,
354        cache_strategy: CacheStrategy,
355    ) -> Self {
356        selector_result_cache_miss();
357        Self::Miss(FlatRowGroupLastRowReader::new(
358            key,
359            projection.to_vec(),
360            reader,
361            cache_strategy,
362        ))
363    }
364}
365
/// Iterates over cached flat last rows.
pub(crate) struct FlatLastRowCacheReader {
    /// Cached selector result; expected to hold flat format batches.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
371
372impl FlatLastRowCacheReader {
373    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
374        let batches = match &self.value.result {
375            SelectorResult::Flat(batches) => batches,
376            SelectorResult::PrimaryKey(_) => unreachable!(),
377        };
378        if self.idx < batches.len() {
379            let res = Ok(Some(batches[self.idx].clone()));
380            self.idx += 1;
381            res
382        } else {
383            Ok(None)
384        }
385    }
386}
387
/// Buffer that accumulates small `RecordBatch`es and tracks total row count.
pub(crate) struct BatchBuffer {
    /// Buffered batches pending concatenation.
    batches: Vec<RecordBatch>,
    /// Total number of rows across `batches`.
    num_rows: usize,
}
393
394impl BatchBuffer {
395    fn new() -> Self {
396        Self {
397            batches: Vec::new(),
398            num_rows: 0,
399        }
400    }
401
402    /// Returns true if total buffered rows reaches `DEFAULT_READ_BATCH_SIZE`.
403    fn is_full(&self) -> bool {
404        self.num_rows >= DEFAULT_READ_BATCH_SIZE
405    }
406
407    /// Extends the buffer from a slice of batches.
408    fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
409        for batch in batches {
410            self.num_rows += batch.num_rows();
411        }
412        self.batches.extend_from_slice(batches);
413    }
414
415    /// Returns true if the buffer has no batches.
416    fn is_empty(&self) -> bool {
417        self.batches.is_empty()
418    }
419
420    /// Concatenates all buffered batches into one, resets the buffer, and returns the result.
421    fn concat(&mut self) -> Result<RecordBatch> {
422        debug_assert!(!self.batches.is_empty());
423        let schema = self.batches[0].schema();
424        let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
425        self.batches.clear();
426        self.num_rows = 0;
427        Ok(merged)
428    }
429}
430
/// Reads last rows from a flat format row group and caches the results.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key for this file / row group / selector combination.
    key: SelectorResultKey,
    /// Underlying flat row group reader.
    reader: FlatRowGroupReader,
    /// Selector keeping last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Batches already yielded, kept to populate the cache on completion.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to update once the row group is exhausted.
    cache_strategy: CacheStrategy,
    /// Projected column indices, stored with the cached value for validation.
    projection: Vec<usize>,
    /// Accumulates small selector-output batches before concatenating.
    pending: BatchBuffer,
}
442
443impl FlatRowGroupLastRowReader {
444    fn new(
445        key: SelectorResultKey,
446        projection: Vec<usize>,
447        reader: FlatRowGroupReader,
448        cache_strategy: CacheStrategy,
449    ) -> Self {
450        Self {
451            key,
452            reader,
453            selector: FlatLastTimestampSelector::default(),
454            yielded_batches: vec![],
455            cache_strategy,
456            projection,
457            pending: BatchBuffer::new(),
458        }
459    }
460
461    /// Concatenates pending batches and records the result in `yielded_batches`.
462    fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
463        if self.pending.is_empty() {
464            return Ok(None);
465        }
466        let merged = self.pending.concat()?;
467        self.yielded_batches.push(merged.clone());
468        Ok(Some(merged))
469    }
470
471    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
472        if self.pending.is_full() {
473            return self.flush_pending();
474        }
475
476        while let Some(batch) = self.reader.next_batch().await? {
477            self.selector.on_next(batch, &mut self.pending)?;
478            if self.pending.is_full() {
479                return self.flush_pending();
480            }
481        }
482
483        // Reader exhausted — flush remaining selector state.
484        self.selector.finish(&mut self.pending)?;
485        if !self.pending.is_empty() {
486            let result = self.flush_pending();
487            // All last rows in row group are yielded, update cache.
488            self.maybe_update_cache();
489            return result;
490        }
491
492        // All last rows in row group are yielded, update cache.
493        self.maybe_update_cache();
494        Ok(None)
495    }
496
497    fn maybe_update_cache(&mut self) {
498        if self.yielded_batches.is_empty() {
499            return;
500        }
501        let batches = std::mem::take(&mut self.yielded_batches);
502        let value = Arc::new(SelectorResultValue::new_flat(
503            batches,
504            self.projection.clone(),
505        ));
506        self.cache_strategy.put_selector_result(self.key, value);
507    }
508}
509
/// Selects the last-timestamp rows per primary key from flat `RecordBatch`.
///
/// Assumes that input batches are sorted by primary key then by timestamp,
/// and contain only PUT operations (no DELETE).
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State for the currently in-progress primary key; kept across batches
    /// because a key may span batch boundaries.
    current_key: Option<LastKeyState>,
}
519
/// Selection state for a primary key that may still continue in upcoming batches.
#[derive(Debug)]
struct LastKeyState {
    /// Primary key bytes of the in-progress time series.
    key: Vec<u8>,
    /// Maximum timestamp observed so far for this key.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
526
impl LastKeyState {
    /// Creates state for a newly seen primary key with its first candidate slice.
    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
        Self {
            key,
            last_timestamp,
            slices: vec![first_slice],
        }
    }
}
536
impl FlatLastTimestampSelector {
    /// Processes the next batch and appends completed-key results into `output_buffer`.
    ///
    /// Rows are grouped into consecutive ranges sharing a primary key; for each
    /// range only the rows carrying the maximum (last) timestamp are kept. A
    /// key's rows are emitted only once a *different* key is seen (or on
    /// [`Self::finish`]), because a key may span multiple input batches.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Locate the primary key and time index columns from the flat format
        // column layout.
        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        // Offsets delimiting runs of identical keys: range i is
        // `[offsets[i], offsets[i + 1])`.
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            // Rows are sorted by timestamp within a key, so the final row of
            // the range carries its maximum timestamp.
            let range_last_ts = ts_values[range_end - 1];
            // Several trailing rows may share that timestamp; keep them all.
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // The key continues from a previous range or batch.
                    if range_last_ts > state.last_timestamp {
                        // A newer timestamp supersedes earlier collected slices.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Same last timestamp: keep both slices.
                        state.slices.push(range_slice);
                    }
                    // A smaller timestamp cannot win; the slice is dropped.
                }
                Some(_) => {
                    // Key changed: the previous key is complete, emit its rows.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key observed by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Finishes the selector and appends remaining results into `output_buffer`.
    /// Calling it again afterwards is a no-op (the state has been taken).
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Emits the buffered slices of the in-progress key, if any, and clears the state.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
615
/// Reader that keeps only the last row of each time series from a flat RecordBatch stream.
/// Assumes input is sorted, deduped, and contains no delete operations.
pub(crate) struct FlatLastRowReader {
    /// Input stream of flat record batches.
    stream: BoxedRecordBatchStream,
    /// Selector keeping last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Buffers selector output until it reaches the target batch size.
    pending: BatchBuffer,
}
623
impl FlatLastRowReader {
    /// Creates a new `FlatLastRowReader`.
    pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self {
        Self {
            stream,
            selector: FlatLastTimestampSelector::default(),
            pending: BatchBuffer::new(),
        }
    }

    /// Converts the reader into a stream of RecordBatches.
    ///
    /// Selected last rows are buffered and concatenated so yielded batches hold
    /// at least `DEFAULT_READ_BATCH_SIZE` rows; a final partial batch is
    /// flushed once the input stream ends.
    pub(crate) fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        async_stream::try_stream! {
            while let Some(batch) = self.stream.try_next().await? {
                self.selector.on_next(batch, &mut self.pending)?;
                if self.pending.is_full() {
                    yield self.pending.concat()?;
                }
            }
            // Input exhausted: emit the in-progress key and any buffered rows.
            self.selector.finish(&mut self.pending)?;
            if !self.pending.is_empty() {
                yield self.pending.concat()?;
            }
        }
    }
}
650
651/// Gets the primary key bytes at `index` from the primary key dictionary column.
652fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
653    let pk_dict = batch
654        .column(pk_col_idx)
655        .as_any()
656        .downcast_ref::<PrimaryKeyArray>()
657        .unwrap();
658    let key = pk_dict.keys().value(index);
659    let binary_values = pk_dict
660        .values()
661        .as_any()
662        .downcast_ref::<BinaryArray>()
663        .unwrap();
664    binary_values.value(key as usize)
665}
666
/// Finds the start index of rows sharing the last (maximum) timestamp
/// within the range `[range_start, range_end)`.
///
/// The range must be non-empty; timestamps within a key range are sorted,
/// so equal-to-last values form a contiguous suffix.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // Rightmost element differing from `last_ts` bounds the suffix; if every
    // element equals `last_ts`, the suffix starts at `range_start`.
    ts_values[range_start..range_end]
        .iter()
        .rposition(|&ts| ts != last_ts)
        .map(|pos| range_start + pos + 1)
        .unwrap_or(range_start)
}
679
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two rows of the same key: only the second (last) row is kept.
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // Only one row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // k1 spans two batches; its last row comes from the second batch.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Helper to build a flat format RecordBatch for testing.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        // Column order matches `test_flat_schema`:
        // field0, ts, __primary_key, __sequence, __op_type.
        let columns: Vec<ArrayRef> = vec![
            // field0 column
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            // ts column (time index)
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            // __primary_key column (dictionary(uint32, binary))
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            // __sequence column
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            // __op_type column (1 == Put)
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    /// Arrow schema matching the flat format layout used by `new_flat_batch`.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Collects all rows from the selector across all result batches.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            // Drain any completed-key output after each input batch.
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        // EOF: flush the in-progress key.
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Extracts (primary_key, timestamp) pairs from a result batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1 and k2 each cross a batch boundary.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1 has two rows with the same last timestamp (3).
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1's last timestamp (3) spans two batches.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        // Accumulated ts=3 slices must be discarded once ts=4 appears.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        // Feed one batch: completed keys can be emitted before EOF.
        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // Simulate EOF by calling finish().
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // A second finish after EOF should not yield any more rows.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}