mito2/memtable/partition_tree/
dedup.rs1use std::ops::Range;
16
17use crate::error::Result;
18use crate::memtable::partition_tree::data::DataBatch;
19use crate::memtable::partition_tree::shard::DataBatchSource;
20use crate::memtable::partition_tree::PkId;
21
22pub struct DedupReader<T> {
24 prev_batch_last_row: Option<(PkId, i64)>,
25 current_batch_range: Option<Range<usize>>,
26 inner: T,
27}
28
29impl<T: DataBatchSource> DedupReader<T> {
30 pub fn try_new(inner: T) -> Result<Self> {
32 let mut res = Self {
33 prev_batch_last_row: None,
34 current_batch_range: None,
35 inner,
36 };
37 res.next()?;
38 Ok(res)
39 }
40}
41
42impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
43 fn is_valid(&self) -> bool {
44 self.current_batch_range.is_some()
45 }
46
47 fn next(&mut self) -> Result<()> {
48 while self.inner.is_valid() {
49 match &mut self.prev_batch_last_row {
50 None => {
51 let current_batch = self.inner.current_data_batch();
53 let pk_id = self.inner.current_pk_id();
54 let (last_ts, _) = current_batch.last_row();
55 self.prev_batch_last_row = Some((pk_id, last_ts));
56 self.current_batch_range = Some(0..current_batch.num_rows());
57 break;
58 }
59 Some(prev_last_row) => {
60 self.inner.next()?;
61 if !self.inner.is_valid() {
62 self.current_batch_range = None;
64 break;
65 }
66 let current_batch = self.inner.current_data_batch();
67 let current_pk_id = self.inner.current_pk_id();
68 let (first_ts, _) = current_batch.first_row();
69 let rows_in_batch = current_batch.num_rows();
70
71 let (start, end) = if &(current_pk_id, first_ts) == prev_last_row {
72 if rows_in_batch == 1 {
74 continue;
76 } else {
77 (1, rows_in_batch)
79 }
80 } else {
81 (0, rows_in_batch)
83 };
84
85 let (last_ts, _) = current_batch.last_row();
86 *prev_last_row = (current_pk_id, last_ts);
87 self.current_batch_range = Some(start..end);
88 break;
89 }
90 }
91 }
92 Ok(())
93 }
94
95 fn current_pk_id(&self) -> PkId {
96 self.inner.current_pk_id()
97 }
98
99 fn current_key(&self) -> Option<&[u8]> {
100 self.inner.current_key()
101 }
102
103 fn current_data_batch(&self) -> DataBatch {
104 let range = self.current_batch_range.as_ref().unwrap();
105 let data_batch = self.inner.current_data_batch();
106 data_batch.slice(range.start, range.len())
107 }
108}
109
#[cfg(test)]
mod tests {
    use store_api::metadata::RegionMetadataRef;

    use super::*;
    use crate::memtable::partition_tree::data::{DataBuffer, DataParts, DataPartsReader};
    use crate::test_util::memtable_util::{
        extract_data_batch, metadata_for_test, write_rows_to_buffer,
    };

    /// Test adapter that presents a [`DataPartsReader`] as a [`DataBatchSource`].
    struct MockSource(DataPartsReader);

    impl DataBatchSource for MockSource {
        fn is_valid(&self) -> bool {
            self.0.is_valid()
        }

        fn next(&mut self) -> Result<()> {
            self.0.next()
        }

        /// Builds the id from the current batch's pk index; the shard id is always 0.
        fn current_pk_id(&self) -> PkId {
            let pk_index = self.0.current_data_batch().pk_index();
            PkId {
                shard_id: 0,
                pk_index,
            }
        }

        /// The mock never exposes an encoded key.
        fn current_key(&self) -> Option<&[u8]> {
            None
        }

        fn current_data_batch(&self) -> DataBatch {
            self.0.current_data_batch()
        }
    }

    /// Writes every `(pk_index, timestamps)` entry into a fresh buffer.
    ///
    /// Each value is its timestamp converted to `f64`. `seq` is passed as the
    /// sequence argument for each entry and then bumped by the number of rows written.
    fn build_data_buffer(
        meta: RegionMetadataRef,
        rows: Vec<(u16, Vec<i64>)>,
        seq: &mut u64,
    ) -> DataBuffer {
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        for (pk_index, timestamps) in rows {
            let written = timestamps.len() as u64;
            let values = timestamps.iter().map(|ts| Some(*ts as f64)).collect();

            write_rows_to_buffer(&mut buffer, &meta, pk_index, timestamps, values, *seq);
            *seq += written;
        }
        buffer
    }

    /// Freezes one data part per entry of `parts`, reads them back through a
    /// [`DedupReader`], and asserts that the extracted batches equal `expected`.
    fn check_data_parts_reader_dedup(
        parts: Vec<Vec<(u16, Vec<i64>)>>,
        expected: Vec<(u16, Vec<(i64, u64)>)>,
    ) {
        let meta = metadata_for_test();
        let mut seq = 0;

        let frozens: Vec<_> = parts
            .into_iter()
            .map(|part| {
                let mut buffer = build_data_buffer(meta.clone(), part, &mut seq);
                buffer.freeze(None, false).unwrap()
            })
            .collect();

        let parts = DataParts::new(meta, 10, true).with_frozen(frozens);
        let source = MockSource(parts.read().unwrap().build().unwrap());
        let mut reader = DedupReader::try_new(source).unwrap();

        let mut actual = Vec::with_capacity(expected.len());
        while reader.is_valid() {
            actual.push(extract_data_batch(&reader.current_data_batch()));
            reader.next().unwrap();
        }

        assert_eq!(expected, actual);
    }

    #[test]
    fn test_data_parts_reader_dedup() {
        // A single part comes back unchanged.
        check_data_parts_reader_dedup(vec![vec![(0, vec![1, 2])]], vec![(0, vec![(1, 0), (2, 1)])]);

        // Overlapping timestamps across three parts of the same key.
        check_data_parts_reader_dedup(
            vec![
                vec![(0, vec![1, 2])],
                vec![(0, vec![1, 2])],
                vec![(0, vec![2, 3])],
            ],
            vec![(0, vec![(1, 2)]), (0, vec![(2, 4)]), (0, vec![(3, 5)])],
        );

        // Distinct timestamps for the same key: nothing is dropped.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(0, vec![2])], vec![(0, vec![3])]],
            vec![(0, vec![(1, 0)]), (0, vec![(2, 1)]), (0, vec![(3, 2)])],
        );

        // The same row written three times collapses to a single row.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(0, vec![1])], vec![(0, vec![1])]],
            vec![(0, vec![(1, 2)])],
        );

        // Equal timestamps under different pk indexes are all kept.
        check_data_parts_reader_dedup(
            vec![vec![(0, vec![1])], vec![(1, vec![1])], vec![(2, vec![1])]],
            vec![(0, vec![(1, 0)]), (1, vec![(1, 1)]), (2, vec![(1, 2)])],
        );
    }
}