mito2/read/
last_row.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to read the last row of each time series.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use datatypes::vectors::UInt32Vector;
21use store_api::storage::{FileId, TimeSeriesRowSelector};
22
23use crate::cache::{
24    CacheStrategy, SelectorResultKey, SelectorResultValue, selector_result_cache_hit,
25    selector_result_cache_miss,
26};
27use crate::error::Result;
28use crate::read::{Batch, BatchReader, BoxedBatchReader};
29use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
30
/// Reader to keep the last row for each time series.
/// It assumes that batches from the input reader are
/// - sorted
/// - free of deleted rows (all deleted rows have been filtered)
/// - not empty
///
/// This reader is different from the [MergeMode](crate::region::options::MergeMode) as
/// it focuses on time series (the same key).
pub(crate) struct LastRowReader {
    /// Inner reader.
    reader: BoxedBatchReader,
    /// Selector that buffers the latest batch of the current key and yields its last row.
    selector: LastRowSelector,
}
45
46impl LastRowReader {
47    /// Creates a new `LastRowReader`.
48    pub(crate) fn new(reader: BoxedBatchReader) -> Self {
49        Self {
50            reader,
51            selector: LastRowSelector::default(),
52        }
53    }
54
55    /// Returns the last row of the next key.
56    pub(crate) async fn next_last_row(&mut self) -> Result<Option<Batch>> {
57        while let Some(batch) = self.reader.next_batch().await? {
58            if let Some(yielded) = self.selector.on_next(batch) {
59                return Ok(Some(yielded));
60            }
61        }
62        Ok(self.selector.finish())
63    }
64}
65
#[async_trait]
impl BatchReader for LastRowReader {
    /// Returns the next batch by delegating to [`LastRowReader::next_last_row`].
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_last_row().await
    }
}
72
/// Cached last row reader for a specific row group.
/// If the last rows for the current row group are already cached, this reader returns the
/// cached value.
/// If the cache misses (or the cached value was built with a different projection),
/// [RowGroupLastRowReader] reads last rows from the row group and updates the cache
/// upon finish.
pub(crate) enum RowGroupLastRowCachedReader {
    /// Cache hit, reads last rows from cached value.
    Hit(LastRowCacheReader),
    /// Cache miss, reads from row group reader and updates the cache.
    Miss(RowGroupLastRowReader),
}
83
84impl RowGroupLastRowCachedReader {
85    pub(crate) fn new(
86        file_id: FileId,
87        row_group_idx: usize,
88        cache_strategy: CacheStrategy,
89        row_group_reader: RowGroupReader,
90    ) -> Self {
91        let key = SelectorResultKey {
92            file_id,
93            row_group_idx,
94            selector: TimeSeriesRowSelector::LastRow,
95        };
96
97        if let Some(value) = cache_strategy.get_selector_result(&key) {
98            let schema_matches =
99                value.projection == row_group_reader.read_format().projection_indices();
100            if schema_matches {
101                // Schema matches, use cache batches.
102                Self::new_hit(value)
103            } else {
104                Self::new_miss(key, row_group_reader, cache_strategy)
105            }
106        } else {
107            Self::new_miss(key, row_group_reader, cache_strategy)
108        }
109    }
110
111    /// Gets the underlying reader metrics if uncached.
112    pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
113        match self {
114            RowGroupLastRowCachedReader::Hit(_) => None,
115            RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
116        }
117    }
118
119    /// Creates new Hit variant and updates metrics.
120    fn new_hit(value: Arc<SelectorResultValue>) -> Self {
121        selector_result_cache_hit();
122        Self::Hit(LastRowCacheReader { value, idx: 0 })
123    }
124
125    /// Creates new Miss variant and updates metrics.
126    fn new_miss(
127        key: SelectorResultKey,
128        row_group_reader: RowGroupReader,
129        cache_strategy: CacheStrategy,
130    ) -> Self {
131        selector_result_cache_miss();
132        Self::Miss(RowGroupLastRowReader::new(
133            key,
134            row_group_reader,
135            cache_strategy,
136        ))
137    }
138}
139
#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
    /// Returns the next batch from whichever variant this reader is: the cached
    /// reader on a hit, or the row group reader on a miss.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
        }
    }
}
149
/// Last row reader that returns the cached last rows for row group.
pub(crate) struct LastRowCacheReader {
    /// Cached selector result whose `result` batches are returned one by one.
    value: Arc<SelectorResultValue>,
    /// Index of the next batch in `value.result` to return.
    idx: usize,
}
155
156impl LastRowCacheReader {
157    /// Iterates cached last rows.
158    async fn next_batch(&mut self) -> Result<Option<Batch>> {
159        if self.idx < self.value.result.len() {
160            let res = Ok(Some(self.value.result[self.idx].clone()));
161            self.idx += 1;
162            res
163        } else {
164            Ok(None)
165        }
166    }
167}
168
/// Reader that selects the last row of each time series from a row group and keeps
/// the yielded rows so they can be stored as the selector result for that row group.
pub(crate) struct RowGroupLastRowReader {
    /// Cache key of the row group being read.
    key: SelectorResultKey,
    /// Reader of the row group.
    reader: RowGroupReader,
    /// Selector that picks the last row of each key.
    selector: LastRowSelector,
    /// Last rows yielded so far; moved into the cached value once the reader is exhausted.
    yielded_batches: Vec<Batch>,
    /// Cache strategy that receives the selector result.
    cache_strategy: CacheStrategy,
    /// Index buffer to take a new batch from the last row.
    take_index: UInt32Vector,
}
178
179impl RowGroupLastRowReader {
180    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
181        Self {
182            key,
183            reader,
184            selector: LastRowSelector::default(),
185            yielded_batches: vec![],
186            cache_strategy,
187            take_index: UInt32Vector::from_vec(vec![0]),
188        }
189    }
190
191    async fn next_batch(&mut self) -> Result<Option<Batch>> {
192        while let Some(batch) = self.reader.next_batch().await? {
193            if let Some(yielded) = self.selector.on_next(batch) {
194                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
195                return Ok(Some(yielded));
196            }
197        }
198        let last_batch = if let Some(last_batch) = self.selector.finish() {
199            push_yielded_batches(
200                last_batch.clone(),
201                &self.take_index,
202                &mut self.yielded_batches,
203            )?;
204            Some(last_batch)
205        } else {
206            None
207        };
208
209        // All last rows in row group are yielded, update cache.
210        self.maybe_update_cache();
211        Ok(last_batch)
212    }
213
214    /// Updates row group's last row cache if cache manager is present.
215    fn maybe_update_cache(&mut self) {
216        if self.yielded_batches.is_empty() {
217            // we always expect that row groups yields batches.
218            return;
219        }
220        let value = Arc::new(SelectorResultValue {
221            result: std::mem::take(&mut self.yielded_batches),
222            projection: self.reader.read_format().projection_indices().to_vec(),
223        });
224        self.cache_strategy.put_selector_result(self.key, value);
225    }
226
227    fn metrics(&self) -> &ReaderMetrics {
228        self.reader.metrics()
229    }
230}
231
/// Push last row into `yielded_batches`.
///
/// `batch` is rewritten in place through `take_index` before being stored.
/// Returns the error of `take_in_place` if it fails; nothing is pushed in that case.
///
/// # Panics
///
/// Panics if `batch` does not contain exactly one row.
fn push_yielded_batches(
    mut batch: Batch,
    take_index: &UInt32Vector,
    yielded_batches: &mut Vec<Batch>,
) -> Result<()> {
    assert_eq!(1, batch.num_rows());
    // NOTE(review): presumably this re-materializes the single row so the stored batch
    // does not keep the buffers of the batch it was sliced from alive — confirm.
    batch.take_in_place(take_index)?;
    yielded_batches.push(batch);

    Ok(())
}
244
/// Common struct that selects only the last row of each time series.
#[derive(Default)]
pub struct LastRowSelector {
    /// Most recent batch received for the current key, if any.
    last_batch: Option<Batch>,
}
250
251impl LastRowSelector {
252    /// Handles next batch. Return the yielding batch if present.
253    pub fn on_next(&mut self, batch: Batch) -> Option<Batch> {
254        if let Some(last) = &self.last_batch {
255            if last.primary_key() == batch.primary_key() {
256                // Same key, update last batch.
257                self.last_batch = Some(batch);
258                None
259            } else {
260                // Different key, return the last row in `last` and update `last_batch` by
261                // current batch.
262                debug_assert!(!last.is_empty());
263                let last_row = last.slice(last.num_rows() - 1, 1);
264                self.last_batch = Some(batch);
265                Some(last_row)
266            }
267        } else {
268            self.last_batch = Some(batch);
269            None
270        }
271    }
272
273    /// Finishes the selector and returns the pending batch if any.
274    pub fn finish(&mut self) -> Option<Batch> {
275        if let Some(last) = self.last_batch.take() {
276            // This is the last key.
277            let last_row = last.slice(last.num_rows() - 1, 1);
278            return Some(last_row);
279        }
280        None
281    }
282}
283
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    /// A single input batch yields exactly its last row, including when the batch
    /// only has one row.
    #[tokio::test]
    async fn test_last_row_one_batch() {
        let input = [new_batch(
            b"k1",
            &[1, 2],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
        )
        .await;

        // Only one row.
        let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
        )
        .await;
    }

    /// Consecutive batches of the same key collapse into one last row, and a key
    /// change yields the previous key's last row.
    #[tokio::test]
    async fn test_last_row_multi_batch() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[3, 4],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[23, 24],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];
        let reader = VecBatchReader::new(&input);
        let mut reader = LastRowReader::new(Box::new(reader));
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[24]),
                new_batch(b"k2", &[2], &[11], &[OpType::Put], &[32]),
            ],
        )
        .await;
    }
}
355}