log_store/kafka/worker.rs

pub(crate) mod dump_index;
pub(crate) mod flush;
pub(crate) mod produce;
pub(crate) mod update_high_watermark;

use std::sync::Arc;

use common_telemetry::debug;
use dashmap::DashMap;
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::record::Record;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{self};

use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::{IndexCollector, IndexEncoder};
use crate::kafka::producer::ProducerClient;

pub(crate) enum WorkerRequest {
    Produce(ProduceRequest),
    TruncateIndex(TruncateIndexRequest),
    DumpIndex(DumpIndexRequest),
    UpdateHighWatermark,
}

impl WorkerRequest {
    pub(crate) fn new_produce_request(
        region_id: RegionId,
        batch: Vec<Record>,
    ) -> (WorkerRequest, ProduceResultHandle) {
        let (tx, rx) = oneshot::channel();

        (
            WorkerRequest::Produce(ProduceRequest {
                region_id,
                batch,
                sender: tx,
            }),
            ProduceResultHandle { receiver: rx },
        )
    }
}
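
// A minimal usage sketch (hypothetical: `worker_tx` stands in for the mpsc
// sender wired to `BackgroundProducerWorker::receiver`, and `records` for an
// encoded batch of `rskafka` records):
//
//     let (req, handle) = WorkerRequest::new_produce_request(region_id, records);
//     worker_tx.send(req).await.unwrap();
//     let last_entry_id = handle.wait().await?; // offset of the last record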

pub(crate) struct DumpIndexRequest {
    encoder: Arc<dyn IndexEncoder>,
    sender: oneshot::Sender<()>,
}

impl DumpIndexRequest {
    pub fn new(encoder: Arc<dyn IndexEncoder>) -> (DumpIndexRequest, oneshot::Receiver<()>) {
        let (tx, rx) = oneshot::channel();
        (
            DumpIndexRequest {
                encoder,
                sender: tx,
            },
            rx,
        )
    }
}
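
// Sketch of requesting an index dump (hypothetical `worker_tx` as above;
// `encoder` is any `Arc<dyn IndexEncoder>`):
//
//     let (req, done) = DumpIndexRequest::new(encoder);
//     worker_tx.send(WorkerRequest::DumpIndex(req)).await.unwrap();
//     done.await.unwrap(); // resolves once the worker has dumped the index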

pub(crate) struct TruncateIndexRequest {
    region_id: RegionId,
    entry_id: EntryId,
}

impl TruncateIndexRequest {
    pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
        Self {
            region_id,
            entry_id,
        }
    }
}

pub(crate) struct ProduceRequest {
    region_id: RegionId,
    batch: Vec<Record>,
    sender: oneshot::Sender<ProduceResultReceiver>,
}

pub(crate) struct ProduceResultHandle {
    /// Receives the [`ProduceResultReceiver`] once the request has been
    /// accepted and batched by the background worker.
    receiver: oneshot::Receiver<ProduceResultReceiver>,
}

impl ProduceResultHandle {
    /// Waits until the batch has been committed to Kafka and returns the
    /// offset of its last record.
    pub(crate) async fn wait(self) -> Result<u64> {
        self.receiver
            .await
            .context(error::WaitProduceResultReceiverSnafu)?
            .wait()
            .await
    }
}

#[derive(Default)]
pub(crate) struct ProduceResultReceiver {
    receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}

impl ProduceResultReceiver {
    fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
        self.receivers.push(receiver)
    }

    async fn wait(self) -> Result<u64> {
        Ok(try_join_all(self.receivers)
            .await
            .into_iter()
            .flatten()
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .max()
            .context(NoMaxValueSnafu)? as u64)
    }
}
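
// Worked example: if a produce request was split across two Kafka batches
// whose committed offsets were [10, 11] and [12, 13], `wait` resolves to 13,
// the offset of the last record in the original batch.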

pub(crate) struct PendingRequest {
    batch: Vec<Record>,
    region_ids: Vec<RegionId>,
    size: usize,
    sender: oneshot::Sender<Result<Vec<i64>>>,
}

pub(crate) struct BackgroundProducerWorker {
    /// The provider of the Kafka topic this worker produces to.
    pub(crate) provider: Arc<KafkaProvider>,
    /// The client used to produce records to Kafka.
    pub(crate) client: Arc<dyn ProducerClient>,
    /// The compression codec applied to record batches.
    pub(crate) compression: Compression,
    /// The receiving end of the worker's request channel.
    pub(crate) receiver: Receiver<WorkerRequest>,
    /// The maximum number of requests drained from the channel per iteration.
    pub(crate) request_batch_size: usize,
    /// The maximum size in bytes of a single produce batch.
    pub(crate) max_batch_bytes: usize,
    /// Collects the indexes of produced entries per region.
    pub(crate) index_collector: Box<dyn IndexCollector>,
    /// The latest high watermark observed for each provider.
    pub(crate) high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}
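
// A minimal wiring sketch (assumption: the caller owns both ends of the
// channel and constructs the worker elsewhere; field values are illustrative):
//
//     let (tx, rx) = tokio::sync::mpsc::channel::<WorkerRequest>(128);
//     let mut worker = /* BackgroundProducerWorker { receiver: rx, .. } */;
//     tokio::spawn(async move { worker.run().await });
//     // Clone `tx` into producers; requests are built with
//     // `WorkerRequest::new_produce_request` and sent over `tx`.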

impl BackgroundProducerWorker {
    pub(crate) async fn run(&mut self) {
        let mut buffer = Vec::with_capacity(self.request_batch_size);
        loop {
            match self.receiver.recv().await {
                Some(req) => {
                    buffer.clear();
                    buffer.push(req);
                    // After the first request arrives, opportunistically drain
                    // up to `request_batch_size` requests without blocking.
                    for _ in 1..self.request_batch_size {
                        match self.receiver.try_recv() {
                            Ok(req) => buffer.push(req),
                            Err(_) => break,
                        }
                    }
                    self.handle_requests(&mut buffer).await;
                }
                None => {
                    debug!("The sender is dropped, BackgroundProducerWorker exited");
                    break;
                }
            }
        }
    }

    async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
        let mut produce_requests = Vec::with_capacity(buffer.len());
        for req in buffer.drain(..) {
            match req {
                WorkerRequest::Produce(req) => produce_requests.push(req),
                WorkerRequest::TruncateIndex(TruncateIndexRequest {
                    region_id,
                    entry_id,
                }) => self.index_collector.truncate(region_id, entry_id),
                WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
                WorkerRequest::UpdateHighWatermark => {
                    self.update_high_watermark().await;
                }
            }
        }

        // Pack the produce requests into pending batches bounded by
        // `max_batch_bytes`, then flush them to Kafka.
        let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
        self.try_flush_pending_requests(pending_requests).await;
    }
}
211}