log_store/kafka/consumer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
use rskafka::client::partition::PartitionClient;
use rskafka::record::RecordAndOffset;

use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};

pub struct FetchResult {
    /// The fetched records along with their offsets.
    pub records: Vec<RecordAndOffset>,

    /// The high watermark of the partition.
    pub high_watermark: i64,

    /// The size of the encoded response in bytes.
    pub encoded_response_size: usize,
}

#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
    /// Fetches records.
    ///
    /// Arguments are identical to [`PartitionClient::fetch_records`].
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult>;

    /// Returns the topic name.
    fn topic(&self) -> &str;
}

#[async_trait::async_trait]
impl FetchClient for PartitionClient {
    async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> rskafka::client::error::Result<FetchResult> {
        // Inherent methods take precedence over trait methods, so this calls
        // `PartitionClient::fetch_records` rather than recursing into this impl.
        self.fetch_records(offset, bytes, max_wait_ms)
            .await
            .map(|r| FetchResult {
                records: r.records,
                high_watermark: r.high_watermark,
                encoded_response_size: r.encoded_response_size,
            })
    }

    fn topic(&self) -> &str {
        // Delegates to the inherent `PartitionClient::topic`.
        self.topic()
    }
}

/// The result of a single fetch request, consumed by the [`Consumer`] stream.
struct FetchResultInner {
    /// The fetched records and their offsets.
    records_and_offsets: Vec<RecordAndOffset>,
    /// The number of records the index hint expected this batch to contain.
    batch_size: usize,
    /// The size of the encoded fetch response in bytes.
    fetch_bytes: usize,
    /// The high watermark reported by the partition.
    watermark: i64,
    /// The offset the fetch request started from.
    used_offset: i64,
}

/// The default maximum batch size of a single fetch request (50 MiB).
const MAX_BATCH_SIZE: usize = 52428800;
/// The default average record size (256 KiB).
const AVG_RECORD_SIZE: usize = 256 * 1024;

/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster, yielding records according to the [`RegionWalIndexIterator`].
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
    /// The high watermark returned by the latest fetch.
    #[builder(default = "-1")]
    last_high_watermark: i64,

    /// The client used to fetch records from the Kafka topic.
    client: Arc<dyn FetchClient>,

    /// The max batch size in a single fetch request.
    #[builder(default = "MAX_BATCH_SIZE")]
    max_batch_size: usize,

    /// The max wait time in milliseconds.
    #[builder(default = "500")]
    max_wait_ms: u32,

    /// The average record size in bytes, updated from each non-empty fetch response.
    #[builder(default = "AVG_RECORD_SIZE")]
    avg_record_size: usize,

    /// Termination flag.
    #[builder(default = "false")]
    terminated: bool,

    /// The buffer of records.
    buffer: RecordsBuffer,

    /// The fetch future.
    #[builder(default = "Fuse::terminated()")]
    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResultInner>>>,

    /// Total fetched bytes.
    #[builder(default = "0")]
    total_fetched_bytes: u64,
}
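// A minimal construction sketch (kept as a comment rather than a doc test), assuming
// the `ConsumerBuilder` generated by `derive_builder` above and a hypothetical
// `wal_index: Box<dyn RegionWalIndexIterator>` supplied by the caller:
//
//     let consumer = ConsumerBuilder::default()
//         .client(client)                          // Arc<dyn FetchClient>
//         .buffer(RecordsBuffer::new(wal_index))   // offsets to replay, per the WAL index
//         .max_wait_ms(500)
//         .build()
//         .expect("`client` and `buffer` are the only fields without defaults");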

impl Consumer {
    /// Returns the total fetched bytes.
    pub fn total_fetched_bytes(&self) -> u64 {
        self.total_fetched_bytes
    }

    /// Returns the topic name.
    pub fn topic(&self) -> &str {
        self.client.topic()
    }
}

pub(crate) struct RecordsBuffer {
    /// Records fetched from Kafka that have not been yielded yet.
    buffer: VecDeque<RecordAndOffset>,

    /// Iterator over the offsets this consumer is expected to yield.
    index: Box<dyn RegionWalIndexIterator>,
}

impl RecordsBuffer {
    /// Creates an empty [`RecordsBuffer`].
    pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
        RecordsBuffer {
            buffer: VecDeque::new(),
            index,
        }
    }
}

impl RecordsBuffer {
    /// Pops the next buffered record whose offset matches the next index entry.
    ///
    /// Buffered records whose offsets do not match the expected index entry are
    /// popped and discarded. Once the index is exhausted, the remaining buffered
    /// records are dropped and `None` is returned.
    fn pop_front(&mut self) -> Option<RecordAndOffset> {
        while let Some(index) = self.index.peek() {
            if let Some(record_and_offset) = self.buffer.pop_front() {
                if index == record_and_offset.offset as u64 {
                    self.index.next();
                    return Some(record_and_offset);
                }
            } else {
                // The buffer is drained; the caller needs to fetch more records.
                return None;
            }
        }

        // The index is exhausted; no further records will be yielded.
        self.buffer.clear();
        None
    }

    /// Appends fetched records to the buffer.
    fn extend(&mut self, records: Vec<RecordAndOffset>) {
        if let (Some(first), Some(index)) = (records.first(), self.index.peek()) {
            // TODO(weny): throw an error?
            assert!(
                index <= first.offset as u64,
                "index: {index}, first offset: {}",
                first.offset
            );
        }
        self.buffer.extend(records);
    }
}
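// An illustrative sketch of the skipping behavior above (commented out, not a
// compiled test; `RegionWalVecIndex` is the index type used in the tests below,
// and `records_at_offsets` is a hypothetical helper producing `RecordAndOffset`s):
//
//     let index = RegionWalVecIndex::new([1, 3], 1024);
//     let mut buffer = RecordsBuffer::new(Box::new(index));
//     buffer.extend(records_at_offsets([1, 2, 3]));
//     assert_eq!(buffer.pop_front().unwrap().offset, 1);
//     // Offset 2 is popped and discarded because the index expects offset 3 next.
//     assert_eq!(buffer.pop_front().unwrap().offset, 3);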

impl Stream for Consumer {
    type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        loop {
            if *this.terminated {
                return Poll::Ready(None);
            }

            // All offsets requested by the index have been yielded.
            if this.buffer.index.peek().is_none() {
                return Poll::Ready(None);
            }

            if let Some(x) = this.buffer.pop_front() {
                debug!("Yielding record with offset: {}", x.offset);
                return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
            }

            // No fetch is in flight; start a new one at the next offset the index expects.
            if this.fetch_fut.is_terminated() {
                match this.buffer.index.peek() {
                    Some(next_offset) => {
                        let client = Arc::clone(this.client);
                        let max_wait_ms = *this.max_wait_ms as i32;
                        let offset = next_offset as i64;
                        let NextBatchHint { bytes, len } = this
                            .buffer
                            .index
                            .next_batch_hint(*this.avg_record_size)
                            .unwrap_or(NextBatchHint {
                                bytes: *this.avg_record_size,
                                len: 1,
                            });

                        let fetch_range =
                            1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
                        *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
                            let FetchResult {
                                records: records_and_offsets,
                                high_watermark: watermark,
                                encoded_response_size,
                                ..
                            } = client
                                .fetch_records(offset, fetch_range, max_wait_ms)
                                .await?;

                            Ok(FetchResultInner {
                                records_and_offsets,
                                watermark,
                                used_offset: offset,
                                fetch_bytes: encoded_response_size,
                                batch_size: len,
                            })
                        }));
                    }
                    None => {
                        return Poll::Ready(None);
                    }
                }
            }

            let data = futures::ready!(this.fetch_fut.poll_unpin(cx));

            match data {
                Ok(FetchResultInner {
                    mut records_and_offsets,
                    watermark,
                    used_offset,
                    fetch_bytes,
                    batch_size,
                }) => {
                    // Sort records by offset in case they aren't in order.
                    records_and_offsets.sort_unstable_by_key(|x| x.offset);
                    *this.last_high_watermark = watermark;
                    if !records_and_offsets.is_empty() {
                        // Refine the average record size estimate from the latest response.
                        *this.avg_record_size = fetch_bytes / records_and_offsets.len();
                        debug!("set avg_record_size: {}", *this.avg_record_size);
                    }
                    *this.total_fetched_bytes += fetch_bytes as u64;

                    debug!(
                        "Fetch result: {:?}, used_offset: {used_offset}, fetch_bytes: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
                        records_and_offsets
                            .iter()
                            .map(|record| record.offset)
                            .collect::<Vec<_>>(),
                        records_and_offsets.len()
                    );
                    this.buffer.extend(records_and_offsets);
                    continue;
                }
                Err(e) => {
                    *this.terminated = true;

                    return Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}
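// A minimal consumption sketch (commented out; `consumer` is assumed to have been
// built as in the construction sketch above). No field of `Consumer` is
// `#[pin]`-projected, so the stream is `Unpin` and `TryStreamExt::try_next` can
// be used directly:
//
//     use futures::TryStreamExt;
//
//     while let Some((record_and_offset, high_watermark)) = consumer.try_next().await? {
//         // Decode `record_and_offset.record` and track `high_watermark` as needed.
//     }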

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::ops::Range;
    use std::sync::Arc;

    use chrono::{TimeZone, Utc};
    use futures::TryStreamExt;
    use futures::future::Fuse;
    use rskafka::record::{Record, RecordAndOffset};

    use super::*;
    use crate::kafka::consumer::{Consumer, RecordsBuffer};
    use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

    #[derive(Debug)]
    struct MockFetchClient {
        record: Record,
    }

    #[async_trait::async_trait]
    impl FetchClient for MockFetchClient {
        async fn fetch_records(
            &self,
            offset: i64,
            bytes: Range<i32>,
            _max_wait_ms: i32,
        ) -> rskafka::client::error::Result<FetchResult> {
            let record_size = self.record.approximate_size();
            let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);

            let records = (0..num)
                .map(|idx| RecordAndOffset {
                    record: self.record.clone(),
                    offset: offset + idx as i64,
                })
                .collect::<Vec<_>>();

            let max_offset = offset + records.len() as i64;
            let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
            Ok(FetchResult {
                records,
                high_watermark: max_offset,
                encoded_response_size,
            })
        }

        fn topic(&self) -> &str {
            "test"
        }
    }

    fn test_record() -> Record {
        Record {
            key: Some(vec![0; 4]),
            value: Some(vec![0; 6]),
            headers: Default::default(),
            timestamp: Utc.timestamp_millis_opt(1337).unwrap(),
        }
    }

    #[tokio::test]
    async fn test_consumer_with_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let record_size = record.approximate_size();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record_size,
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            vec![1, 3, 4, 8, 10, 12]
        )
    }

    #[tokio::test]
    async fn test_consumer_without_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };
        let index = RegionWalRange::new(0..30, 1024);
        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(index),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            (0..30).collect::<Vec<_>>()
        )
    }

    #[tokio::test]
    async fn test_consumer_with_multiple_index() {
        common_telemetry::init_default_ut_logging();
        let record = test_record();
        let mock_client = MockFetchClient {
            record: record.clone(),
        };

        let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
        let iter1 = Box::new(RegionWalVecIndex::new(
            [0, 1, 2, 7, 8, 11],
            record.approximate_size() * 4,
        )) as _;
        let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _;
        let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _;
        let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]);

        let consumer = Consumer {
            last_high_watermark: -1,
            client: Arc::new(mock_client),
            max_batch_size: usize::MAX,
            max_wait_ms: 500,
            avg_record_size: record.approximate_size(),
            terminated: false,
            buffer: RecordsBuffer {
                buffer: VecDeque::new(),
                index: Box::new(iter),
            },
            fetch_fut: Fuse::terminated(),
            total_fetched_bytes: 0,
        };

        let records = consumer.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(
            records
                .into_iter()
                .map(|(x, _)| x.offset)
                .collect::<Vec<_>>(),
            [0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027]
        )
    }
}
459}