use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
use rskafka::client::partition::PartitionClient;
use rskafka::record::RecordAndOffset;

use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};

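/// The outcome of a single fetch from a Kafka partition.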
pub struct FetchResult {
    /// The fetched records with their offsets.
    pub records: Vec<RecordAndOffset>,

    /// The high watermark of the partition at the time of the fetch.
    pub high_watermark: i64,

    /// The size of the encoded fetch response in bytes.
    pub encoded_response_size: usize,
}

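/// A client capable of fetching records from a Kafka topic partition.
///
/// Abstracted behind a trait so that tests can supply a mock implementation
/// (see `MockFetchClient` in the tests below).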
#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
    /// Fetches records from the partition, starting at `offset`. The response size
    /// is bounded by the `bytes` range and the server-side wait by `max_wait_ms`.
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult>;

    /// Returns the name of the topic.
    fn topic(&self) -> &str;
}

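// Forwards to the inherent `PartitionClient` methods of the same name: inside the
// impl below, `self.fetch_records(...)` and `self.topic()` resolve to the inherent
// methods, not back to this trait impl.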
#[async_trait::async_trait]
impl FetchClient for PartitionClient {
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult> {
        self.fetch_records(offset, bytes, max_wait_ms)
            .await
            .map(|r| FetchResult {
                records: r.records,
                high_watermark: r.high_watermark,
                encoded_response_size: r.encoded_response_size,
            })
    }

    fn topic(&self) -> &str {
        self.topic()
    }
}

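/// The intermediate result of a single background fetch, carrying the data the
/// consumer needs to update its bookkeeping (watermark, fetched bytes, batch size).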
struct FetchResultInner {
    records_and_offsets: Vec<RecordAndOffset>,
    batch_size: usize,
    fetch_bytes: usize,
    watermark: i64,
    used_offset: i64,
}

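/// The default maximum number of bytes to fetch in a single batch (50 MiB).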
const MAX_BATCH_SIZE: usize = 52428800;
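/// The initial estimate of the average record size (256 KiB), refined after each fetch.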
const AVG_RECORD_SIZE: usize = 256 * 1024;

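/// A [`Stream`] of WAL records fetched from a single Kafka partition.
///
/// Records are fetched in batches sized by a [`RegionWalIndexIterator`] hint and
/// buffered; only records whose offsets appear in the index are yielded, together
/// with the last observed high watermark.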
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
    /// The high watermark returned by the most recent fetch (`-1` before any fetch).
    #[builder(default = "-1")]
    last_high_watermark: i64,

    /// The client used to fetch records from the partition.
    client: Arc<dyn FetchClient>,

    /// The maximum number of bytes to fetch in a single batch.
    #[builder(default = "MAX_BATCH_SIZE")]
    max_batch_size: usize,

    /// The maximum time to wait for new data before a fetch returns, in milliseconds.
    #[builder(default = "500")]
    max_wait_ms: u32,

    /// A running estimate of the average record size, updated after each fetch.
    #[builder(default = "AVG_RECORD_SIZE")]
    avg_record_size: usize,

    /// Whether the stream has terminated (set after a fetch error).
    #[builder(default = "false")]
    terminated: bool,

    /// The buffer of fetched records, drained according to the region WAL index.
    buffer: RecordsBuffer,

    /// The in-flight fetch, if any.
    #[builder(default = "Fuse::terminated()")]
    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResultInner>>>,

    /// The total number of bytes fetched so far.
    #[builder(default = "0")]
    total_fetched_bytes: u64,
}

impl Consumer {
    /// Returns the total number of bytes fetched so far.
    pub fn total_fetched_bytes(&self) -> u64 {
        self.total_fetched_bytes
    }

    /// Returns the name of the topic this consumer reads from.
    pub fn topic(&self) -> &str {
        self.client.topic()
    }
}

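/// A buffer of fetched records paired with a region WAL index; only records whose
/// offsets appear in the index are handed out.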
pub(crate) struct RecordsBuffer {
    buffer: VecDeque<RecordAndOffset>,

    index: Box<dyn RegionWalIndexIterator>,
}

impl RecordsBuffer {
    /// Creates an empty [`RecordsBuffer`] over the given index.
    pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
        RecordsBuffer {
            buffer: VecDeque::new(),
            index,
        }
    }
}

impl RecordsBuffer {
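    /// Pops the next record whose offset matches the next entry in the index.
    ///
    /// Buffered records whose offsets are not in the index are discarded; once the
    /// index is exhausted, the remaining buffer is cleared and `None` is returned.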
    fn pop_front(&mut self) -> Option<RecordAndOffset> {
        while let Some(index) = self.index.peek() {
            if let Some(record_and_offset) = self.buffer.pop_front() {
                if index == record_and_offset.offset as u64 {
                    self.index.next();
                    return Some(record_and_offset);
                }
            } else {
                return None;
            }
        }

        self.buffer.clear();
        None
    }

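    /// Appends newly fetched records to the buffer, asserting that the first record
    /// does not precede the next expected index entry.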
    fn extend(&mut self, records: Vec<RecordAndOffset>) {
        if let (Some(first), Some(index)) = (records.first(), self.index.peek()) {
            assert!(
                index <= first.offset as u64,
                "index: {index}, first offset: {}",
                first.offset
            );
        }
        self.buffer.extend(records);
    }
}

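// Polling strategy: drain buffered records that match the index first; when the
// buffer runs dry, start a fetch sized by the index's next-batch hint, then feed
// the sorted results back into the buffer and update the running statistics.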
impl Stream for Consumer {
    type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        loop {
            if *this.terminated {
                return Poll::Ready(None);
            }

            // The index is exhausted; the stream is finished.
            if this.buffer.index.peek().is_none() {
                return Poll::Ready(None);
            }

            if let Some(x) = this.buffer.pop_front() {
                debug!("Yielding record with offset: {}", x.offset);
                return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
            }

            // No fetch in flight: start one for the next offset in the index.
            if this.fetch_fut.is_terminated() {
                match this.buffer.index.peek() {
                    Some(next_offset) => {
                        let client = Arc::clone(this.client);
                        let max_wait_ms = *this.max_wait_ms as i32;
                        let offset = next_offset as i64;
                        let NextBatchHint { bytes, len } = this
                            .buffer
                            .index
                            .next_batch_hint(*this.avg_record_size)
                            .unwrap_or(NextBatchHint {
                                bytes: *this.avg_record_size,
                                len: 1,
                            });

                        let fetch_range =
                            1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
                        *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
                            let FetchResult {
                                records: records_and_offsets,
                                high_watermark: watermark,
                                encoded_response_size,
                                ..
                            } = client
                                .fetch_records(offset, fetch_range, max_wait_ms)
                                .await?;

                            Ok(FetchResultInner {
                                records_and_offsets,
                                watermark,
                                used_offset: offset,
                                fetch_bytes: encoded_response_size,
                                batch_size: len,
                            })
                        }));
                    }
                    None => {
                        return Poll::Ready(None);
                    }
                }
            }

            // Wait for the in-flight fetch to complete.
            let data = futures::ready!(this.fetch_fut.poll_unpin(cx));

            match data {
                Ok(FetchResultInner {
                    mut records_and_offsets,
                    watermark,
                    used_offset,
                    fetch_bytes,
                    batch_size,
                }) => {
                    records_and_offsets.sort_unstable_by_key(|x| x.offset);
                    *this.last_high_watermark = watermark;
                    if !records_and_offsets.is_empty() {
                        *this.avg_record_size = fetch_bytes / records_and_offsets.len();
                        debug!("set avg_record_size: {}", *this.avg_record_size);
                    }
                    *this.total_fetched_bytes += fetch_bytes as u64;

                    debug!(
                        "Fetch result: {:?}, used_offset: {used_offset}, fetch_bytes: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
                        records_and_offsets
                            .iter()
                            .map(|record| record.offset)
                            .collect::<Vec<_>>(),
                        records_and_offsets.len()
                    );
                    this.buffer.extend(records_and_offsets);
                    continue;
                }
                Err(e) => {
                    *this.terminated = true;

                    return Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::ops::Range;
    use std::sync::Arc;

    use chrono::{TimeZone, Utc};
    use futures::future::Fuse;
    use futures::TryStreamExt;
    use rskafka::record::{Record, RecordAndOffset};

    use super::*;
    use crate::kafka::consumer::{Consumer, RecordsBuffer};
    use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

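    /// A mock [`FetchClient`] that fabricates as many copies of a fixed record as
    /// fit into the requested byte range.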
    #[derive(Debug)]
    struct MockFetchClient {
        record: Record,
    }

    #[async_trait::async_trait]
    impl FetchClient for MockFetchClient {
        async fn fetch_records(
            &self,
            offset: i64,
            bytes: Range<i32>,
            _max_wait_ms: i32,
        ) -> rskafka::client::error::Result<FetchResult> {
            let record_size = self.record.approximate_size();
            let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);

            let records = (0..num)
                .map(|idx| RecordAndOffset {
                    record: self.record.clone(),
                    offset: offset + idx as i64,
                })
                .collect::<Vec<_>>();

            let max_offset = offset + records.len() as i64;
            let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
            Ok(FetchResult {
                records,
                high_watermark: max_offset,
                encoded_response_size,
            })
        }

        fn topic(&self) -> &str {
            "test"
        }
    }

    fn test_record() -> Record {
        Record {
            key: Some(vec![0; 4]),
            value: Some(vec![0; 6]),
            headers: Default::default(),
            timestamp: Utc.timestamp_millis_opt(1337).unwrap(),
        }
    }

    #[tokio::test]
    async fn test_consumer_with_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let record_size = record.approximate_size();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record_size,
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            vec![1, 3, 4, 8, 10, 12]
        )
    }

    #[tokio::test]
    async fn test_consumer_without_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalRange::new(0..30, 1024);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            (0..30).collect::<Vec<_>>()
        )
    }

    #[tokio::test]
    async fn test_consumer_with_multiple_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };

        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new(
            [0, 1, 2, 7, 8, 11],
            record.approximate_size() * 4,
        )) as _;
        let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _;
        let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _;
        let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]);

        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(iter),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            [0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027]
        )
    }
}