mito2/read/
last_row.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::arrow::array::{Array, BinaryArray};
21use datatypes::arrow::compute::concat_batches;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::UInt32Vector;
24use snafu::ResultExt;
25use store_api::storage::{FileId, TimeSeriesRowSelector};
26
27use crate::cache::{
28    CacheStrategy, SelectorResult, SelectorResultKey, SelectorResultValue,
29    selector_result_cache_hit, selector_result_cache_miss,
30};
31use crate::error::{ComputeArrowSnafu, Result};
32use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
33use crate::read::{Batch, BatchReader, BoxedBatchReader};
34use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
35use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
36use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
37use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
38
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
/// - sorted
/// - all deleted rows have been filtered.
/// - not empty
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focuses on time series (the same key).
pub(crate) struct LastRowReader {
    /// Inner reader.
    reader: BoxedBatchReader,
    /// Selector that buffers the current time series and emits its last row.
    selector: LastRowSelector,
}
53
54impl LastRowReader {
55    /// Creates a new `LastRowReader`.
56    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
57        Self {
58            reader,
59            selector: LastRowSelector::default(),
60        }
61    }
62
63    /// Returns the last row of the next key.
64    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
65        while let Some(batch) = self.reader.next_batch().await? {
66            if let Some(yielded) = self.selector.on_next(batch) {
67                return Ok(Some(yielded));
68            }
69        }
70        Ok(self.selector.finish())
71    }
72}
73
#[async_trait]
impl BatchReader for LastRowReader {
    /// Delegates to [`LastRowReader::next_last_row`] so the reader can be
    /// used anywhere a `BatchReader` is expected.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
80
/// Cached last row reader for specific row group.
/// If the last rows for current row group are already cached, this reader returns the cached value.
/// If cache misses, [RowGroupLastRowReader] reads last rows from row group and updates the cache
/// upon finish.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from cached value.
    Hit(LastRowCacheReader),
    /// Cache miss, reads from row group reader and updates cache when done.
    Miss(RowGroupLastRowReader),
}
91
92impl RowGroupLastRowCachedReader {
93    pub(crate) fn new(
94        file_id: FileId,
95        row_group_idx: usize,
96        cache_strategy: CacheStrategy,
97        row_group_reader: RowGroupReader,
98    ) -> Self {
99        let key = SelectorResultKey {
100            file_id,
101            row_group_idx,
102            selector: TimeSeriesRowSelector::LastRow,
103        };
104
105        if let Some(value) = cache_strategy.get_selector_result(&key) {
106            let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
107            let schema_matches =
108                value.projection == row_group_reader.read_format().projection_indices();
109            if is_primary_key && schema_matches {
110                // Format and schema match, use cache batches.
111                Self::new_hit(value)
112            } else {
113                Self::new_miss(key, row_group_reader, cache_strategy)
114            }
115        } else {
116            Self::new_miss(key, row_group_reader, cache_strategy)
117        }
118    }
119
120    /// Gets the underlying reader metrics if uncached.
121    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
122        match self {
123            RowGroupLastRowCachedReader::Hit(_) => None,
124            RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
125        }
126    }
127
128    /// Creates new Hit variant and updates metrics.
129    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
130        selector_result_cache_hit();
131        Self::Hit(LastRowCacheReader { value, idx: 0 })
132    }
133
134    /// Creates new Miss variant and updates metrics.
135    fn new_miss(
136        key: SelectorResultKey,
137        row_group_reader: RowGroupReader,
138        cache_strategy: CacheStrategy,
139    ) -> Self {
140        selector_result_cache_miss();
141        Self::Miss(RowGroupLastRowReader::new(
142            key,
143            row_group_reader,
144            cache_strategy,
145        ))
146    }
147}
148
149#[async_trait]
150impl BatchReader for RowGroupLastRowCachedReader {
151    async fn next_batch(&mut self) -> Result<Option<Batch>> {
152        match self {
153            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
154            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
155        }
156    }
157}
158
/// Last row reader that returns the cached last rows for row group.
pub(crate) struct LastRowCacheReader {
    /// Cached selector result to iterate over.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
164
165impl LastRowCacheReader {
166    /// Iterates cached last rows.
167    async fn next_batch(&mut self) -> Result<Option<Batch>> {
168        let batches = match &self.value.result {
169            SelectorResult::PrimaryKey(batches) => batches,
170            SelectorResult::Flat(_) => unreachable!(),
171        };
172        if self.idx < batches.len() {
173            let res = Ok(Some(batches[self.idx].clone()));
174            self.idx += 1;
175            res
176        } else {
177            Ok(None)
178        }
179    }
180}
181
/// Reads last rows from a row group and caches them once the row group is
/// exhausted.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    /// Underlying row group reader.
    reader: RowGroupReader,
    /// Selector that keeps the last row of each time series.
    selector: LastRowSelector,
    /// Rows yielded so far; written to the cache when the row group finishes.
    yielded_batches: Vec<Batch>,
    /// Cache to update upon finish.
    cache_strategy: CacheStrategy,
    /// Index buffer to take a new batch from the last row.
    take_index: UInt32Vector,
}
191
192impl RowGroupLastRowReader {
193    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
194        Self {
195            key,
196            reader,
197            selector: LastRowSelector::default(),
198            yielded_batches: vec![],
199            cache_strategy,
200            take_index: UInt32Vector::from_vec(vec![0]),
201        }
202    }
203
204    async fn next_batch(&mut self) -> Result<Option<Batch>> {
205        while let Some(batch) = self.reader.next_batch().await? {
206            if let Some(yielded) = self.selector.on_next(batch) {
207                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
208                return Ok(Some(yielded));
209            }
210        }
211        let last_batch = if let Some(last_batch) = self.selector.finish() {
212            push_yielded_batches(
213                last_batch.clone(),
214                &self.take_index,
215                &mut self.yielded_batches,
216            )?;
217            Some(last_batch)
218        } else {
219            None
220        };
221
222        // All last rows in row group are yielded, update cache.
223        self.maybe_update_cache();
224        Ok(last_batch)
225    }
226
227    /// Updates row group's last row cache if cache manager is present.
228    fn maybe_update_cache(&mut self) {
229        if self.yielded_batches.is_empty() {
230            // we always expect that row groups yields batches.
231            return;
232        }
233        let value = Arc::new(SelectorResultValue::new(
234            std::mem::take(&mut self.yielded_batches),
235            self.reader.read_format().projection_indices().to_vec(),
236        ));
237        self.cache_strategy.put_selector_result(self.key, value);
238    }
239
240    fn metrics(&self) -> &ReaderMetrics {
241        self.reader.metrics()
242    }
243}
244
/// Push last row into `yielded_batches`.
///
/// `batch` must contain exactly one row. `take_in_place` with `take_index`
/// (the single index `[0]`) rebuilds the batch from that row — presumably so
/// the cached copy does not keep the larger source arrays alive; TODO confirm.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    assert_eq!(1, batch.num_rows());
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
257
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    /// Latest batch of the time series currently being scanned; `None` before
    /// the first batch and after `finish`.
    last_batch: Option<Batch>,
}
263
264impl LastRowSelector {
265    /// Handles next batch. Return the yielding batch if present.
266    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
267        if let Some(last) = &self.last_batch {
268            if last.primary_key() == batch.primary_key() {
269                // Same key, update last batch.
270                self.last_batch = Some(batch);
271                None
272            } else {
273                // Different key, return the last row in `last` and update `last_batch` by
274                // current batch.
275                debug_assert!(!last.is_empty());
276                let last_row = last.slice(last.num_rows() - 1, 1);
277                self.last_batch = Some(batch);
278                Some(last_row)
279            }
280        } else {
281            self.last_batch = Some(batch);
282            None
283        }
284    }
285
286    /// Finishes the selector and returns the pending batch if any.
287    pub fn finish(&mut self) -> Option<Batch> {
288        if let Some(last) = self.last_batch.take() {
289            // This is the last key.
290            let last_row = last.slice(last.num_rows() - 1, 1);
291            return Some(last_row);
292        }
293        None
294    }
295}
296
/// Cached last row reader for flat format row group.
/// If the last rows are already cached (as flat `RecordBatch`), returns cached values.
/// Otherwise, reads from the row group, selects last rows, and updates the cache.
pub(crate) enum FlatRowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from cached value.
    Hit(FlatLastRowCacheReader),
    /// Cache miss, reads from row group reader and updates cache.
    Miss(FlatRowGroupLastRowReader),
}
306
307impl FlatRowGroupLastRowCachedReader {
308    pub(crate) fn new(
309        file_id: FileId,
310        row_group_idx: usize,
311        cache_strategy: CacheStrategy,
312        projection: &[usize],
313        reader: FlatRowGroupReader,
314    ) -> Self {
315        let key = SelectorResultKey {
316            file_id,
317            row_group_idx,
318            selector: TimeSeriesRowSelector::LastRow,
319        };
320
321        if let Some(value) = cache_strategy.get_selector_result(&key) {
322            let is_flat = matches!(&value.result, SelectorResult::Flat(_));
323            let schema_matches = value.projection == projection;
324            if is_flat && schema_matches {
325                Self::new_hit(value)
326            } else {
327                Self::new_miss(key, projection, reader, cache_strategy)
328            }
329        } else {
330            Self::new_miss(key, projection, reader, cache_strategy)
331        }
332    }
333
334    /// Returns the next RecordBatch.
335    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
336        match self {
337            FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
338            FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
339        }
340    }
341
342    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
343        selector_result_cache_hit();
344        Self::Hit(FlatLastRowCacheReader { value, idx: 0 })
345    }
346
347    fn new_miss(
348        key: SelectorResultKey,
349        projection: &[usize],
350        reader: FlatRowGroupReader,
351        cache_strategy: CacheStrategy,
352    ) -> Self {
353        selector_result_cache_miss();
354        Self::Miss(FlatRowGroupLastRowReader::new(
355            key,
356            projection.to_vec(),
357            reader,
358            cache_strategy,
359        ))
360    }
361}
362
/// Iterates over cached flat last rows.
pub(crate) struct FlatLastRowCacheReader {
    /// Cached selector result to iterate over.
    value: Arc<SelectorResultValue>,
    /// Index of the next cached batch to return.
    idx: usize,
}
368
369impl FlatLastRowCacheReader {
370    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
371        let batches = match &self.value.result {
372            SelectorResult::Flat(batches) => batches,
373            SelectorResult::PrimaryKey(_) => unreachable!(),
374        };
375        if self.idx < batches.len() {
376            let res = Ok(Some(batches[self.idx].clone()));
377            self.idx += 1;
378            res
379        } else {
380            Ok(None)
381        }
382    }
383}
384
/// Buffer that accumulates small `RecordBatch`es and tracks total row count.
pub(crate) struct BatchBuffer {
    /// Buffered batches, in arrival order.
    batches: Vec<RecordBatch>,
    /// Total number of rows across `batches`.
    num_rows: usize,
}
390
391impl BatchBuffer {
392    fn new() -> Self {
393        Self {
394            batches: Vec::new(),
395            num_rows: 0,
396        }
397    }
398
399    /// Returns true if total buffered rows reaches `DEFAULT_READ_BATCH_SIZE`.
400    fn is_full(&self) -> bool {
401        self.num_rows >= DEFAULT_READ_BATCH_SIZE
402    }
403
404    /// Extends the buffer from a slice of batches.
405    fn extend_from_slice(&mut self, batches: &[RecordBatch]) {
406        for batch in batches {
407            self.num_rows += batch.num_rows();
408        }
409        self.batches.extend_from_slice(batches);
410    }
411
412    /// Returns true if the buffer has no batches.
413    fn is_empty(&self) -> bool {
414        self.batches.is_empty()
415    }
416
417    /// Concatenates all buffered batches into one, resets the buffer, and returns the result.
418    fn concat(&mut self) -> Result<RecordBatch> {
419        debug_assert!(!self.batches.is_empty());
420        let schema = self.batches[0].schema();
421        let merged = concat_batches(&schema, &self.batches).context(ComputeArrowSnafu)?;
422        self.batches.clear();
423        self.num_rows = 0;
424        Ok(merged)
425    }
426}
427
/// Reads last rows from a flat format row group and caches the results.
pub(crate) struct FlatRowGroupLastRowReader {
    /// Cache key for this row group's selector result.
    key: SelectorResultKey,
    /// Underlying flat row group reader.
    reader: FlatRowGroupReader,
    /// Selector that picks the last-timestamp rows per primary key.
    selector: FlatLastTimestampSelector,
    /// Batches yielded so far; written to the cache when the row group finishes.
    yielded_batches: Vec<RecordBatch>,
    /// Cache to update upon finish.
    cache_strategy: CacheStrategy,
    /// Projection indices the result was built with (stored in the cache).
    projection: Vec<usize>,
    /// Accumulates small selector-output batches before concatenating.
    pending: BatchBuffer,
}
439
440impl FlatRowGroupLastRowReader {
441    fn new(
442        key: SelectorResultKey,
443        projection: Vec<usize>,
444        reader: FlatRowGroupReader,
445        cache_strategy: CacheStrategy,
446    ) -> Self {
447        Self {
448            key,
449            reader,
450            selector: FlatLastTimestampSelector::default(),
451            yielded_batches: vec![],
452            cache_strategy,
453            projection,
454            pending: BatchBuffer::new(),
455        }
456    }
457
458    /// Concatenates pending batches and records the result in `yielded_batches`.
459    fn flush_pending(&mut self) -> Result<Option<RecordBatch>> {
460        if self.pending.is_empty() {
461            return Ok(None);
462        }
463        let merged = self.pending.concat()?;
464        self.yielded_batches.push(merged.clone());
465        Ok(Some(merged))
466    }
467
468    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
469        if self.pending.is_full() {
470            return self.flush_pending();
471        }
472
473        while let Some(batch) = self.reader.next_batch()? {
474            self.selector.on_next(batch, &mut self.pending)?;
475            if self.pending.is_full() {
476                return self.flush_pending();
477            }
478        }
479
480        // Reader exhausted — flush remaining selector state.
481        self.selector.finish(&mut self.pending)?;
482        if !self.pending.is_empty() {
483            let result = self.flush_pending();
484            // All last rows in row group are yielded, update cache.
485            self.maybe_update_cache();
486            return result;
487        }
488
489        // All last rows in row group are yielded, update cache.
490        self.maybe_update_cache();
491        Ok(None)
492    }
493
494    fn maybe_update_cache(&mut self) {
495        if self.yielded_batches.is_empty() {
496            return;
497        }
498        let batches = std::mem::take(&mut self.yielded_batches);
499        let value = Arc::new(SelectorResultValue::new_flat(
500            batches,
501            self.projection.clone(),
502        ));
503        self.cache_strategy.put_selector_result(self.key, value);
504    }
505}
506
/// Selects the last-timestamp rows per primary key from flat `RecordBatch`.
///
/// Assumes that input batches are sorted by primary key then by timestamp,
/// and contain only PUT operations (no DELETE). Rows for one primary key may
/// span several consecutive batches.
#[derive(Default)]
pub(crate) struct FlatLastTimestampSelector {
    /// State for the currently in-progress primary key; `None` before the
    /// first row and after `finish`.
    current_key: Option<LastKeyState>,
}
516
/// Accumulated state for one primary key while selecting last rows.
#[derive(Debug)]
struct LastKeyState {
    /// Primary key bytes.
    key: Vec<u8>,
    /// Maximum timestamp seen for this key so far.
    last_timestamp: i64,
    /// Batch slices whose rows all carry `last_timestamp`.
    slices: Vec<RecordBatch>,
}
523
524impl LastKeyState {
525    fn new(key: Vec<u8>, last_timestamp: i64, first_slice: RecordBatch) -> Self {
526        Self {
527            key,
528            last_timestamp,
529            slices: vec![first_slice],
530        }
531    }
532}
533
impl FlatLastTimestampSelector {
    /// Processes the next batch and appends completed-key results into `output_buffer`.
    ///
    /// Keys whose rows end within this batch are flushed; the trailing key
    /// stays buffered in `current_key` since it may continue in the next batch.
    pub(crate) fn on_next(
        &mut self,
        batch: RecordBatch,
        output_buffer: &mut BatchBuffer,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Column positions of the primary key and time index in flat format
        // are derived from the column count.
        let num_columns = batch.num_columns();
        let pk_col_idx = primary_key_column_index(num_columns);
        let ts_col_idx = time_index_column_index(num_columns);

        let pk_array = batch
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        // `offsets` partitions the batch rows into consecutive ranges sharing
        // the same primary key.
        let offsets = primary_key_offsets(pk_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let ts_values = timestamp_array_to_i64_slice(batch.column(ts_col_idx));
        for i in 0..offsets.len() - 1 {
            let range_start = offsets[i];
            let range_end = offsets[i + 1];
            let range_key = primary_key_bytes_at(&batch, pk_col_idx, range_start);
            // Rows are sorted by timestamp within a key, so the last row of
            // the range carries its maximum timestamp.
            let range_last_ts = ts_values[range_end - 1];
            let range_last_ts_start = last_timestamp_start(ts_values, range_start, range_end);
            // Slice of all rows in this range sharing the maximum timestamp.
            let range_slice = batch.slice(range_last_ts_start, range_end - range_last_ts_start);

            match self.current_key.as_mut() {
                Some(state) if state.key.as_slice() == range_key => {
                    // Same key continues from a previous range or batch.
                    if range_last_ts > state.last_timestamp {
                        // A newer timestamp supersedes the buffered slices.
                        state.last_timestamp = range_last_ts;
                        state.slices.clear();
                        state.slices.push(range_slice);
                    } else if range_last_ts == state.last_timestamp {
                        // Timestamp tie: all tied rows are kept and emitted.
                        state.slices.push(range_slice);
                    }
                    // A smaller timestamp is silently dropped; it cannot occur
                    // for input sorted by key then timestamp.
                }
                Some(_) => {
                    // Key changed: the previous key is complete, flush it.
                    self.flush_current_key(output_buffer);
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
                None => {
                    // First key seen by this selector.
                    self.current_key = Some(LastKeyState::new(
                        range_key.to_vec(),
                        range_last_ts,
                        range_slice,
                    ));
                }
            }
        }

        Ok(())
    }

    /// Finishes the selector and appends remaining results into `output_buffer`.
    /// Calling it again afterwards is a no-op.
    pub(crate) fn finish(&mut self, output_buffer: &mut BatchBuffer) -> Result<()> {
        self.flush_current_key(output_buffer);
        Ok(())
    }

    /// Moves the buffered slices of the in-progress key into `output_buffer`.
    fn flush_current_key(&mut self, output_buffer: &mut BatchBuffer) {
        let Some(state) = self.current_key.take() else {
            return;
        };
        output_buffer.extend_from_slice(&state.slices);
    }
}
612
613/// Gets the primary key bytes at `index` from the primary key dictionary column.
614fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] {
615    let pk_dict = batch
616        .column(pk_col_idx)
617        .as_any()
618        .downcast_ref::<PrimaryKeyArray>()
619        .unwrap();
620    let key = pk_dict.keys().value(index);
621    let binary_values = pk_dict
622        .values()
623        .as_any()
624        .downcast_ref::<BinaryArray>()
625        .unwrap();
626    binary_values.value(key as usize)
627}
628
/// Finds the start index of the run of rows sharing the last (maximum)
/// timestamp within the range `[range_start, range_end)`.
fn last_timestamp_start(ts_values: &[i64], range_start: usize, range_end: usize) -> usize {
    debug_assert!(range_start < range_end);

    let last_ts = ts_values[range_end - 1];
    // Position just after the last element differing from `last_ts`; if every
    // element equals `last_ts`, the run starts at `range_start`.
    ts_values[range_start..range_end]
        .iter()
        .rposition(|&ts| ts != last_ts)
        .map(|pos| range_start + pos + 1)
        .unwrap_or(range_start)
}
641
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    #[tokio::test]
    async fn test_last_row_one_batch() {
        // Two rows of the same key: only the later row should be returned.
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // Only one row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    #[tokio::test]
    async fn test_last_row_multi_batch() {
        // Key k1 spans the first two batches; k2 occupies the third.
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        // Expect exactly one (last) row per key.
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }

    /// Helper to build a flat format RecordBatch for testing.
    fn new_flat_batch(primary_keys: &[&[u8]], timestamps: &[i64], fields: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(primary_keys.len(), num_rows);
        assert_eq!(fields.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            // field0 column
            Arc::new(Int64Array::from_iter_values(fields.iter().copied())),
            // ts column (time index)
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            // __primary_key column (dictionary(uint32, binary))
            {
                let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
                for &pk in primary_keys {
                    builder.append(pk).unwrap();
                }
                Arc::new(builder.finish())
            },
            // __sequence column
            Arc::new(UInt64Array::from_iter_values(vec![1u64; num_rows])),
            // __op_type column
            Arc::new(UInt8Array::from_iter_values(vec![1u8; num_rows])),
        ];

        RecordBatch::try_new(test_flat_schema(), columns).unwrap()
    }

    /// Schema matching the flat format column layout used by the selector.
    fn test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field0", DataType::Int64, false),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Collects all rows from the selector across all result batches.
    fn collect_flat_results(
        selector: &mut FlatLastTimestampSelector,
        batches: Vec<RecordBatch>,
    ) -> Vec<(Vec<u8>, i64)> {
        let mut output_buffer = BatchBuffer::new();
        let mut results = Vec::new();
        for batch in batches {
            selector.on_next(batch, &mut output_buffer).unwrap();
            // Drain whatever the selector emitted for completed keys.
            for r in output_buffer.batches.drain(..) {
                extract_flat_rows(&r, &mut results);
            }
            output_buffer.num_rows = 0;
        }
        selector.finish(&mut output_buffer).unwrap();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut results);
        }
        results
    }

    /// Extracts (primary_key, timestamp) pairs from a result batch.
    fn extract_flat_rows(batch: &RecordBatch, out: &mut Vec<(Vec<u8>, i64)>) {
        let ts_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let pk_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let binary_values = pk_col
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let key_idx = pk_col.keys().value(i);
            let pk = binary_values.value(key_idx as usize).to_vec();
            let ts = ts_col.value(i);
            out.push((pk, ts));
        }
    }

    #[test]
    fn test_flat_single_batch_one_key() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k1"], &[1, 2, 3], &[10, 20, 30]);
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(vec![(b"k1".to_vec(), 3)], results);
    }

    #[test]
    fn test_flat_single_batch_multiple_keys() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k2", b"k2", b"k3"],
            &[1, 2, 3, 4, 5],
            &[10, 20, 30, 40, 50],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 2),
                (b"k2".to_vec(), 4),
                (b"k3".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_key_spans_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 2], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 4], &[30, 40]),
            new_flat_batch(&[b"k2", b"k3"], &[5, 6], &[50, 60]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
                (b"k3".to_vec(), 6),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1 has two rows with the same last timestamp (3).
        let batch = new_flat_batch(
            &[b"k1", b"k1", b"k1", b"k2"],
            &[1, 3, 3, 5],
            &[10, 20, 30, 40],
        );
        let results = collect_flat_results(&mut selector, vec![batch]);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_duplicate_last_timestamps_across_batches() {
        let mut selector = FlatLastTimestampSelector::default();
        // k1's last timestamp (3) spans two batches.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k2"], &[3, 5], &[30, 40]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(
            vec![
                (b"k1".to_vec(), 3),
                (b"k1".to_vec(), 3),
                (b"k2".to_vec(), 5),
            ],
            results
        );
    }

    #[test]
    fn test_flat_pending_chain_dropped_by_higher_timestamp() {
        let mut selector = FlatLastTimestampSelector::default();
        // A run of tied timestamps (3) must be discarded once a higher
        // timestamp (4) arrives for the same key.
        let batches = vec![
            new_flat_batch(&[b"k1", b"k1"], &[1, 3], &[10, 20]),
            new_flat_batch(&[b"k1", b"k1"], &[3, 3], &[21, 22]),
            new_flat_batch(&[b"k1", b"k1"], &[4, 4], &[23, 24]),
        ];
        let results = collect_flat_results(&mut selector, batches);
        assert_eq!(vec![(b"k1".to_vec(), 4), (b"k1".to_vec(), 4)], results);
    }

    #[test]
    fn test_flat_finish_is_one_shot() {
        let mut selector = FlatLastTimestampSelector::default();
        let batch = new_flat_batch(&[b"k1", b"k1", b"k2"], &[1, 2, 3], &[10, 20, 30]);
        let mut output_buffer = BatchBuffer::new();

        // Feed one batch: completed keys can be emitted before EOF.
        selector.on_next(batch, &mut output_buffer).unwrap();
        let mut pre_finish = Vec::new();
        for r in output_buffer.batches.drain(..) {
            extract_flat_rows(&r, &mut pre_finish);
        }
        output_buffer.num_rows = 0;
        assert_eq!(vec![(b"k1".to_vec(), 2)], pre_finish);

        // Simulate EOF by calling finish().
        selector.finish(&mut output_buffer).unwrap();
        assert!(!output_buffer.is_empty());
        output_buffer.batches.clear();
        output_buffer.num_rows = 0;

        // A second finish after EOF should not yield any more rows.
        selector.finish(&mut output_buffer).unwrap();
        assert!(output_buffer.is_empty());
    }
}