// log_store/kafka/consumer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::VecDeque;
16use std::ops::Range;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use common_telemetry::debug;
22use derive_builder::Builder;
23use futures::future::{BoxFuture, Fuse, FusedFuture};
24use futures::{FutureExt, Stream};
25use pin_project::pin_project;
26use rskafka::client::partition::PartitionClient;
27use rskafka::record::RecordAndOffset;
28
29use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};
30
/// The result of a single fetch request against a Kafka partition.
pub struct FetchResult {
    /// The fetched records along with their offsets.
    pub records: Vec<RecordAndOffset>,

    /// The high watermark of the partition.
    pub high_watermark: i64,

    /// The size of the response encoded in bytes.
    pub encoded_response_size: usize,
}
41
/// Abstraction over a Kafka partition client, so [`Consumer`] can be driven
/// by either a real [`PartitionClient`] or a mock in tests.
#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
    /// Fetch records.
    ///
    /// Arguments are identical to [`PartitionClient::fetch_records`].
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult>;

    /// Returns the name of the topic this client fetches from.
    fn topic(&self) -> &str;
}
56
57#[async_trait::async_trait]
58impl FetchClient for PartitionClient {
59    async fn fetch_records(
60        &self,
61        offset: i64,
62        bytes: Range<i32>,
63        max_wait_ms: i32,
64    ) -> rskafka::client::error::Result<FetchResult> {
65        self.fetch_records(offset, bytes, max_wait_ms)
66            .await
67            .map(|r| FetchResult {
68                records: r.records,
69                high_watermark: r.high_watermark,
70                encoded_response_size: r.encoded_response_size,
71            })
72    }
73
74    fn topic(&self) -> &str {
75        self.topic()
76    }
77}
78
/// Internal payload produced by the in-flight fetch future and consumed by
/// [`Consumer`]'s `Stream` implementation.
struct FetchResultInner {
    /// Records returned by the fetch; sorted by offset before being buffered.
    records_and_offsets: Vec<RecordAndOffset>,
    /// Expected number of records for this fetch (from the batch hint);
    /// used for debug logging only.
    batch_size: usize,
    /// Encoded size of the fetch response in bytes.
    fetch_bytes: usize,
    /// High watermark of the partition as reported by this fetch.
    watermark: i64,
    /// The start offset that was passed to the fetch request; used for
    /// debug logging only.
    used_offset: i64,
}
86
/// Default upper bound, in bytes, for a single fetch request (50 MiB).
/// Expressed as `50 * 1024 * 1024` instead of the magic number `52428800`
/// for readability; the value is unchanged.
const MAX_BATCH_SIZE: usize = 50 * 1024 * 1024;
/// Initial estimate of the average record size (256 KiB). The consumer
/// refines this estimate from observed fetch responses at runtime.
const AVG_RECORD_SIZE: usize = 256 * 1024;
89
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster. Yielding records respecting the [`RegionWalIndexIterator`].
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
    /// The high watermark reported by the most recent fetch; `-1` before any
    /// fetch has completed.
    #[builder(default = "-1")]
    last_high_watermark: i64,

    /// The client is used to fetch records from kafka topic.
    client: Arc<dyn FetchClient>,

    /// The max batch size in a single fetch request.
    #[builder(default = "MAX_BATCH_SIZE")]
    max_batch_size: usize,

    /// The max wait milliseconds.
    #[builder(default = "500")]
    max_wait_ms: u32,

    /// The avg record size. Seeded with [`AVG_RECORD_SIZE`] and refined from
    /// observed fetch responses; used to size subsequent fetch requests.
    #[builder(default = "AVG_RECORD_SIZE")]
    avg_record_size: usize,

    /// Termination flag. Set after a fetch error so the stream yields `None`
    /// forever afterwards.
    #[builder(default = "false")]
    terminated: bool,

    /// The buffer of records.
    buffer: RecordsBuffer,

    /// The fetch future. `Fuse::terminated()` means no fetch is in flight.
    #[builder(default = "Fuse::terminated()")]
    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResultInner>>>,

    /// Total fetched bytes across all completed fetches.
    #[builder(default = "0")]
    total_fetched_bytes: u64,
}
129
130impl Consumer {
131    /// Returns the total fetched bytes.
132    pub fn total_fetched_bytes(&self) -> u64 {
133        self.total_fetched_bytes
134    }
135
136    /// Returns the topic name.
137    pub fn topic(&self) -> &str {
138        self.client.topic()
139    }
140}
141
/// Buffers fetched records and filters them against a WAL index iterator,
/// so only the offsets the index asks for are yielded.
pub(crate) struct RecordsBuffer {
    /// Fetched records awaiting consumption.
    buffer: VecDeque<RecordAndOffset>,

    /// Iterator over the offsets the consumer should actually yield.
    index: Box<dyn RegionWalIndexIterator>,
}
147
148impl RecordsBuffer {
149    /// Creates an empty [`RecordsBuffer`]
150    pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
151        RecordsBuffer {
152            buffer: VecDeque::new(),
153            index,
154        }
155    }
156}
157
impl RecordsBuffer {
    /// Pops the next buffered record whose offset equals the next offset
    /// wanted by the index, advancing the index past it.
    ///
    /// Buffered records whose offsets the index does not ask for are
    /// silently discarded. Returns `None` either when the buffer runs out of
    /// records (the caller must fetch more) or when the index is exhausted
    /// (in which case any leftover buffered records are dropped).
    fn pop_front(&mut self) -> Option<RecordAndOffset> {
        while let Some(index) = self.index.peek() {
            if let Some(record_and_offset) = self.buffer.pop_front() {
                if index == record_and_offset.offset as u64 {
                    self.index.next();
                    return Some(record_and_offset);
                }
                // NOTE(review): a record with offset != index is dropped here
                // without advancing the index. This assumes such records only
                // precede the wanted offset; if `index` pointed at an offset
                // missing from the partition, a later wanted record could be
                // discarded — TODO confirm the index only contains existing
                // offsets.
            } else {
                // Buffer exhausted while the index still wants offsets.
                return None;
            }
        }

        // Index exhausted: nothing more will ever be yielded, so remaining
        // buffered records are useless.
        self.buffer.clear();
        None
    }

    /// Appends freshly fetched records to the buffer.
    ///
    /// Panics if the fetch skipped past the next wanted offset, i.e. if the
    /// first incoming record's offset is smaller than the next index entry
    /// would allow (`index > first.offset`).
    fn extend(&mut self, records: Vec<RecordAndOffset>) {
        if let (Some(first), Some(index)) = (records.first(), self.index.peek()) {
            // TODO(weny): throw an error?
            assert!(
                index <= first.offset as u64,
                "index: {index}, first offset: {}",
                first.offset
            );
        }
        self.buffer.extend(records);
    }
}
187
impl Stream for Consumer {
    // Each item is a record paired with the partition high watermark observed
    // at the time the record is yielded.
    type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>;

    /// Drives the consumer: serves records from the local buffer when
    /// possible, otherwise launches (or continues polling) a fetch future
    /// sized by the index's batch hint.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        loop {
            // A previous fetch error permanently ends the stream.
            if *this.terminated {
                return Poll::Ready(None);
            }

            // Index exhausted: no more offsets are wanted.
            if this.buffer.index.peek().is_none() {
                return Poll::Ready(None);
            }

            // Fast path: the buffer already holds the next wanted record.
            if let Some(x) = this.buffer.pop_front() {
                debug!("Yielding record with offset: {}", x.offset);
                return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
            }

            // Buffer could not satisfy the index; start a new fetch unless
            // one is already in flight (`Fuse` is terminated when idle).
            if this.fetch_fut.is_terminated() {
                match this.buffer.index.peek() {
                    Some(next_offset) => {
                        let client = Arc::clone(this.client);
                        let max_wait_ms = *this.max_wait_ms as i32;
                        let offset = next_offset as i64;
                        // Ask the index how many bytes/records the next batch
                        // should cover; fall back to one average-sized record.
                        let NextBatchHint { bytes, len } = this
                            .buffer
                            .index
                            .next_batch_hint(*this.avg_record_size)
                            .unwrap_or(NextBatchHint {
                                bytes: *this.avg_record_size,
                                len: 1,
                            });

                        // `bytes + 1` so a record of exactly `bytes` bytes
                        // still fits; capped at `max_batch_size`.
                        let fetch_range =
                            1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
                        *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
                            let FetchResult {
                                records: records_and_offsets,
                                high_watermark: watermark,
                                encoded_response_size,
                                ..
                            } = client
                                .fetch_records(offset, fetch_range, max_wait_ms)
                                .await?;

                            Ok(FetchResultInner {
                                records_and_offsets,
                                watermark,
                                used_offset: offset,
                                fetch_bytes: encoded_response_size,
                                batch_size: len,
                            })
                        }));
                    }
                    None => {
                        return Poll::Ready(None);
                    }
                }
            }

            // Poll the in-flight fetch; returns Pending until it completes.
            let data = futures::ready!(this.fetch_fut.poll_unpin(cx));

            match data {
                Ok(FetchResultInner {
                    mut records_and_offsets,
                    watermark,
                    used_offset,
                    fetch_bytes,
                    batch_size,
                }) => {
                    // Sort records by offset in case they aren't in order
                    records_and_offsets.sort_unstable_by_key(|x| x.offset);
                    *this.last_high_watermark = watermark;
                    // Refine the running average record size from the
                    // observed response (integer division).
                    if !records_and_offsets.is_empty() {
                        *this.avg_record_size = fetch_bytes / records_and_offsets.len();
                        debug!("set avg_record_size: {}", *this.avg_record_size);
                    }
                    *this.total_fetched_bytes += fetch_bytes as u64;

                    debug!(
                        "Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
                        records_and_offsets
                            .iter()
                            .map(|record| record.offset)
                            .collect::<Vec<_>>(),
                        records_and_offsets
                            .len()
                    );
                    this.buffer.extend(records_and_offsets);
                    // Loop back around: the buffer may now satisfy the index.
                    continue;
                }
                Err(e) => {
                    // Fetch failed: surface the error once, then end the
                    // stream on subsequent polls.
                    *this.terminated = true;

                    return Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}
290
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::ops::Range;
    use std::sync::Arc;

    use chrono::{TimeZone, Utc};
    use futures::future::Fuse;
    use futures::TryStreamExt;
    use rskafka::record::{Record, RecordAndOffset};

    use super::*;
    use crate::kafka::consumer::{Consumer, RecordsBuffer};
    use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

    /// A [`FetchClient`] stub that synthesizes consecutive copies of a fixed
    /// record, starting at the requested offset, up to the byte budget.
    #[derive(Debug)]
    struct MockFetchClient {
        record: Record,
    }

    #[async_trait::async_trait]
    impl FetchClient for MockFetchClient {
        async fn fetch_records(
            &self,
            offset: i64,
            bytes: Range<i32>,
            _max_wait_ms: i32,
        ) -> rskafka::client::error::Result<FetchResult> {
            let record_size = self.record.approximate_size();
            // Always return at least one record, even when the byte budget is
            // smaller than a single record.
            let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);

            let records = (0..num)
                .map(|idx| RecordAndOffset {
                    record: self.record.clone(),
                    offset: offset + idx as i64,
                })
                .collect::<Vec<_>>();

            let max_offset = offset + records.len() as i64;
            let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
            Ok(FetchResult {
                records,
                high_watermark: max_offset,
                encoded_response_size,
            })
        }

        fn topic(&self) -> &str {
            "test"
        }
    }

    /// Builds the small fixed record shared by all tests.
    fn test_record() -> Record {
        Record {
            key: Some(vec![0; 4]),
            value: Some(vec![0; 6]),
            headers: Default::default(),
            timestamp: Utc.timestamp_millis_opt(1337).unwrap(),
        }
    }

    /// Only the offsets present in the sparse index should be yielded,
    /// in index order.
    #[tokio::test]
    async fn test_consumer_with_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let record_size = record.approximate_size();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record_size,
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            vec![1, 3, 4, 8, 10, 12]
        )
    }

    /// A dense range index should yield every offset in the range.
    #[tokio::test]
    async fn test_consumer_without_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalRange::new(0..30, 1024);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            (0..30).collect::<Vec<_>>()
        )
    }

    /// Chained iterators (including empty sub-ranges) should be traversed in
    /// order, yielding the union of their offsets.
    #[tokio::test]
    async fn test_consumer_with_multiple_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };

        // iter0 and iter2 are empty ranges and contribute no offsets.
        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new(
            [0, 1, 2, 7, 8, 11],
            record.approximate_size() * 4,
        )) as _;
        let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _;
        let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _;
        let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]);

        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(iter),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            [0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027]
        )
    }
}