1use std::cmp::{max, min};
16use std::collections::{BTreeSet, VecDeque};
17use std::fmt::Debug;
18use std::ops::Range;
19
20use store_api::logstore::EntryId;
21
22use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
23
/// Size suggestion for the next batch of WAL entries to fetch.
///
/// Produced by [`RegionWalIndexIterator::next_batch_hint`].
#[derive(PartialEq, Eq, Debug)]
pub(crate) struct NextBatchHint {
    /// Suggested number of bytes to fetch for the next batch.
    pub(crate) bytes: usize,
    /// Number of entries the hint expects that batch to cover.
    pub(crate) len: usize,
}
29
30pub trait RegionWalIndexIterator: Send + Sync + Debug {
32 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
34
35 fn peek(&self) -> Option<EntryId>;
37
38 fn next(&mut self) -> Option<EntryId>;
40
41 #[cfg(test)]
42 fn as_any(&self) -> &dyn std::any::Any;
43}
44
45#[derive(Debug)]
47pub struct RegionWalRange {
48 current_entry_id: EntryId,
49 end_entry_id: EntryId,
50 max_batch_size: usize,
51}
52
53impl RegionWalRange {
54 pub fn new(range: Range<EntryId>, max_batch_size: usize) -> Self {
55 Self {
56 current_entry_id: range.start,
57 end_entry_id: range.end,
58 max_batch_size,
59 }
60 }
61
62 fn next_batch_size(&self) -> Option<u64> {
63 if self.current_entry_id < self.end_entry_id {
64 Some(
65 self.end_entry_id
66 .checked_sub(self.current_entry_id)
67 .unwrap_or_default(),
68 )
69 } else {
70 None
71 }
72 }
73}
74
75impl RegionWalIndexIterator for RegionWalRange {
76 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
77 if let Some(size) = self.next_batch_size() {
78 let bytes = min(size as usize * avg_size, self.max_batch_size);
79 let len = bytes / avg_size;
80
81 return Some(NextBatchHint { bytes, len });
82 }
83
84 None
85 }
86
87 fn peek(&self) -> Option<EntryId> {
88 if self.current_entry_id < self.end_entry_id {
89 Some(self.current_entry_id)
90 } else {
91 None
92 }
93 }
94
95 fn next(&mut self) -> Option<EntryId> {
96 if self.current_entry_id < self.end_entry_id {
97 let next = self.current_entry_id;
98 self.current_entry_id += 1;
99 Some(next)
100 } else {
101 None
102 }
103 }
104
105 #[cfg(test)]
106 fn as_any(&self) -> &dyn std::any::Any {
107 self
108 }
109}
110
/// 4 MiB batch window size. NOTE(review): presumably the value callers pass as
/// `min_window_size` to `build_region_wal_index_iterator`; confirm at the call sites.
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 << 20;
112
113#[derive(Debug)]
116pub struct RegionWalVecIndex {
117 index: VecDeque<EntryId>,
118 min_batch_window_size: usize,
119}
120
121impl RegionWalVecIndex {
122 pub fn new<I: IntoIterator<Item = EntryId>>(index: I, min_batch_window_size: usize) -> Self {
123 Self {
124 index: index.into_iter().collect::<VecDeque<_>>(),
125 min_batch_window_size,
126 }
127 }
128}
129
130impl RegionWalIndexIterator for RegionWalVecIndex {
131 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
132 let merger = MergeRange::new(
133 ConvertIndexToRange::new(self.index.iter().peekable(), avg_size),
134 self.min_batch_window_size,
135 );
136
137 merger.merge().map(|(range, size)| NextBatchHint {
138 bytes: range.end - range.start - 1,
139 len: size,
140 })
141 }
142
143 fn peek(&self) -> Option<EntryId> {
144 self.index.front().cloned()
145 }
146
147 fn next(&mut self) -> Option<EntryId> {
148 self.index.pop_front()
149 }
150
151 #[cfg(test)]
152 fn as_any(&self) -> &dyn std::any::Any {
153 self
154 }
155}
156
157#[derive(Debug)]
161pub struct MultipleRegionWalIndexIterator {
162 iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
163}
164
165impl MultipleRegionWalIndexIterator {
166 pub fn new<I: IntoIterator<Item = Box<dyn RegionWalIndexIterator>>>(iterator: I) -> Self {
167 Self {
168 iterator: iterator.into_iter().collect::<VecDeque<_>>(),
169 }
170 }
171}
172
173impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
174 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
175 for iter in &self.iterator {
176 if let Some(batch) = iter.next_batch_hint(avg_size) {
177 return Some(batch);
178 }
179 }
180
181 None
182 }
183
184 fn peek(&self) -> Option<EntryId> {
185 for iter in &self.iterator {
186 let peek = iter.peek();
187 if peek.is_some() {
188 return peek;
189 }
190 }
191
192 None
193 }
194
195 fn next(&mut self) -> Option<EntryId> {
196 while !self.iterator.is_empty() {
197 let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none();
198 if remove {
199 self.iterator.pop_front();
200 } else {
201 break;
202 }
203 }
204
205 self.iterator.front_mut().and_then(|iter| iter.next())
206 }
207
208 #[cfg(test)]
209 fn as_any(&self) -> &dyn std::any::Any {
210 self
211 }
212}
213
214pub fn build_region_wal_index_iterator(
218 start_entry_id: EntryId,
219 end_entry_id: EntryId,
220 region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
221 max_batch_bytes: usize,
222 min_window_size: usize,
223) -> Option<Box<dyn RegionWalIndexIterator>> {
224 if (start_entry_id..end_entry_id).is_empty() {
225 return None;
226 }
227
228 match region_indexes {
229 Some((region_indexes, last_index)) => {
230 if region_indexes.is_empty() && last_index >= end_entry_id {
231 return None;
232 }
233
234 let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
235 if !region_indexes.is_empty() {
236 let index = RegionWalVecIndex::new(region_indexes, min_window_size);
237 iterator.push(Box::new(index));
238 }
239 let known_last_index = max(last_index, start_entry_id);
240 if known_last_index < end_entry_id {
241 let range = known_last_index..end_entry_id;
242 let index = RegionWalRange::new(range, max_batch_bytes);
243 iterator.push(Box::new(index));
244 }
245
246 Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
247 }
248 None => {
249 let range = start_entry_id..end_entry_id;
250
251 Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
252 }
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_region_wal_range() {
        // 1024 remaining entries * 10 bytes = 10240 bytes, capped at the 1024-byte
        // `max_batch_size`; `len` is then 1024 / 10 = 102 entries.
        let range = RegionWalRange::new(0..1024, 1024);
        assert_eq!(
            range.next_batch_hint(10),
            Some(NextBatchHint {
                bytes: 1024,
                len: 102
            })
        );

        // Single-entry range: one `next` exhausts it and later calls keep returning `None`.
        let mut range = RegionWalRange::new(0..1, 1024);

        assert_eq!(range.next_batch_size(), Some(1));
        assert_eq!(range.peek(), Some(0));

        assert_eq!(range.next(), Some(0));
        assert_eq!(range.next_batch_size(), None);

        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);
        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);

        // An empty range never yields an id.
        let mut range = RegionWalRange::new(0..0, 1024);
        assert_eq!(range.next_batch_size(), None);
        assert_eq!(range.next(), None);
        assert_eq!(range.next_batch_size(), None);
    }

    #[test]
    fn test_region_wal_vec_index() {
        // Window of 30 with an average entry size of 10. The expected `bytes`/`len`
        // come from `ConvertIndexToRange` + `MergeRange` (in `kafka::util::range`),
        // which decide how many upcoming ids are grouped into one batch.
        let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30);
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 30, len: 3 })
        );
        assert_eq!(index.peek(), Some(0));
        assert_eq!(index.next(), Some(0));
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        assert_eq!(index.next(), Some(1));
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(index.next(), Some(2));
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        assert_eq!(index.next(), Some(7));
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 40, len: 2 })
        );
        assert_eq!(index.next(), Some(8));
        assert_eq!(
            index.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(index.next(), Some(11));
        // Every id has been consumed: no hint and no further ids.
        assert_eq!(index.next_batch_hint(10), None);

        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);

        // An empty index never yields anything.
        let mut index = RegionWalVecIndex::new([], 1024);
        assert_eq!(index.next_batch_hint(10), None);
        assert_eq!(index.peek(), None);
        assert_eq!(index.peek(), None);
        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);
    }

    #[test]
    fn test_multiple_region_wal_iterator() {
        // `iter0` and `iter2` are empty ranges, so every id is produced by `iter1`.
        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)) as _;
        let iter2 = Box::new(RegionWalRange::new(1024..1024, 1024)) as _;
        let mut iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2]);

        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 30, len: 3 })
        );
        assert_eq!(iter.peek(), Some(0));
        assert_eq!(iter.next(), Some(0));

        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 20, len: 2 })
        );
        assert_eq!(iter.peek(), Some(1));
        assert_eq!(iter.next(), Some(1));

        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(iter.peek(), Some(2));

        assert_eq!(iter.next(), Some(2));
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 50, len: 3 })
        );
        assert_eq!(iter.peek(), Some(7));

        assert_eq!(iter.next(), Some(7));
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 40, len: 2 })
        );
        assert_eq!(iter.peek(), Some(8));

        assert_eq!(iter.next(), Some(8));
        assert_eq!(
            iter.next_batch_hint(10),
            Some(NextBatchHint { bytes: 10, len: 1 })
        );
        assert_eq!(iter.peek(), Some(11));
        assert_eq!(iter.next(), Some(11));

        // All ids consumed: `peek` leaves the exhausted inner iterators in place,
        // while `next` pops them off.
        assert_eq!(iter.next_batch_hint(10), None,);
        assert_eq!(iter.peek(), None);
        assert!(!iter.iterator.is_empty());
        assert_eq!(iter.next(), None);
        assert!(iter.iterator.is_empty());

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next_batch_hint(10), None,);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_build_region_wal_index_iterator() {
        // Empty or inverted requested range: nothing to read.
        let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
        assert!(iterator.is_none());

        let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
        assert!(iterator.is_none());

        let iterator =
            build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
        assert!(iterator.is_none());

        // Nothing indexed and `last_index >= end_entry_id`: nothing to read.
        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
        assert!(iterator.is_none());

        let iterator =
            build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
        assert!(iterator.is_none());

        // Indexed ids with `last_index == end_entry_id`: only the vec-index part is built.
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1024)),
            5,
            5,
        )
        .unwrap();
        let iter = iterator
            .as_any()
            .downcast_ref::<MultipleRegionWalIndexIterator>()
            .unwrap();
        assert_eq!(iter.iterator.len(), 1);
        let vec_index = iter.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));

        // `last_index` below the end: the vec index is followed by a tail range
        // starting at `last_index`.
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1023)),
            5,
            5,
        )
        .unwrap();
        let iter = iterator
            .as_any()
            .downcast_ref::<MultipleRegionWalIndexIterator>()
            .unwrap();
        assert_eq!(iter.iterator.len(), 2);
        let vec_index = iter.iterator[0]
            .as_any()
            .downcast_ref::<RegionWalVecIndex>()
            .unwrap();
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));
        let wal_range = iter.iterator[1]
            .as_any()
            .downcast_ref::<RegionWalRange>()
            .unwrap();
        assert_eq!(wal_range.current_entry_id, 1023);
        assert_eq!(wal_range.end_entry_id, 1024);
    }
}