// log_store/kafka/worker/produce.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::tracing::warn;
16use tokio::sync::oneshot;
17
18use crate::kafka::worker::{
19 BackgroundProducerWorker, PendingRequest, ProduceRequest, ProduceResultReceiver,
20};
21
22impl BackgroundProducerWorker {
23 /// Aggregates records into batches, ensuring that the size of each batch does not exceed a specified maximum (`max_batch_bytes`).
24 ///
25 /// ## Panic
26 /// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
27 pub(crate) fn aggregate_records(
28 &self,
29 requests: &mut Vec<ProduceRequest>,
30 max_batch_bytes: usize,
31 ) -> Vec<PendingRequest> {
32 let mut records_buffer = vec![];
33 let mut region_ids = vec![];
34 let mut batch_size = 0;
35 let mut pending_requests = Vec::with_capacity(requests.len());
36
37 for ProduceRequest {
38 batch,
39 sender,
40 region_id,
41 } in std::mem::take(requests)
42 {
43 let mut receiver = ProduceResultReceiver::default();
44 for record in batch {
45 assert!(record.approximate_size() <= max_batch_bytes);
46 // Yields the `PendingRequest` if buffer is full.
47 if batch_size + record.approximate_size() > max_batch_bytes {
48 let (tx, rx) = oneshot::channel();
49 pending_requests.push(PendingRequest {
50 batch: std::mem::take(&mut records_buffer),
51 region_ids: std::mem::take(&mut region_ids),
52 size: batch_size,
53 sender: tx,
54 });
55 batch_size = 0;
56 receiver.add_receiver(rx);
57 }
58
59 batch_size += record.approximate_size();
60 records_buffer.push(record);
61 region_ids.push(region_id);
62 }
63 // The remaining records.
64 if batch_size > 0 {
65 // Yields `PendingRequest`
66 let (tx, rx) = oneshot::channel();
67 pending_requests.push(PendingRequest {
68 batch: std::mem::take(&mut records_buffer),
69 region_ids: std::mem::take(&mut region_ids),
70 size: batch_size,
71 sender: tx,
72 });
73 batch_size = 0;
74 receiver.add_receiver(rx);
75 }
76
77 if sender.send(receiver).is_err() {
78 warn!("The Receiver of ProduceResultReceiver is dropped");
79 }
80 }
81 pending_requests
82 }
83}