use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
use rskafka::client::partition::PartitionClient;
use rskafka::record::RecordAndOffset;

use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};

/// The result of a fetch from a topic partition.
pub struct FetchResult {
    /// Records fetched from the partition, each paired with its offset.
    pub records: Vec<RecordAndOffset>,

    /// The partition's high watermark returned by the broker.
    pub high_watermark: i64,

    /// Size of the encoded fetch response in bytes.
    pub encoded_response_size: usize,
}

#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
    /// Fetches records from the topic partition, starting at `offset`.
    ///
    /// The response size is bounded by the `bytes` range, and the broker may
    /// wait up to `max_wait_ms` milliseconds for enough data to accumulate.
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult>;

    /// Returns the name of the topic this client fetches from.
    fn topic(&self) -> &str;
}

#[async_trait::async_trait]
impl FetchClient for PartitionClient {
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult> {
        // Delegates to `PartitionClient`'s inherent `fetch_records` and maps its
        // response into a `FetchResult`.
        self.fetch_records(offset, bytes, max_wait_ms)
            .await
            .map(|r| FetchResult {
                records: r.records,
                high_watermark: r.high_watermark,
                encoded_response_size: r.encoded_response_size,
            })
    }

    fn topic(&self) -> &str {
        self.topic()
    }
}

/// Intermediate result of a fetch, carried from the fetch future back to the stream.
struct FetchResultInner {
    records_and_offsets: Vec<RecordAndOffset>,
    batch_size: usize,
    fetch_bytes: usize,
    watermark: i64,
    used_offset: i64,
}

/// Maximum number of bytes to request in a single fetch (50 MiB).
const MAX_BATCH_SIZE: usize = 52428800;
/// Initial estimate of the average record size (256 KiB).
const AVG_RECORD_SIZE: usize = 256 * 1024;

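/// A stream of WAL records driven by a WAL index.
///
/// `Consumer` repeatedly fetches batches of records via the [`FetchClient`] and
/// yields only the records whose offsets appear in the underlying
/// [`RegionWalIndexIterator`].
///
/// Instances are usually built through the `ConsumerBuilder` generated by
/// `derive_builder`. A minimal, illustrative sketch (not taken from an existing
/// call site; it assumes a `client` of type `Arc<dyn FetchClient>` and a boxed
/// `index` iterator are already in scope, with every other field falling back to
/// its builder default):
///
/// ```ignore
/// let consumer = ConsumerBuilder::default()
///     .client(client)
///     .buffer(RecordsBuffer::new(index))
///     .build()
///     .unwrap();
/// ```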
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
    /// The last high watermark returned by the broker.
    #[builder(default = "-1")]
    last_high_watermark: i64,

    /// The client used to fetch records from the topic partition.
    client: Arc<dyn FetchClient>,

    /// The maximum number of bytes to request in a single fetch.
    #[builder(default = "MAX_BATCH_SIZE")]
    max_batch_size: usize,

    /// The maximum time, in milliseconds, the broker may wait for data before
    /// answering a fetch request.
    #[builder(default = "500")]
    max_wait_ms: u32,

    /// A running estimate of the average record size, used to size the next fetch.
    #[builder(default = "AVG_RECORD_SIZE")]
    avg_record_size: usize,

    /// Whether the stream has terminated; set after a fetch error.
    #[builder(default = "false")]
    terminated: bool,

    /// The buffer of fetched records, filtered through the WAL index.
    buffer: RecordsBuffer,

    /// The in-flight fetch, if any.
    #[builder(default = "Fuse::terminated()")]
    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResultInner>>>,

    /// The total number of bytes fetched so far.
    #[builder(default = "0")]
    total_fetched_bytes: u64,
}

impl Consumer {
    /// Returns the total number of bytes fetched so far.
    pub fn total_fetched_bytes(&self) -> u64 {
        self.total_fetched_bytes
    }

    /// Returns the name of the topic this consumer reads from.
    pub fn topic(&self) -> &str {
        self.client.topic()
    }
}

pub(crate) struct RecordsBuffer {
    /// Records fetched from the broker that have not been yielded yet.
    buffer: VecDeque<RecordAndOffset>,

    /// Iterator over the WAL index entries (offsets) that should be yielded.
    index: Box<dyn RegionWalIndexIterator>,
}

impl RecordsBuffer {
    /// Creates an empty [`RecordsBuffer`] for the given WAL index iterator.
    pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
        RecordsBuffer {
            buffer: VecDeque::new(),
            index,
        }
    }
}

impl RecordsBuffer {
    /// Pops the next record whose offset matches the next WAL index entry.
    ///
    /// Buffered records with non-matching offsets are discarded. Returns `None`
    /// when the buffer needs a refill or the index is exhausted.
    fn pop_front(&mut self) -> Option<RecordAndOffset> {
        while let Some(index) = self.index.peek() {
            if let Some(record_and_offset) = self.buffer.pop_front() {
                if index == record_and_offset.offset as u64 {
                    self.index.next();
                    return Some(record_and_offset);
                }
            } else {
                return None;
            }
        }

        // The index is exhausted; drop any remaining buffered records.
        self.buffer.clear();
        None
    }

    /// Appends fetched records to the buffer.
    fn extend(&mut self, records: Vec<RecordAndOffset>) {
        if let (Some(first), Some(index)) = (records.first(), self.index.peek()) {
            // Fetches start at the next index entry, so the returned records must
            // never begin before it.
            assert!(
                index <= first.offset as u64,
                "index: {index}, first offset: {}",
                first.offset
            );
        }
        self.buffer.extend(records);
    }
}

impl Stream for Consumer {
    type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        loop {
            if *this.terminated {
                return Poll::Ready(None);
            }

            // The WAL index is exhausted; there is nothing left to yield.
            if this.buffer.index.peek().is_none() {
                return Poll::Ready(None);
            }

            // Yield a buffered record if it matches the next index entry.
            if let Some(x) = this.buffer.pop_front() {
                debug!("Yielding record with offset: {}", x.offset);
                return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
            }

            // The buffer is drained; start a new fetch if none is in flight.
            if this.fetch_fut.is_terminated() {
                match this.buffer.index.peek() {
                    Some(next_offset) => {
                        let client = Arc::clone(this.client);
                        let max_wait_ms = *this.max_wait_ms as i32;
                        let offset = next_offset as i64;
                        // Estimate the size of the next batch, falling back to a
                        // single record of the estimated average size.
                        let NextBatchHint { bytes, len } = this
                            .buffer
                            .index
                            .next_batch_hint(*this.avg_record_size)
                            .unwrap_or(NextBatchHint {
                                bytes: *this.avg_record_size,
                                len: 1,
                            });

                        // Request between 1 byte and the hinted batch size, capped
                        // at `max_batch_size`.
                        let fetch_range =
                            1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
                        *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
                            let FetchResult {
                                records: records_and_offsets,
                                high_watermark: watermark,
                                encoded_response_size,
                                ..
                            } = client
                                .fetch_records(offset, fetch_range, max_wait_ms)
                                .await?;

                            Ok(FetchResultInner {
                                records_and_offsets,
                                watermark,
                                used_offset: offset,
                                fetch_bytes: encoded_response_size,
                                batch_size: len,
                            })
                        }));
                    }
                    None => {
                        return Poll::Ready(None);
                    }
                }
            }

            // Poll the in-flight fetch.
            let data = futures::ready!(this.fetch_fut.poll_unpin(cx));

            match data {
                Ok(FetchResultInner {
                    mut records_and_offsets,
                    watermark,
                    used_offset,
                    fetch_bytes,
                    batch_size,
                }) => {
                    // Brokers should already return records in offset order; sort to be safe.
                    records_and_offsets.sort_unstable_by_key(|x| x.offset);
                    *this.last_high_watermark = watermark;
                    // Refresh the average record size estimate from the actual response.
                    if !records_and_offsets.is_empty() {
                        *this.avg_record_size = fetch_bytes / records_and_offsets.len();
                        debug!("set avg_record_size: {}", *this.avg_record_size);
                    }
                    *this.total_fetched_bytes += fetch_bytes as u64;

                    debug!(
                        "Fetch result: {:?}, used_offset: {used_offset}, fetch_bytes: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
                        records_and_offsets
                            .iter()
                            .map(|record| record.offset)
                            .collect::<Vec<_>>(),
                        records_and_offsets.len()
                    );
                    this.buffer.extend(records_and_offsets);
                    continue;
                }
                Err(e) => {
                    *this.terminated = true;

                    return Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::ops::Range;
    use std::sync::Arc;

    use chrono::{TimeZone, Utc};
    use futures::TryStreamExt;
    use futures::future::Fuse;
    use rskafka::record::{Record, RecordAndOffset};

    use super::*;
    use crate::kafka::consumer::{Consumer, RecordsBuffer};
    use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

    /// A mock [`FetchClient`] that returns clones of a single record, starting at
    /// the requested offset and filling the requested byte range.
    #[derive(Debug)]
    struct MockFetchClient {
        record: Record,
    }

    #[async_trait::async_trait]
    impl FetchClient for MockFetchClient {
        async fn fetch_records(
            &self,
            offset: i64,
            bytes: Range<i32>,
            _max_wait_ms: i32,
        ) -> rskafka::client::error::Result<FetchResult> {
            let record_size = self.record.approximate_size();
            let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);

            let records = (0..num)
                .map(|idx| RecordAndOffset {
                    record: self.record.clone(),
                    offset: offset + idx as i64,
                })
                .collect::<Vec<_>>();

            let max_offset = offset + records.len() as i64;
            let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
            Ok(FetchResult {
                records,
                high_watermark: max_offset,
                encoded_response_size,
            })
        }

        fn topic(&self) -> &str {
            "test"
        }
    }

    fn test_record() -> Record {
        Record {
            key: Some(vec![0; 4]),
            value: Some(vec![0; 6]),
            headers: Default::default(),
            timestamp: Utc.timestamp_millis_opt(1337).unwrap(),
        }
    }

    #[tokio::test]
    async fn test_consumer_with_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let record_size = record.approximate_size();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record_size,
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            vec![1, 3, 4, 8, 10, 12]
        )
    }

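    // Illustrative sketch of building a `Consumer` through the `ConsumerBuilder`
    // generated by `derive_builder`: only `client` and `buffer` are set explicitly,
    // every other field falls back to the defaults declared on the struct. The
    // index offsets used here are arbitrary.
    #[tokio::test]
    async fn test_consumer_builder_defaults() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalVecIndex::new([0, 1, 2], record.approximate_size() * 3);
        let consumer = ConsumerBuilder::default()
            .client(Arc::new(mock_client))
            .buffer(RecordsBuffer::new(Box::new(index)))
            .build()
            .unwrap();

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            vec![0, 1, 2]
        )
    }
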
    #[tokio::test]
    async fn test_consumer_without_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalRange::new(0..30, 1024);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            (0..30).collect::<Vec<_>>()
        )
    }

    #[tokio::test]
    async fn test_consumer_with_multiple_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };

        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new(
            [0, 1, 2, 7, 8, 11],
            record.approximate_size() * 4,
        )) as _;
        let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _;
        let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _;
        let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]);

        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(iter),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            [0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027]
        )
    }
}