mito2/memtable/partition_tree/dedup.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;

use crate::error::Result;
use crate::memtable::partition_tree::data::DataBatch;
use crate::memtable::partition_tree::shard::DataBatchSource;
use crate::memtable::partition_tree::PkId;

/// A reader that dedups sorted batches from a merger.
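///
/// The reader tracks the primary key id and timestamp of the last row in the
/// previous batch, and skips the first row of the next batch when it shares
/// the same key and timestamp.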
pub struct DedupReader<T> {
    /// Primary key id and timestamp of the last row in the previous batch.
    prev_batch_last_row: Option<(PkId, i64)>,
    /// Row range to yield from the current batch of the inner source.
    current_batch_range: Option<Range<usize>>,
    /// Inner batch source.
    inner: T,
}

impl<T: DataBatchSource> DedupReader<T> {
    /// Creates a new dedup reader.
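    ///
    /// Calls `next` once so that the reader is positioned at the first batch
    /// (if any) when constructed.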
    pub fn try_new(inner: T) -> Result<Self> {
        let mut res = Self {
            prev_batch_last_row: None,
            current_batch_range: None,
            inner,
        };
        res.next()?;
        Ok(res)
    }
}

impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
    fn is_valid(&self) -> bool {
        self.current_batch_range.is_some()
    }

    fn next(&mut self) -> Result<()> {
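        // Looks for the next batch that still contains rows after removing the
        // row duplicated with the last row of the previous batch.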
        while self.inner.is_valid() {
            match &mut self.prev_batch_last_row {
                None => {
                    // First call: fill prev_batch_last_row and current_batch_range with the first batch.
                    let current_batch = self.inner.current_data_batch();
                    let pk_id = self.inner.current_pk_id();
                    let (last_ts, _) = current_batch.last_row();
                    self.prev_batch_last_row = Some((pk_id, last_ts));
                    self.current_batch_range = Some(0..current_batch.num_rows());
                    break;
                }
                Some(prev_last_row) => {
                    self.inner.next()?;
                    if !self.inner.is_valid() {
                        // Resets current_batch_range if the inner reader is exhausted.
                        self.current_batch_range = None;
                        break;
                    }
                    let current_batch = self.inner.current_data_batch();
                    let current_pk_id = self.inner.current_pk_id();
                    let (first_ts, _) = current_batch.first_row();
                    let rows_in_batch = current_batch.num_rows();

                    let (start, end) = if &(current_pk_id, first_ts) == prev_last_row {
                        // The first row in this batch duplicates the last row in the previous batch.
                        if rows_in_batch == 1 {
                            // The batch only contains the duplicated row, skip it and move to the next batch.
                            continue;
                        } else {
                            // Skip the first row, start from offset 1.
                            (1, rows_in_batch)
                        }
                    } else {
                        // No duplicates found, yield the whole batch.
                        (0, rows_in_batch)
                    };

                    let (last_ts, _) = current_batch.last_row();
                    *prev_last_row = (current_pk_id, last_ts);
                    self.current_batch_range = Some(start..end);
                    break;
                }
            }
        }
        Ok(())
    }

    fn current_pk_id(&self) -> PkId {
        self.inner.current_pk_id()
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.inner.current_key()
    }

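    // Note: this panics if the reader is not valid, i.e. `current_batch_range` is `None`.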
    fn current_data_batch(&self) -> DataBatch {
        let range = self.current_batch_range.as_ref().unwrap();
        let data_batch = self.inner.current_data_batch();
        data_batch.slice(range.start, range.len())
    }
}

#[cfg(test)]
mod tests {
    use store_api::metadata::RegionMetadataRef;

    use super::*;
    use crate::memtable::partition_tree::data::{DataBuffer, DataParts, DataPartsReader};
    use crate::test_util::memtable_util::{
        extract_data_batch, metadata_for_test, write_rows_to_buffer,
    };

    /// A `DataBatchSource` that reads batches directly from a `DataPartsReader`.
    struct MockSource(DataPartsReader);

    impl DataBatchSource for MockSource {
        fn is_valid(&self) -> bool {
            self.0.is_valid()
        }

        fn next(&mut self) -> Result<()> {
            self.0.next()
        }

        fn current_pk_id(&self) -> PkId {
            PkId {
                shard_id: 0,
                pk_index: self.0.current_data_batch().pk_index(),
            }
        }

        fn current_key(&self) -> Option<&[u8]> {
            None
        }

        fn current_data_batch(&self) -> DataBatch {
            self.0.current_data_batch()
        }
    }

    /// Builds a `DataBuffer` from `(pk_index, timestamps)` rows, assigning
    /// monotonically increasing sequence numbers starting from `*seq`.
    fn build_data_buffer(
        meta: RegionMetadataRef,
        rows: Vec<(u16, Vec<i64>)>,
        seq: &mut u64,
    ) -> DataBuffer {
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        for row in rows {
            let (pk_index, timestamps) = row;
            let num_rows = timestamps.len() as u64;
            let v = timestamps.iter().map(|v| Some(*v as f64)).collect();

            write_rows_to_buffer(&mut buffer, &meta, pk_index, timestamps, v, *seq);
            *seq += num_rows;
        }
        buffer
    }

    /// Freezes each part into a `DataParts`, reads it back through a
    /// `DedupReader` and checks the deduplicated `(pk_index, [(ts, seq)])`
    /// batches against `expected`.
    fn check_data_parts_reader_dedup(
        parts: Vec<Vec<(u16, Vec<i64>)>>,
        expected: Vec<(u16, Vec<(i64, u64)>)>,
    ) {
        let meta = metadata_for_test();
        let mut seq = 0;

        let mut frozens = Vec::with_capacity(parts.len());
        for part in parts {
            let mut buffer = build_data_buffer(meta.clone(), part, &mut seq);
            let frozen = buffer.freeze(None, false).unwrap();
            frozens.push(frozen);
        }

        let parts = DataParts::new(meta, 10, true).with_frozen(frozens);

        let mut res = Vec::with_capacity(expected.len());
        let mut reader =
            DedupReader::try_new(MockSource(parts.read().unwrap().build().unwrap())).unwrap();
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }

        assert_eq!(expected, res);
    }

    #[test]
    fn test_data_parts_reader_dedup() {
        // A single part without duplicates is returned as-is.
        check_data_parts_reader_dedup(vec![vec![(0, vec![1, 2])]], vec![(0, vec![(1, 0), (2, 1)])]);

        // Overlapping parts: only the row with the largest sequence is kept
        // for each duplicated timestamp.
        check_data_parts_reader_dedup(
            vec![
                vec![(0, vec![1, 2])],
                vec![(0, vec![1, 2])],
                vec![(0, vec![2, 3])],
            ],
            vec![(0, vec![(1, 2)]), (0, vec![(2, 4)]), (0, vec![(3, 5)])],
        );

        // Disjoint timestamps across parts: nothing is deduplicated.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(0, vec![2])], vec![(0, vec![3])]],
            vec![(0, vec![(1, 0)]), (0, vec![(2, 1)]), (0, vec![(3, 2)])],
        );

        // The same row written three times collapses to the latest sequence.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(0, vec![1])], vec![(0, vec![1])]],
            vec![(0, vec![(1, 2)])],
        );

        // Identical timestamps under different primary keys are not duplicates.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(1, vec![1])], vec![(2, vec![1])]],
            vec![(0, vec![(1, 0)]), (1, vec![(1, 1)]), (2, vec![(1, 2)])],
        );
    }
}