mito2/read/
last_row.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::vectors::UInt32Vector;
21use store_api::storage::TimeSeriesRowSelector;
22
23use crate::cache::{
24    selector_result_cache_hit, selector_result_cache_miss, CacheStrategy, SelectorResultKey,
25    SelectorResultValue,
26};
27use crate::error::Result;
28use crate::read::{Batch, BatchReader, BoxedBatchReader};
29use crate::sst::file::FileId;
30use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
31
32/// Reader to keep the last row for each time series.
33/// It assumes that batches from the input reader are
34/// - sorted
35/// - all deleted rows has been filtered.
36/// - not empty
37///
38/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
39/// it focus on time series (the same key).
40pub(crate) struct LastRowReader {
41    /// Inner reader.
42    reader: BoxedBatchReader,
43    /// The last batch pending to return.
44    selector: LastRowSelector,
45}
46
47impl LastRowReader {
48    /// Creates a new `LastRowReader`.
49    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
50        Self {
51            reader,
52            selector: LastRowSelector::default(),
53        }
54    }
55
56    /// Returns the last row of the next key.
57    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
58        while let Some(batch) = self.reader.next_batch().await? {
59            if let Some(yielded) = self.selector.on_next(batch) {
60                return Ok(Some(yielded));
61            }
62        }
63        Ok(self.selector.finish())
64    }
65}
66
67#[async_trait]
68impl BatchReader for LastRowReader {
69    async fn next_batch(&mut self) -> Result<Option<Batch>> {
70        self.next_last_row().await
71    }
72}
73
74/// Cached last row reader for specific row group.
75/// If the last rows for current row group are already cached, this reader returns the cached value.
76/// If cache misses, [RowGroupLastRowReader] reads last rows from row group and updates the cache
77/// upon finish.
78pub(crate) enum RowGroupLastRowCachedReader {
79    /// Cache hit, reads last rows from cached value.
80    Hit(LastRowCacheReader),
81    /// Cache miss, reads from row group reader and update cache.
82    Miss(RowGroupLastRowReader),
83}
84
85impl RowGroupLastRowCachedReader {
86    pub(crate) fn new(
87        file_id: FileId,
88        row_group_idx: usize,
89        cache_strategy: CacheStrategy,
90        row_group_reader: RowGroupReader,
91    ) -> Self {
92        let key = SelectorResultKey {
93            file_id,
94            row_group_idx,
95            selector: TimeSeriesRowSelector::LastRow,
96        };
97
98        if let Some(value) = cache_strategy.get_selector_result(&key) {
99            let schema_matches =
100                value.projection == row_group_reader.read_format().projection_indices();
101            if schema_matches {
102                // Schema matches, use cache batches.
103                Self::new_hit(value)
104            } else {
105                Self::new_miss(key, row_group_reader, cache_strategy)
106            }
107        } else {
108            Self::new_miss(key, row_group_reader, cache_strategy)
109        }
110    }
111
112    /// Gets the underlying reader metrics if uncached.
113    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
114        match self {
115            RowGroupLastRowCachedReader::Hit(_) => None,
116            RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
117        }
118    }
119
120    /// Creates new Hit variant and updates metrics.
121    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
122        selector_result_cache_hit();
123        Self::Hit(LastRowCacheReader { value, idx: 0 })
124    }
125
126    /// Creates new Miss variant and updates metrics.
127    fn new_miss(
128        key: SelectorResultKey,
129        row_group_reader: RowGroupReader,
130        cache_strategy: CacheStrategy,
131    ) -> Self {
132        selector_result_cache_miss();
133        Self::Miss(RowGroupLastRowReader::new(
134            key,
135            row_group_reader,
136            cache_strategy,
137        ))
138    }
139}
140
141#[async_trait]
142impl BatchReader for RowGroupLastRowCachedReader {
143    async fn next_batch(&mut self) -> Result<Option<Batch>> {
144        match self {
145            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
146            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
147        }
148    }
149}
150
151/// Last row reader that returns the cached last rows for row group.
152pub(crate) struct LastRowCacheReader {
153    value: Arc<SelectorResultValue>,
154    idx: usize,
155}
156
157impl LastRowCacheReader {
158    /// Iterates cached last rows.
159    async fn next_batch(&mut self) -> Result<Option<Batch>> {
160        if self.idx < self.value.result.len() {
161            let res = Ok(Some(self.value.result[self.idx].clone()));
162            self.idx += 1;
163            res
164        } else {
165            Ok(None)
166        }
167    }
168}
169
170pub(crate) struct RowGroupLastRowReader {
171    key: SelectorResultKey,
172    reader: RowGroupReader,
173    selector: LastRowSelector,
174    yielded_batches: Vec<Batch>,
175    cache_strategy: CacheStrategy,
176    /// Index buffer to take a new batch from the last row.
177    take_index: UInt32Vector,
178}
179
180impl RowGroupLastRowReader {
181    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
182        Self {
183            key,
184            reader,
185            selector: LastRowSelector::default(),
186            yielded_batches: vec![],
187            cache_strategy,
188            take_index: UInt32Vector::from_vec(vec![0]),
189        }
190    }
191
192    async fn next_batch(&mut self) -> Result<Option<Batch>> {
193        while let Some(batch) = self.reader.next_batch().await? {
194            if let Some(yielded) = self.selector.on_next(batch) {
195                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
196                return Ok(Some(yielded));
197            }
198        }
199        let last_batch = if let Some(last_batch) = self.selector.finish() {
200            push_yielded_batches(
201                last_batch.clone(),
202                &self.take_index,
203                &mut self.yielded_batches,
204            )?;
205            Some(last_batch)
206        } else {
207            None
208        };
209
210        // All last rows in row group are yielded, update cache.
211        self.maybe_update_cache();
212        Ok(last_batch)
213    }
214
215    /// Updates row group's last row cache if cache manager is present.
216    fn maybe_update_cache(&mut self) {
217        if self.yielded_batches.is_empty() {
218            // we always expect that row groups yields batches.
219            return;
220        }
221        let value = Arc::new(SelectorResultValue {
222            result: std::mem::take(&mut self.yielded_batches),
223            projection: self.reader.read_format().projection_indices().to_vec(),
224        });
225        self.cache_strategy.put_selector_result(self.key, value);
226    }
227
228    fn metrics(&self) -> &ReaderMetrics {
229        self.reader.metrics()
230    }
231}
232
233/// Push last row into `yielded_batches`.
234fn push_yielded_batches(
235    mut batch: Batch,
236    take_index: &UInt32Vector,
237    yielded_batches: &mut Vec<Batch>,
238) -> Result<()> {
239    assert_eq!(1, batch.num_rows());
240    batch.take_in_place(take_index)?;
241    yielded_batches.push(batch);
242
243    Ok(())
244}
245
246/// Common struct that selects only the last row of each time series.
247#[derive(Default)]
248pub struct LastRowSelector {
249    last_batch: Option<Batch>,
250}
251
252impl LastRowSelector {
253    /// Handles next batch. Return the yielding batch if present.
254    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
255        if let Some(last) = &self.last_batch {
256            if last.primary_key() == batch.primary_key() {
257                // Same key, update last batch.
258                self.last_batch = Some(batch);
259                None
260            } else {
261                // Different key, return the last row in `last` and update `last_batch` by
262                // current batch.
263                debug_assert!(!last.is_empty());
264                let last_row = last.slice(last.num_rows() - 1, 1);
265                self.last_batch = Some(batch);
266                Some(last_row)
267            }
268        } else {
269            self.last_batch = Some(batch);
270            None
271        }
272    }
273
274    /// Finishes the selector and returns the pending batch if any.
275    pub fn finish(&mut self) -> Option<Batch> {
276        if let Some(last) = self.last_batch.take() {
277            // This is the last key.
278            let last_row = last.slice(last.num_rows() - 1, 1);
279            return Some(last_row);
280        }
281        None
282    }
283}
284
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    /// A single input batch yields only its last row, including when it has just one row.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let two_rows = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&two_rows)));
        let expected = [new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])];
        check_reader_result(&mut last_rows, &expected).await;

        // Only one row.
        let one_row = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&one_row)));
        let expected = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        check_reader_result(&mut last_rows, &expected).await;
    }

    /// Several batches of one key collapse into that key's last row, and each key
    /// produces its own output batch.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let put_twice = [OpType::Put, OpType::Put];
        let batches = [
            new_batch(b"k1", &[1, 2], &[11, 11], &put_twice, &[21, 22]),
            new_batch(b"k1", &[3, 4], &[11, 11], &put_twice, &[23, 24]),
            new_batch(b"k2", &[1, 2], &[11, 11], &put_twice, &[31, 32]),
        ];
        let mut last_rows = LastRowReader::new(Box::new(VecBatchReader::new(&batches)));
        let expected = [
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
            new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
        ];
        check_reader_result(&mut last_rows, &expected).await;
    }
}
354        .await;
355    }
356}