// log_store/kafka/worker.rs
pub(crate) mod dump_index;
pub(crate) mod fetch_latest_offset;
pub(crate) mod flush;
pub(crate) mod produce;

use std::sync::Arc;

use common_telemetry::debug;
use dashmap::DashMap;
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::record::Record;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{self};

use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::{IndexCollector, IndexEncoder};
use crate::kafka::log_store::TopicStat;
use crate::kafka::producer::ProducerClient;
38
39pub(crate) enum WorkerRequest {
40 Produce(ProduceRequest),
41 TruncateIndex(TruncateIndexRequest),
42 DumpIndex(DumpIndexRequest),
43 FetchLatestOffset,
44}
45
46impl WorkerRequest {
47 pub(crate) fn new_produce_request(
48 region_id: RegionId,
49 batch: Vec<Record>,
50 ) -> (WorkerRequest, ProduceResultHandle) {
51 let (tx, rx) = oneshot::channel();
52
53 (
54 WorkerRequest::Produce(ProduceRequest {
55 region_id,
56 batch,
57 sender: tx,
58 }),
59 ProduceResultHandle { receiver: rx },
60 )
61 }
62}
63
64pub(crate) struct DumpIndexRequest {
65 encoder: Arc<dyn IndexEncoder>,
66 sender: oneshot::Sender<()>,
67}
68
69impl DumpIndexRequest {
70 pub fn new(encoder: Arc<dyn IndexEncoder>) -> (DumpIndexRequest, oneshot::Receiver<()>) {
71 let (tx, rx) = oneshot::channel();
72 (
73 DumpIndexRequest {
74 encoder,
75 sender: tx,
76 },
77 rx,
78 )
79 }
80}
81
82pub(crate) struct TruncateIndexRequest {
83 region_id: RegionId,
84 entry_id: EntryId,
85}
86
87impl TruncateIndexRequest {
88 pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
89 Self {
90 region_id,
91 entry_id,
92 }
93 }
94}
95
96pub(crate) struct ProduceRequest {
97 region_id: RegionId,
98 batch: Vec<Record>,
99 sender: oneshot::Sender<ProduceResultReceiver>,
100}
101
102pub(crate) struct ProduceResultHandle {
105 receiver: oneshot::Receiver<ProduceResultReceiver>,
106}
107
108impl ProduceResultHandle {
109 pub(crate) async fn wait(self) -> Result<u64> {
112 self.receiver
113 .await
114 .context(error::WaitProduceResultReceiverSnafu)?
115 .wait()
116 .await
117 }
118}
119
120#[derive(Default)]
121pub(crate) struct ProduceResultReceiver {
122 receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
123}
124
125impl ProduceResultReceiver {
126 fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
127 self.receivers.push(receiver)
128 }
129
130 async fn wait(self) -> Result<u64> {
131 Ok(try_join_all(self.receivers)
132 .await
133 .into_iter()
134 .flatten()
135 .collect::<Result<Vec<_>>>()?
136 .into_iter()
137 .flatten()
138 .max()
139 .context(NoMaxValueSnafu)? as u64)
140 }
141}
142
143pub(crate) struct PendingRequest {
144 batch: Vec<Record>,
145 region_ids: Vec<RegionId>,
146 size: usize,
147 sender: oneshot::Sender<Result<Vec<i64>>>,
148}
149
150pub(crate) struct BackgroundProducerWorker {
151 pub(crate) provider: Arc<KafkaProvider>,
152 pub(crate) client: Arc<dyn ProducerClient>,
154 pub(crate) compression: Compression,
156 pub(crate) receiver: Receiver<WorkerRequest>,
158 pub(crate) request_batch_size: usize,
160 pub(crate) max_batch_bytes: usize,
162 pub(crate) index_collector: Box<dyn IndexCollector>,
164 pub(crate) topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
166}
167
168impl BackgroundProducerWorker {
169 pub(crate) async fn run(&mut self) {
170 let mut buffer = Vec::with_capacity(self.request_batch_size);
171 loop {
172 match self.receiver.recv().await {
173 Some(req) => {
174 buffer.clear();
175 buffer.push(req);
176 for _ in 1..self.request_batch_size {
177 match self.receiver.try_recv() {
178 Ok(req) => buffer.push(req),
179 Err(_) => break,
180 }
181 }
182 self.handle_requests(&mut buffer).await;
183 }
184 None => {
185 debug!("The sender is dropped, BackgroundProducerWorker exited");
186 break;
188 }
189 }
190 }
191 }
192
193 async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
194 let mut produce_requests = Vec::with_capacity(buffer.len());
195 for req in buffer.drain(..) {
196 match req {
197 WorkerRequest::Produce(req) => produce_requests.push(req),
198 WorkerRequest::TruncateIndex(TruncateIndexRequest {
199 region_id,
200 entry_id,
201 }) => self.index_collector.truncate(region_id, entry_id),
202 WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
203 WorkerRequest::FetchLatestOffset => {
204 self.fetch_latest_offset().await;
205 }
206 }
207 }
208
209 let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
210 self.try_flush_pending_requests(pending_requests).await;
211 }
212}