log_store/kafka/util/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{max, min};
16use std::iter::Peekable;
17use std::ops::Range;
18
19use store_api::logstore::EntryId;
20
21/// Convert a sequence of [`EntryId`]s into size ranges.
22pub(crate) struct ConvertIndexToRange<'a, I: Iterator<Item = &'a EntryId>> {
23    base: Option<EntryId>,
24    iter: Peekable<I>,
25    avg_size: usize,
26}
27
28impl<'a, I: Iterator<Item = &'a EntryId>> ConvertIndexToRange<'a, I> {
29    pub fn new(mut iter: Peekable<I>, avg_size: usize) -> Self {
30        let base = iter.peek().cloned().cloned();
31
32        Self {
33            base,
34            iter,
35            avg_size,
36        }
37    }
38}
39
40impl<'a, I: Iterator<Item = &'a EntryId>> Iterator for ConvertIndexToRange<'a, I> {
41    type Item = Range<usize>;
42
43    fn next(&mut self) -> Option<Self::Item> {
44        let (base, val) = (&self.base?, self.iter.next()?);
45        let start = (*val - *base) as usize * self.avg_size;
46        let end = start + self.avg_size + 1;
47        Some(start..end)
48    }
49}
50
/// Merges leading ranges whose starts lie within `window_size` of the merged range's start.
///
/// The first range of the iterator seeds the merged range. Each following range is
/// folded in while the distance from the merged range's start to its own start is
/// at most `window_size`; the first range beyond the window stops the merge.
///
/// e.g.,
///
/// Case 1
/// - input: range: [(0..3), (5..6), (5..8)], window_size: 6
/// - output: range: (0..8), merged: 3
///
/// Case 2
/// - input: range: [(0..3)], window_size: 6
/// - output: range: (0..3), merged: 1
pub(crate) struct MergeRange<I: Iterator<Item = Range<usize>>> {
    /// The ranges to merge.
    iter: I,
    /// The maximum allowed distance between the merged start and the start of a range to merge.
    window_size: usize,
}

impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
    /// Creates a merger over `iter` with the given `window_size`.
    pub fn new(iter: I, window_size: usize) -> Self {
        Self { iter, window_size }
    }
}

/// Extends `this` in place so that it covers both `this` and `other`.
fn merge(this: &mut Range<usize>, other: &Range<usize>) {
    this.start = min(this.start, other.start);
    this.end = max(this.end, other.end);
}

impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
    /// Merges the leading ranges that fit into the window.
    ///
    /// Returns the merged range together with the number of input ranges folded
    /// into it (including the first one), or `None` if the iterator is empty.
    pub(crate) fn merge(mut self) -> Option<(Range<usize>, usize)> {
        let mut merged_range = self.iter.next()?;
        let mut merged = 1;
        for next in self.iter {
            // A range starting before the merged start has a distance of zero;
            // a plain subtraction would underflow here (panic in debug builds,
            // wrap-around in release builds).
            let window = next.start.saturating_sub(merged_range.start);
            if window > self.window_size {
                break;
            }
            merge(&mut merged_range, &next);
            merged += 1;
        }
        Some((merged_range, merged))
    }
}
97
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use super::*;

    /// Ids are converted relative to the first id; an empty index yields nothing.
    #[test]
    fn test_convert_index_to_range() {
        let avg_size = 1024;
        let index = [1u64, 4, 10, 15];
        let converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);

        let expected = vec![
            0..avg_size + 1,
            3 * avg_size..4 * avg_size + 1,
            9 * avg_size..10 * avg_size + 1,
            14 * avg_size..15 * avg_size + 1,
        ];
        // Collecting stops at the first `None`, so this also checks exhaustion.
        assert_eq!(converter.collect::<Vec<_>>(), expected);

        let index = [];
        let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);
        assert_eq!(converter.next(), None);
    }

    /// Each case is `(input ranges, window size, expected merge result)`.
    #[test]
    fn test_merge_range() {
        let cases: Vec<(Vec<Range<usize>>, usize, Option<(Range<usize>, usize)>)> = vec![
            (vec![10..13, 12..14, 16..18, 19..29], 9, Some((10..29, 4))),
            (vec![10..13, 12..14, 16..18], 5, Some((10..14, 2))),
            (vec![10..13, 15..17, 16..18], 5, Some((10..17, 2))),
            (vec![10..13], 4, Some((10..13, 1))),
            (vec![10..13], 2, Some((10..13, 1))),
            (vec![], 2, None),
        ];

        for (size_range, window_size, expected) in cases {
            let merger = MergeRange::new(size_range.into_iter(), window_size);
            assert_eq!(merger.merge(), expected);
        }
    }
}