1use std::cmp::{max, min};
16use std::collections::{BTreeSet, VecDeque};
17use std::fmt::Debug;
18use std::ops::Range;
19
20use store_api::logstore::EntryId;
21
22use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
23
/// Sizing hint for the next batch of WAL entries to fetch.
///
/// Produced by `RegionWalIndexIterator::next_batch_hint`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct NextBatchHint {
    /// Estimated size of the batch, in bytes.
    pub(crate) bytes: usize,
    /// Number of entries the batch is expected to hold.
    pub(crate) len: usize,
}
29
30pub trait RegionWalIndexIterator: Send + Sync + Debug {
32 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
34
35 fn peek(&self) -> Option<EntryId>;
37
38 fn next(&mut self) -> Option<EntryId>;
40
41 #[cfg(test)]
42 fn as_any(&self) -> &dyn std::any::Any;
43}
44
45#[derive(Debug)]
47pub struct RegionWalRange {
48 current_entry_id: EntryId,
49 end_entry_id: EntryId,
50 max_batch_size: usize,
51}
52
53impl RegionWalRange {
54 pub fn new(range: Range<EntryId>, max_batch_size: usize) -> Self {
55 Self {
56 current_entry_id: range.start,
57 end_entry_id: range.end,
58 max_batch_size,
59 }
60 }
61
62 fn next_batch_size(&self) -> Option<u64> {
63 if self.current_entry_id < self.end_entry_id {
64 Some(self.end_entry_id.saturating_sub(self.current_entry_id))
65 } else {
66 None
67 }
68 }
69}
70
71impl RegionWalIndexIterator for RegionWalRange {
72 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
73 if let Some(size) = self.next_batch_size() {
74 let bytes = min(size as usize * avg_size, self.max_batch_size);
75 let len = bytes / avg_size;
76
77 return Some(NextBatchHint { bytes, len });
78 }
79
80 None
81 }
82
83 fn peek(&self) -> Option<EntryId> {
84 if self.current_entry_id < self.end_entry_id {
85 Some(self.current_entry_id)
86 } else {
87 None
88 }
89 }
90
91 fn next(&mut self) -> Option<EntryId> {
92 if self.current_entry_id < self.end_entry_id {
93 let next = self.current_entry_id;
94 self.current_entry_id += 1;
95 Some(next)
96 } else {
97 None
98 }
99 }
100
101 #[cfg(test)]
102 fn as_any(&self) -> &dyn std::any::Any {
103 self
104 }
105}
106
/// Minimum batch window size: 4 MiB.
///
/// NOTE(review): presumably the default `min_window_size` handed to
/// `build_region_wal_index_iterator`; confirm at the call sites.
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 << 20;
108
109#[derive(Debug)]
112pub struct RegionWalVecIndex {
113 index: VecDeque<EntryId>,
114 min_batch_window_size: usize,
115}
116
117impl RegionWalVecIndex {
118 pub fn new<I: IntoIterator<Item = EntryId>>(index: I, min_batch_window_size: usize) -> Self {
119 Self {
120 index: index.into_iter().collect::<VecDeque<_>>(),
121 min_batch_window_size,
122 }
123 }
124}
125
126impl RegionWalIndexIterator for RegionWalVecIndex {
127 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
128 let merger = MergeRange::new(
129 ConvertIndexToRange::new(self.index.iter().peekable(), avg_size),
130 self.min_batch_window_size,
131 );
132
133 merger.merge().map(|(range, size)| NextBatchHint {
134 bytes: range.end - range.start - 1,
135 len: size,
136 })
137 }
138
139 fn peek(&self) -> Option<EntryId> {
140 self.index.front().cloned()
141 }
142
143 fn next(&mut self) -> Option<EntryId> {
144 self.index.pop_front()
145 }
146
147 #[cfg(test)]
148 fn as_any(&self) -> &dyn std::any::Any {
149 self
150 }
151}
152
153#[derive(Debug)]
157pub struct MultipleRegionWalIndexIterator {
158 iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
159}
160
161impl MultipleRegionWalIndexIterator {
162 pub fn new<I: IntoIterator<Item = Box<dyn RegionWalIndexIterator>>>(iterator: I) -> Self {
163 Self {
164 iterator: iterator.into_iter().collect::<VecDeque<_>>(),
165 }
166 }
167}
168
169impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
170 fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
171 for iter in &self.iterator {
172 if let Some(batch) = iter.next_batch_hint(avg_size) {
173 return Some(batch);
174 }
175 }
176
177 None
178 }
179
180 fn peek(&self) -> Option<EntryId> {
181 for iter in &self.iterator {
182 let peek = iter.peek();
183 if peek.is_some() {
184 return peek;
185 }
186 }
187
188 None
189 }
190
191 fn next(&mut self) -> Option<EntryId> {
192 while !self.iterator.is_empty() {
193 let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none();
194 if remove {
195 self.iterator.pop_front();
196 } else {
197 break;
198 }
199 }
200
201 self.iterator.front_mut().and_then(|iter| iter.next())
202 }
203
204 #[cfg(test)]
205 fn as_any(&self) -> &dyn std::any::Any {
206 self
207 }
208}
209
210pub fn build_region_wal_index_iterator(
214 start_entry_id: EntryId,
215 end_entry_id: EntryId,
216 region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
217 max_batch_bytes: usize,
218 min_window_size: usize,
219) -> Option<Box<dyn RegionWalIndexIterator>> {
220 if (start_entry_id..end_entry_id).is_empty() {
221 return None;
222 }
223
224 match region_indexes {
225 Some((region_indexes, last_index)) => {
226 if region_indexes.is_empty() && last_index >= end_entry_id {
227 return None;
228 }
229
230 let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
231 if !region_indexes.is_empty() {
232 let index = RegionWalVecIndex::new(region_indexes, min_window_size);
233 iterator.push(Box::new(index));
234 }
235 let known_last_index = max(last_index, start_entry_id);
236 if known_last_index < end_entry_id {
237 let range = known_last_index..end_entry_id;
238 let index = RegionWalRange::new(range, max_batch_bytes);
239 iterator.push(Box::new(index));
240 }
241
242 Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
243 }
244 None => {
245 let range = start_entry_id..end_entry_id;
246
247 Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
248 }
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Downcasts a type-erased index iterator, panicking if it is not a `T`.
    fn downcast<T: 'static>(iter: &dyn RegionWalIndexIterator) -> &T {
        iter.as_any().downcast_ref::<T>().unwrap()
    }

    #[test]
    fn test_region_wal_range() {
        let wide = RegionWalRange::new(0..1024, 1024);
        assert_eq!(
            wide.next_batch_hint(10),
            Some(NextBatchHint {
                bytes: 1024,
                len: 102
            })
        );

        // A single-entry range yields its id once, then stays exhausted.
        let mut single = RegionWalRange::new(0..1, 1024);
        assert_eq!(single.next_batch_size(), Some(1));
        assert_eq!(single.peek(), Some(0));
        assert_eq!(single.next(), Some(0));
        for _ in 0..2 {
            assert_eq!(single.next_batch_size(), None);
            assert_eq!(single.next(), None);
        }
        assert_eq!(single.next_batch_size(), None);

        // An empty range never yields anything.
        let mut empty = RegionWalRange::new(0..0, 1024);
        assert_eq!(empty.next_batch_size(), None);
        assert_eq!(empty.next(), None);
        assert_eq!(empty.next_batch_size(), None);
    }

    #[test]
    fn test_region_wal_vec_index() {
        let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30);
        assert_eq!(index.peek(), Some(0));

        // (id yielded by `next`, hint bytes, hint len) observed before each step.
        let steps = [
            (0, 30, 3),
            (1, 20, 2),
            (2, 10, 1),
            (7, 20, 2),
            (8, 40, 2),
            (11, 10, 1),
        ];
        for (id, bytes, len) in steps {
            assert_eq!(
                index.next_batch_hint(10),
                Some(NextBatchHint { bytes, len })
            );
            assert_eq!(index.next(), Some(id));
        }
        assert_eq!(index.next_batch_hint(10), None);
        assert_eq!(index.next(), None);
        assert_eq!(index.next_batch_hint(10), None);

        let mut empty = RegionWalVecIndex::new([], 1024);
        assert_eq!(empty.next_batch_hint(10), None);
        assert_eq!(empty.peek(), None);
        assert_eq!(empty.peek(), None);
        assert_eq!(empty.next(), None);
        assert_eq!(empty.next_batch_hint(10), None);
    }

    #[test]
    fn test_multiple_region_wal_iterator() {
        let parts: [Box<dyn RegionWalIndexIterator>; 3] = [
            Box::new(RegionWalRange::new(0..0, 1024)),
            Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)),
            Box::new(RegionWalRange::new(1024..1024, 1024)),
        ];
        let mut iter = MultipleRegionWalIndexIterator::new(parts);

        // (id yielded by `peek`/`next`, hint bytes, hint len) for each step.
        let steps = [
            (0, 30, 3),
            (1, 20, 2),
            (2, 10, 1),
            (7, 50, 3),
            (8, 40, 2),
            (11, 10, 1),
        ];
        for (id, bytes, len) in steps {
            assert_eq!(
                iter.next_batch_hint(10),
                Some(NextBatchHint { bytes, len })
            );
            assert_eq!(iter.peek(), Some(id));
            assert_eq!(iter.next(), Some(id));
        }

        // Exhausted inner iterators are only dropped by `next`.
        assert_eq!(iter.next_batch_hint(10), None);
        assert_eq!(iter.peek(), None);
        assert!(!iter.iterator.is_empty());
        assert_eq!(iter.next(), None);
        assert!(iter.iterator.is_empty());

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next_batch_hint(10), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_build_region_wal_index_iterator() {
        // Inputs for which there is nothing to read.
        let nothing_to_read: [(EntryId, EntryId, Option<(BTreeSet<EntryId>, EntryId)>); 5] = [
            (1024, 1024, None),
            (1024, 1023, None),
            (1024, 1024, Some((BTreeSet::new(), 1024))),
            (1, 1024, Some((BTreeSet::new(), 1024))),
            (1, 1024, Some((BTreeSet::new(), 1025))),
        ];
        for (start, end, region_indexes) in nothing_to_read {
            assert!(build_region_wal_index_iterator(start, end, region_indexes, 5, 5).is_none());
        }

        // The index already reaches `end_entry_id`: only the indexed ids are visited.
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1024)),
            5,
            5,
        )
        .unwrap();
        let multi = downcast::<MultipleRegionWalIndexIterator>(&*iterator);
        assert_eq!(multi.iterator.len(), 1);
        let vec_index = downcast::<RegionWalVecIndex>(&*multi.iterator[0]);
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));

        // The index stops short of `end_entry_id`: a trailing range covers the rest.
        let iterator = build_region_wal_index_iterator(
            1,
            1024,
            Some((BTreeSet::from([512, 756]), 1023)),
            5,
            5,
        )
        .unwrap();
        let multi = downcast::<MultipleRegionWalIndexIterator>(&*iterator);
        assert_eq!(multi.iterator.len(), 2);
        let vec_index = downcast::<RegionWalVecIndex>(&*multi.iterator[0]);
        assert_eq!(vec_index.index, VecDeque::from([512, 756]));
        let wal_range = downcast::<RegionWalRange>(&*multi.iterator[1]);
        assert_eq!(wal_range.current_entry_id, 1023);
        assert_eq!(wal_range.end_entry_id, 1024);
    }
}