log_store/kafka/index/iterator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{max, min};
16use std::collections::{BTreeSet, VecDeque};
17use std::fmt::Debug;
18use std::ops::Range;
19
20use store_api::logstore::EntryId;
21
22use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
23
/// A size estimate for the next batch of WAL entries to fetch.
///
/// Produced by [`RegionWalIndexIterator::next_batch_hint`].
#[derive(PartialEq, Eq, Debug)]
pub(crate) struct NextBatchHint {
    /// Estimated number of bytes the next batch spans.
    pub(crate) bytes: usize,
    /// Estimated number of entries in the next batch.
    pub(crate) len: usize,
}
29
30/// An iterator over WAL (Write-Ahead Log) entries index for a region.
31pub trait RegionWalIndexIterator: Send + Sync + Debug {
32    /// Returns next batch hint.
33    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
34
35    // Peeks the next EntryId without advancing the iterator.
36    fn peek(&self) -> Option<EntryId>;
37
38    // Advances the iterator and returns the next EntryId.
39    fn next(&mut self) -> Option<EntryId>;
40
41    #[cfg(test)]
42    fn as_any(&self) -> &dyn std::any::Any;
43}
44
45/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
46#[derive(Debug)]
47pub struct RegionWalRange {
48    current_entry_id: EntryId,
49    end_entry_id: EntryId,
50    max_batch_size: usize,
51}
52
53impl RegionWalRange {
54    pub fn new(range: Range<EntryId>, max_batch_size: usize) -> Self {
55        Self {
56            current_entry_id: range.start,
57            end_entry_id: range.end,
58            max_batch_size,
59        }
60    }
61
62    fn next_batch_size(&self) -> Option<u64> {
63        if self.current_entry_id < self.end_entry_id {
64            Some(
65                self.end_entry_id
66                    .checked_sub(self.current_entry_id)
67                    .unwrap_or_default(),
68            )
69        } else {
70            None
71        }
72    }
73}
74
75impl RegionWalIndexIterator for RegionWalRange {
76    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
77        if let Some(size) = self.next_batch_size() {
78            let bytes = min(size as usize * avg_size, self.max_batch_size);
79            let len = bytes / avg_size;
80
81            return Some(NextBatchHint { bytes, len });
82        }
83
84        None
85    }
86
87    fn peek(&self) -> Option<EntryId> {
88        if self.current_entry_id < self.end_entry_id {
89            Some(self.current_entry_id)
90        } else {
91            None
92        }
93    }
94
95    fn next(&mut self) -> Option<EntryId> {
96        if self.current_entry_id < self.end_entry_id {
97            let next = self.current_entry_id;
98            self.current_entry_id += 1;
99            Some(next)
100        } else {
101            None
102        }
103    }
104
105    #[cfg(test)]
106    fn as_any(&self) -> &dyn std::any::Any {
107        self
108    }
109}
110
/// Minimum batch window size: 4 MiB (`4 * 1024 * 1024`).
///
/// NOTE(review): presumably the default passed as `min_batch_window_size` /
/// `min_window_size` by callers; confirm at the call sites.
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 << 20;
112
113/// Represents an index of Write-Ahead Log entries for a region,
114/// stored as a vector of [EntryId]s.
115#[derive(Debug)]
116pub struct RegionWalVecIndex {
117    index: VecDeque<EntryId>,
118    min_batch_window_size: usize,
119}
120
121impl RegionWalVecIndex {
122    pub fn new<I: IntoIterator<Item = EntryId>>(index: I, min_batch_window_size: usize) -> Self {
123        Self {
124            index: index.into_iter().collect::<VecDeque<_>>(),
125            min_batch_window_size,
126        }
127    }
128}
129
130impl RegionWalIndexIterator for RegionWalVecIndex {
131    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
132        let merger = MergeRange::new(
133            ConvertIndexToRange::new(self.index.iter().peekable(), avg_size),
134            self.min_batch_window_size,
135        );
136
137        merger.merge().map(|(range, size)| NextBatchHint {
138            bytes: range.end - range.start - 1,
139            len: size,
140        })
141    }
142
143    fn peek(&self) -> Option<EntryId> {
144        self.index.front().cloned()
145    }
146
147    fn next(&mut self) -> Option<EntryId> {
148        self.index.pop_front()
149    }
150
151    #[cfg(test)]
152    fn as_any(&self) -> &dyn std::any::Any {
153        self
154    }
155}
156
157/// Represents an iterator over multiple region WAL indexes.
158///
159/// Allowing iteration through multiple WAL indexes.
160#[derive(Debug)]
161pub struct MultipleRegionWalIndexIterator {
162    iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
163}
164
165impl MultipleRegionWalIndexIterator {
166    pub fn new<I: IntoIterator<Item = Box<dyn RegionWalIndexIterator>>>(iterator: I) -> Self {
167        Self {
168            iterator: iterator.into_iter().collect::<VecDeque<_>>(),
169        }
170    }
171}
172
173impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
174    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
175        for iter in &self.iterator {
176            if let Some(batch) = iter.next_batch_hint(avg_size) {
177                return Some(batch);
178            }
179        }
180
181        None
182    }
183
184    fn peek(&self) -> Option<EntryId> {
185        for iter in &self.iterator {
186            let peek = iter.peek();
187            if peek.is_some() {
188                return peek;
189            }
190        }
191
192        None
193    }
194
195    fn next(&mut self) -> Option<EntryId> {
196        while !self.iterator.is_empty() {
197            let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none();
198            if remove {
199                self.iterator.pop_front();
200            } else {
201                break;
202            }
203        }
204
205        self.iterator.front_mut().and_then(|iter| iter.next())
206    }
207
208    #[cfg(test)]
209    fn as_any(&self) -> &dyn std::any::Any {
210        self
211    }
212}
213
214/// Builds [`RegionWalIndexIterator`].
215///
216/// Returns None means there are no entries to replay.
217pub fn build_region_wal_index_iterator(
218    start_entry_id: EntryId,
219    end_entry_id: EntryId,
220    region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
221    max_batch_bytes: usize,
222    min_window_size: usize,
223) -> Option<Box<dyn RegionWalIndexIterator>> {
224    if (start_entry_id..end_entry_id).is_empty() {
225        return None;
226    }
227
228    match region_indexes {
229        Some((region_indexes, last_index)) => {
230            if region_indexes.is_empty() && last_index >= end_entry_id {
231                return None;
232            }
233
234            let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
235            if !region_indexes.is_empty() {
236                let index = RegionWalVecIndex::new(region_indexes, min_window_size);
237                iterator.push(Box::new(index));
238            }
239            let known_last_index = max(last_index, start_entry_id);
240            if known_last_index < end_entry_id {
241                let range = known_last_index..end_entry_id;
242                let index = RegionWalRange::new(range, max_batch_bytes);
243                iterator.push(Box::new(index));
244            }
245
246            Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
247        }
248        None => {
249            let range = start_entry_id..end_entry_id;
250
251            Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
252        }
253    }
254}
255
/// Unit tests for the region WAL index iterators and `build_region_wal_index_iterator`.
#[cfg(test)]
mod tests {
    use super::*;

    /// `RegionWalRange`: hints are capped by `max_batch_size`, and entries are drained
    /// one at a time until the range is exhausted.
    #[test]
    fn test_region_wal_range() {
        // 1024 entries * 10 bytes exceeds the 1024-byte cap, so `bytes` is capped and
        // `len` is 1024 / 10 = 102.
        let range = RegionWalRange::new(0..1024, 1024);
        assert_eq!(
            range.next_batch_hint(10),
            Some(NextBatchHint {
                bytes: 1024,
                len: 102
            })
        );

        let mut range = RegionWalRange::new(0..1, 1024);

        assert_eq!(range.next_batch_size(), Some(1));
        assert_eq!(range.peek(), Some(0));

        // Advance 1 step
        assert_eq!(range.next(), Some(0));
        assert_eq!(range.next_batch_size(), None);

        // Advance 1 step; the range is already exhausted
        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);
        // No effect
        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);

        // An empty range yields nothing from the start
        let mut range = RegionWalRange::new(0..0, 1024);
        assert_eq!(range.next_batch_size(), None);
        // No effect
        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);
    }

    /// `RegionWalVecIndex`: entries are popped in order; the expected hints pin how
    /// `MergeRange` groups entries with a window of 30 and an average size of 10.
    #[test]
    fn test_region_wal_vec_index() {
        let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30);
        // The next batch is 0, 1, 2
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 30, len: 3 })
        );
        assert_eq!(index.peek(), Some(0));
        // Advance 1 step
        assert_eq!(index.next(), Some(0));
        // The next batch is 1, 2
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        // Advance 1 step
        assert_eq!(index.next(), Some(1));
        // The next batch is 2
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        // Advance 1 step
        assert_eq!(index.next(), Some(2));
        // The next batch is 7, 8
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        // Advance 1 step
        assert_eq!(index.next(), Some(7));
        // The next batch is 8, 11
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 40, len: 2 })
        );
        // Advance 1 step
        assert_eq!(index.next(), Some(8));
        // The next batch is 11
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        // Advance 1 step; the index is now empty
        assert_eq!(index.next(), Some(11));
        assert_eq!(index.next_batch_hint(10), None);

        // No effect
        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);

        // An empty index yields nothing from the start
        let mut index = RegionWalVecIndex::new([], 1024);
        assert_eq!(index.next_batch_hint(10), None);
        assert_eq!(index.peek(), None);
        // No effect
        assert_eq!(index.peek(), None);
        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);
    }

    /// `MultipleRegionWalIndexIterator`: empty children are skipped, and exhausted
    /// children are only dropped once `next` is called past them.
    #[test]
    fn test_multiple_region_wal_iterator() {
        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)) as _;
        let iter2 = Box::new(RegionWalRange::new(1024..1024, 1024)) as _;
        let mut iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2]);

        // The next batch is 0, 1, 2
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 30, len: 3 })
        );
        assert_eq!(iter.peek(), Some(0));
        // Advance 1 step
        assert_eq!(iter.next(), Some(0));

        // The next batch is 1, 2
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        assert_eq!(iter.peek(), Some(1));
        // Advance 1 step
        assert_eq!(iter.next(), Some(1));

        // The next batch is 2
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(iter.peek(), Some(2));

        // Advance 1 step
        assert_eq!(iter.next(), Some(2));
        // The next batch is 7, 8, 11
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 50, len: 3 })
        );
        assert_eq!(iter.peek(), Some(7));

        // Advance 1 step
        assert_eq!(iter.next(), Some(7));
        // The next batch is 8, 11
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 40, len: 2 })
        );
        assert_eq!(iter.peek(), Some(8));

        // Advance 1 step
        assert_eq!(iter.next(), Some(8));
        // The next batch is 11
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(iter.peek(), Some(11));
        // Advance 1 step
        assert_eq!(iter.next(), Some(11));

        // All children are exhausted, but they are only removed by the next `next` call.
        assert_eq!(iter.next_batch_hint(10), None,);
        assert_eq!(iter.peek(), None);
        assert!(!iter.iterator.is_empty());
        assert_eq!(iter.next(), None);
        assert!(iter.iterator.is_empty());

        // No effect
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next_batch_hint(10), None,);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    /// `build_region_wal_index_iterator`: empty or fully-indexed ranges yield `None`;
    /// otherwise the indexed entries come first, followed by any uncovered tail range.
    #[test]
    fn test_build_region_wal_index_iterator() {
        // Empty range
        let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
        assert!(iterator.is_none());

        // Reversed range
        let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
        assert!(iterator.is_none());

        // Empty range, even with an index
        let iterator =
            build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
        assert!(iterator.is_none());

        // Empty index whose last index reaches the end of the range
        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
        assert!(iterator.is_none());

        // Empty index whose last index is past the end of the range
        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
        assert!(iterator.is_none());

        // Indexed entries only: the last index reaches the end, so there is no tail range
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1024)),
            5,
            5,
        )
        .unwrap();
        let iter = iterator
            .as_any()
            .downcast_ref::<MultipleRegionWalIndexIterator>()
            .unwrap();
        assert_eq!(iter.iterator.len(), 1);
        let vec_index = iter.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));

        // Indexed entries followed by the uncovered tail range 1023..1024
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1023)),
            5,
            5,
        )
        .unwrap();
        let iter = iterator
            .as_any()
            .downcast_ref::<MultipleRegionWalIndexIterator>()
            .unwrap();
        assert_eq!(iter.iterator.len(), 2);
        let vec_index = iter.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));
        let wal_range = iter.iterator[1]
            .as_any()
            .downcast_ref::<RegionWalRange>()
            .unwrap();
        assert_eq!(wal_range.current_entry_id, 1023);
        assert_eq!(wal_range.end_entry_id, 1024);
    }
}