log_store/kafka/util/
range.rs1use std::cmp::{max, min};
16use std::iter::Peekable;
17use std::ops::Range;
18
19use store_api::logstore::EntryId;
20
21pub(crate) struct ConvertIndexToRange<'a, I: Iterator<Item = &'a EntryId>> {
23 base: Option<EntryId>,
24 iter: Peekable<I>,
25 avg_size: usize,
26}
27
28impl<'a, I: Iterator<Item = &'a EntryId>> ConvertIndexToRange<'a, I> {
29 pub fn new(mut iter: Peekable<I>, avg_size: usize) -> Self {
30 let base = iter.peek().cloned().cloned();
31
32 Self {
33 base,
34 iter,
35 avg_size,
36 }
37 }
38}
39
40impl<'a, I: Iterator<Item = &'a EntryId>> Iterator for ConvertIndexToRange<'a, I> {
41 type Item = Range<usize>;
42
43 fn next(&mut self) -> Option<Self::Item> {
44 let (base, val) = (&self.base?, self.iter.next()?);
45 let start = (*val - *base) as usize * self.avg_size;
46 let end = start + self.avg_size + 1;
47 Some(start..end)
48 }
49}
50
/// Collapses the leading ranges of an iterator into one covering range.
///
/// A range takes part in the merge while its start is no more than
/// `window_size` past the start of the merged range.
pub(crate) struct MergeRange<I>
where
    I: Iterator<Item = Range<usize>>,
{
    /// Source of the ranges to merge.
    iter: I,
    /// Largest allowed distance between the merged start and the start of a
    /// range that is still merged in.
    window_size: usize,
}
66
67impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
68 pub fn new(iter: I, window_size: usize) -> Self {
69 Self { iter, window_size }
70 }
71}
72
/// Widens `this` in place to the smallest range that covers both `this` and
/// `other`: the lower of the two starts up to the higher of the two ends.
fn merge(this: &mut Range<usize>, other: &Range<usize>) {
    *this = min(this.start, other.start)..max(this.end, other.end);
}
78
79impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
80 pub(crate) fn merge(mut self) -> Option<(Range<usize>, usize)> {
82 let mut merged_range = self.iter.next();
83 let this = merged_range.as_mut()?;
84 let mut merged = 1;
85 for next in self.iter {
86 let window = next.start - this.start;
87 if window > self.window_size {
88 break;
89 } else {
90 merge(this, &next);
91 merged += 1;
92 }
93 }
94 merged_range.map(|range| (range, merged))
95 }
96}
97
#[cfg(test)]
// The fixtures below deliberately contain single-element `vec![a..b]` lists.
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use super::*;

    /// Ids are converted relative to the first id and scaled by `avg_size`.
    #[test]
    fn test_convert_index_to_range() {
        let avg_size = 1024;
        let index = [1u64, 4, 10, 15];
        let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);

        // Distances of each id from the first id (1).
        for distance in [0usize, 3, 9, 14] {
            let start = distance * avg_size;
            assert_eq!(converter.next(), Some(start..start + avg_size + 1));
        }
        assert_eq!(converter.next(), None);

        // An empty index yields nothing.
        let index = [];
        let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);
        assert_eq!(converter.next(), None);
    }

    /// Each case is `(input ranges, window size, expected result)`.
    #[test]
    fn test_merge_range() {
        let cases = vec![
            (vec![10usize..13, 12..14, 16..18, 19..29], 9, Some((10..29, 4))),
            (vec![10..13, 12..14, 16..18], 5, Some((10..14, 2))),
            (vec![10..13, 15..17, 16..18], 5, Some((10..17, 2))),
            (vec![10..13], 4, Some((10..13, 1))),
            (vec![10..13], 2, Some((10..13, 1))),
            (vec![], 2, None),
        ];

        for (size_range, window_size, expected) in cases {
            let merger = MergeRange::new(size_range.into_iter(), window_size);
            assert_eq!(merger.merge(), expected);
        }
    }
}