Skip to main content

log_store/kafka/index/
iterator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{max, min};
16use std::collections::{BTreeSet, VecDeque};
17use std::fmt::Debug;
18use std::ops::Range;
19
20use store_api::logstore::EntryId;
21
22use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
23
/// Sizing hint describing the next batch of WAL entries.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct NextBatchHint {
    /// Estimated size of the next batch, in bytes.
    pub(crate) bytes: usize,
    /// Estimated number of entries in the next batch.
    pub(crate) len: usize,
}
29
30/// An iterator over WAL (Write-Ahead Log) entries index for a region.
31pub trait RegionWalIndexIterator: Send + Sync + Debug {
32    /// Returns next batch hint.
33    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
34
35    // Peeks the next EntryId without advancing the iterator.
36    fn peek(&self) -> Option<EntryId>;
37
38    // Advances the iterator and returns the next EntryId.
39    fn next(&mut self) -> Option<EntryId>;
40
41    #[cfg(test)]
42    fn as_any(&self) -> &dyn std::any::Any;
43}
44
45/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
46#[derive(Debug)]
47pub struct RegionWalRange {
48    current_entry_id: EntryId,
49    end_entry_id: EntryId,
50    max_batch_size: usize,
51}
52
53impl RegionWalRange {
54    pub fn new(range: Range<EntryId>, max_batch_size: usize) -> Self {
55        Self {
56            current_entry_id: range.start,
57            end_entry_id: range.end,
58            max_batch_size,
59        }
60    }
61
62    fn next_batch_size(&self) -> Option<u64> {
63        if self.current_entry_id < self.end_entry_id {
64            Some(self.end_entry_id.saturating_sub(self.current_entry_id))
65        } else {
66            None
67        }
68    }
69}
70
71impl RegionWalIndexIterator for RegionWalRange {
72    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
73        if let Some(size) = self.next_batch_size() {
74            let bytes = min(size as usize * avg_size, self.max_batch_size);
75            let len = bytes / avg_size;
76
77            return Some(NextBatchHint { bytes, len });
78        }
79
80        None
81    }
82
83    fn peek(&self) -> Option<EntryId> {
84        if self.current_entry_id < self.end_entry_id {
85            Some(self.current_entry_id)
86        } else {
87            None
88        }
89    }
90
91    fn next(&mut self) -> Option<EntryId> {
92        if self.current_entry_id < self.end_entry_id {
93            let next = self.current_entry_id;
94            self.current_entry_id += 1;
95            Some(next)
96        } else {
97            None
98        }
99    }
100
101    #[cfg(test)]
102    fn as_any(&self) -> &dyn std::any::Any {
103        self
104    }
105}
106
/// Batch window size of 4 MiB (`4 * 1024 * 1024`).
// NOTE(review): presumably the default `min_batch_window_size` handed to
// `RegionWalVecIndex::new` — confirm at the call sites.
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 << 20;
108
109/// Represents an index of Write-Ahead Log entries for a region,
110/// stored as a vector of [EntryId]s.
111#[derive(Debug)]
112pub struct RegionWalVecIndex {
113    index: VecDeque<EntryId>,
114    min_batch_window_size: usize,
115}
116
117impl RegionWalVecIndex {
118    pub fn new<I: IntoIterator<Item = EntryId>>(index: I, min_batch_window_size: usize) -> Self {
119        Self {
120            index: index.into_iter().collect::<VecDeque<_>>(),
121            min_batch_window_size,
122        }
123    }
124}
125
126impl RegionWalIndexIterator for RegionWalVecIndex {
127    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
128        let merger = MergeRange::new(
129            ConvertIndexToRange::new(self.index.iter().peekable(), avg_size),
130            self.min_batch_window_size,
131        );
132
133        merger.merge().map(|(range, size)| NextBatchHint {
134            bytes: range.end - range.start - 1,
135            len: size,
136        })
137    }
138
139    fn peek(&self) -> Option<EntryId> {
140        self.index.front().cloned()
141    }
142
143    fn next(&mut self) -> Option<EntryId> {
144        self.index.pop_front()
145    }
146
147    #[cfg(test)]
148    fn as_any(&self) -> &dyn std::any::Any {
149        self
150    }
151}
152
153/// Represents an iterator over multiple region WAL indexes.
154///
155/// Allowing iteration through multiple WAL indexes.
156#[derive(Debug)]
157pub struct MultipleRegionWalIndexIterator {
158    iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
159}
160
161impl MultipleRegionWalIndexIterator {
162    pub fn new<I: IntoIterator<Item = Box<dyn RegionWalIndexIterator>>>(iterator: I) -> Self {
163        Self {
164            iterator: iterator.into_iter().collect::<VecDeque<_>>(),
165        }
166    }
167}
168
169impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
170    fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
171        for iter in &self.iterator {
172            if let Some(batch) = iter.next_batch_hint(avg_size) {
173                return Some(batch);
174            }
175        }
176
177        None
178    }
179
180    fn peek(&self) -> Option<EntryId> {
181        for iter in &self.iterator {
182            let peek = iter.peek();
183            if peek.is_some() {
184                return peek;
185            }
186        }
187
188        None
189    }
190
191    fn next(&mut self) -> Option<EntryId> {
192        while !self.iterator.is_empty() {
193            let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none();
194            if remove {
195                self.iterator.pop_front();
196            } else {
197                break;
198            }
199        }
200
201        self.iterator.front_mut().and_then(|iter| iter.next())
202    }
203
204    #[cfg(test)]
205    fn as_any(&self) -> &dyn std::any::Any {
206        self
207    }
208}
209
210/// Builds [`RegionWalIndexIterator`].
211///
212/// Returns None means there are no entries to replay.
213pub fn build_region_wal_index_iterator(
214    start_entry_id: EntryId,
215    end_entry_id: EntryId,
216    region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
217    max_batch_bytes: usize,
218    min_window_size: usize,
219) -> Option<Box<dyn RegionWalIndexIterator>> {
220    if (start_entry_id..end_entry_id).is_empty() {
221        return None;
222    }
223
224    match region_indexes {
225        Some((region_indexes, last_index)) => {
226            if region_indexes.is_empty() && last_index >= end_entry_id {
227                return None;
228            }
229
230            let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
231            if !region_indexes.is_empty() {
232                let index = RegionWalVecIndex::new(region_indexes, min_window_size);
233                iterator.push(Box::new(index));
234            }
235            let known_last_index = max(last_index, start_entry_id);
236            if known_last_index < end_entry_id {
237                let range = known_last_index..end_entry_id;
238                let index = RegionWalRange::new(range, max_batch_bytes);
239                iterator.push(Box::new(index));
240            }
241
242            Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
243        }
244        None => {
245            let range = start_entry_id..end_entry_id;
246
247            Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
248        }
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_region_wal_range() {
        // The hint is capped by the maximum batch size.
        let capped = RegionWalRange::new(0..1024, 1024);
        let expected = NextBatchHint {
            bytes: 1024,
            len: 102,
        };
        assert_eq!(capped.next_batch_hint(10), Some(expected));

        // A single-entry range is drained by the first `next`.
        let mut single = RegionWalRange::new(0..1, 1024);
        assert_eq!(single.next_batch_size(), Some(1));
        assert_eq!(single.peek(), Some(0));
        assert_eq!(single.next(), Some(0));
        assert_eq!(single.next_batch_size(), None);
        // Further calls have no effect.
        for _ in 0..2 {
            assert_eq!(single.next(), None);
            assert_eq!(single.next_batch_size(), None);
        }

        // An empty range never yields anything.
        let mut empty = RegionWalRange::new(0..0, 1024);
        assert_eq!(empty.next_batch_size(), None);
        assert_eq!(empty.next(), None);
        assert_eq!(empty.next_batch_size(), None);
    }

    #[test]
    fn test_region_wal_vec_index() {
        let hint = |bytes, len| Some(NextBatchHint { bytes, len });

        let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30);
        assert_eq!(index.next_batch_hint(10), hint(30, 3));
        assert_eq!(index.peek(), Some(0));

        // (entry yielded by `next`, hint expected right after it)
        let steps = [
            (0, hint(20, 2)),
            (1, hint(10, 1)),
            (2, hint(20, 2)),
            (7, hint(40, 2)),
            (8, hint(10, 1)),
            (11, None),
        ];
        for (entry, expected) in steps {
            assert_eq!(index.next(), Some(entry));
            assert_eq!(index.next_batch_hint(10), expected);
        }

        // No effect once drained.
        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);

        let mut empty = RegionWalVecIndex::new([], 1024);
        assert_eq!(empty.next_batch_hint(10), None);
        assert_eq!(empty.peek(), None);
        // No effect
        assert_eq!(empty.peek(), None);
        assert_eq!(empty.next(), None);
        assert_eq!(empty.next_batch_hint(10), None);
    }

    #[test]
    fn test_multiple_region_wal_iterator() {
        let parts: [Box<dyn RegionWalIndexIterator>; 3] = [
            Box::new(RegionWalRange::new(0..0, 1024)),
            Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)),
            Box::new(RegionWalRange::new(1024..1024, 1024)),
        ];
        let mut iter = MultipleRegionWalIndexIterator::new(parts);

        // (hint bytes, hint len, next entry): the batches are
        // [0, 1, 2], [1, 2], [2], [7, 8, 11], [8, 11] and [11].
        let steps = [
            (30, 3, 0),
            (20, 2, 1),
            (10, 1, 2),
            (50, 3, 7),
            (40, 2, 8),
            (10, 1, 11),
        ];
        for (bytes, len, entry) in steps {
            assert_eq!(iter.next_batch_hint(10), Some(NextBatchHint { bytes, len }));
            assert_eq!(iter.peek(), Some(entry));
            // Advance 1 step
            assert_eq!(iter.next(), Some(entry));
        }

        // Exhausted inner iterators are only dropped by `next`.
        assert_eq!(iter.next_batch_hint(10), None);
        assert_eq!(iter.peek(), None);
        assert!(!iter.iterator.is_empty());
        assert_eq!(iter.next(), None);
        assert!(iter.iterator.is_empty());

        // No effect
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next_batch_hint(10), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_build_region_wal_index_iterator() {
        /// Downcasts a built iterator to the chained implementation.
        fn as_multiple(iterator: &dyn RegionWalIndexIterator) -> &MultipleRegionWalIndexIterator {
            iterator
                .as_any()
                .downcast_ref::<MultipleRegionWalIndexIterator>()
                .unwrap()
        }

        // Inputs for which there is nothing to replay.
        let nothing_to_replay: [(EntryId, EntryId, Option<(BTreeSet<EntryId>, EntryId)>); 5] = [
            (1024, 1024, None),
            (1024, 1023, None),
            (1024, 1024, Some((BTreeSet::new(), 1024))),
            (1, 1024, Some((BTreeSet::new(), 1024))),
            (1, 1024, Some((BTreeSet::new(), 1025))),
        ];
        for (start, end, indexes) in nothing_to_replay {
            let iterator = build_region_wal_index_iterator(start, end, indexes, 5, 5);
            assert!(iterator.is_none(), "range {}..{}", start, end);
        }

        let indexes = || BTreeSet::from([512, 756]);

        // The index reaches `end_entry_id`: only the vec index is built.
        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((indexes(), 1024)), 5, 5).unwrap();
        let multiple = as_multiple(&*iterator);
        assert_eq!(multiple.iterator.len(), 1);
        let vec_index = multiple.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));

        // The index stops short of `end_entry_id`: a trailing range is appended.
        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((indexes(), 1023)), 5, 5).unwrap();
        let multiple = as_multiple(&*iterator);
        assert_eq!(multiple.iterator.len(), 2);
        let vec_index = multiple.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));
        let wal_range = multiple.iterator[1]
            .as_any()
            .downcast_ref::<RegionWalRange>()
            .unwrap();
        assert_eq!(wal_range.current_entry_id, 1023);
        assert_eq!(wal_range.end_entry_id, 1024);
    }
}