log_store/kafka/worker/
flush.rs1use common_telemetry::warn;
16use snafu::ResultExt;
17
18use crate::error;
19use crate::kafka::log_store::TopicStat;
20use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
21
22impl BackgroundProducerWorker {
23 async fn do_flush(
24 &mut self,
25 PendingRequest {
26 batch,
27 region_ids,
28 sender,
29 size: _size,
30 }: PendingRequest,
31 ) {
32 let record_num = batch.len() as u64;
33 let result = self
34 .client
35 .produce(batch, self.compression)
36 .await
37 .context(error::BatchProduceSnafu);
38
39 if let Ok(result) = &result {
40 let total_record_size = result.encoded_request_size as u64;
41 for (idx, region_id) in result.offsets.iter().zip(region_ids) {
42 self.index_collector.append(region_id, *idx as u64);
43 }
44
45 let max_offset = result.offsets.iter().max().cloned().unwrap_or_default() as u64;
46 self.topic_stats
47 .entry(self.provider.clone())
48 .and_modify(|stat| {
49 stat.latest_offset = stat.latest_offset.max(max_offset);
50 stat.record_size += total_record_size;
51 stat.record_num += record_num;
52 })
53 .or_insert(TopicStat {
54 latest_offset: max_offset,
55 record_size: total_record_size,
56 record_num,
57 });
58 }
59
60 if let Err(err) = sender.send(result.map(|r| r.offsets)) {
61 warn!(err; "BatchFlushState Receiver is dropped");
62 }
63 }
64
65 pub(crate) async fn try_flush_pending_requests(
66 &mut self,
67 pending_requests: Vec<PendingRequest>,
68 ) {
69 for req in pending_requests {
71 self.do_flush(req).await
72 }
73 }
74}