// log_store/kafka/worker.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15pub(crate) mod dump_index;
16pub(crate) mod fetch_latest_offset;
17pub(crate) mod flush;
18pub(crate) mod produce;
19
20use std::sync::Arc;
21
22use common_telemetry::debug;
23use dashmap::DashMap;
24use futures::future::try_join_all;
25use rskafka::client::partition::Compression;
26use rskafka::record::Record;
27use snafu::{OptionExt, ResultExt};
28use store_api::logstore::provider::KafkaProvider;
29use store_api::logstore::EntryId;
30use store_api::storage::RegionId;
31use tokio::sync::mpsc::Receiver;
32use tokio::sync::oneshot::{self};
33
34use crate::error::{self, NoMaxValueSnafu, Result};
35use crate::kafka::index::{IndexCollector, IndexEncoder};
36use crate::kafka::log_store::TopicStat;
37use crate::kafka::producer::ProducerClient;
38
/// Requests handled by the [`BackgroundProducerWorker`].
pub(crate) enum WorkerRequest {
    /// Produces a batch of records to Kafka.
    Produce(ProduceRequest),
    /// Truncates the collected index of a region up to an entry id.
    TruncateIndex(TruncateIndexRequest),
    /// Dumps the collected index with the provided encoder.
    DumpIndex(DumpIndexRequest),
    /// Triggers fetching the topic's latest offset (see the
    /// `fetch_latest_offset` submodule).
    FetchLatestOffset,
}
45
46impl WorkerRequest {
47    pub(crate) fn new_produce_request(
48        region_id: RegionId,
49        batch: Vec<Record>,
50    ) -> (WorkerRequest, ProduceResultHandle) {
51        let (tx, rx) = oneshot::channel();
52
53        (
54            WorkerRequest::Produce(ProduceRequest {
55                region_id,
56                batch,
57                sender: tx,
58            }),
59            ProduceResultHandle { receiver: rx },
60        )
61    }
62}
63
/// Request asking the worker to dump the collected WAL entry index.
pub(crate) struct DumpIndexRequest {
    /// Encoder used to serialize the index.
    encoder: Arc<dyn IndexEncoder>,
    /// Notified once the dump has completed.
    sender: oneshot::Sender<()>,
}
68
69impl DumpIndexRequest {
70    pub fn new(encoder: Arc<dyn IndexEncoder>) -> (DumpIndexRequest, oneshot::Receiver<()>) {
71        let (tx, rx) = oneshot::channel();
72        (
73            DumpIndexRequest {
74                encoder,
75                sender: tx,
76            },
77            rx,
78        )
79    }
80}
81
/// Request asking the worker to truncate a region's collected index.
pub(crate) struct TruncateIndexRequest {
    /// The region whose index should be truncated.
    region_id: RegionId,
    /// Entries up to this id are truncated.
    entry_id: EntryId,
}
86
87impl TruncateIndexRequest {
88    pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
89        Self {
90            region_id,
91            entry_id,
92        }
93    }
94}
95
/// A request to produce a region's batch of records to Kafka.
pub(crate) struct ProduceRequest {
    /// The region the records belong to.
    region_id: RegionId,
    /// The records to produce.
    batch: Vec<Record>,
    /// Receives the [`ProduceResultReceiver`] once the worker has accepted
    /// and scheduled this request.
    sender: oneshot::Sender<ProduceResultReceiver>,
}
101
/// Receives the committed offsets when data has been committed to Kafka
/// or an unrecoverable error has been encountered.
pub(crate) struct ProduceResultHandle {
    /// Yields the [`ProduceResultReceiver`] once the worker has scheduled
    /// the corresponding [`ProduceRequest`].
    receiver: oneshot::Receiver<ProduceResultReceiver>,
}
107
108impl ProduceResultHandle {
109    /// Waits for the data has been committed to Kafka.
110    /// Returns the **max** committed offsets.
111    pub(crate) async fn wait(self) -> Result<u64> {
112        self.receiver
113            .await
114            .context(error::WaitProduceResultReceiverSnafu)?
115            .wait()
116            .await
117    }
118}
119
/// Aggregates the per-flush result channels of a produce request; a single
/// [`ProduceRequest`] may be split across several flushes.
#[derive(Default)]
pub(crate) struct ProduceResultReceiver {
    // One receiver per pending flush; each yields the committed offsets of
    // that flush or an error.
    receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}
124
impl ProduceResultReceiver {
    /// Registers another per-flush receiver whose offsets contribute to the
    /// value returned by `wait`.
    fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
        self.receivers.push(receiver)
    }

    /// Waits for every registered flush and returns the **max** committed
    /// offset, propagating the first flush error encountered.
    //
    // NOTE(review): `try_join_all(...).await` yields
    // `Result<Vec<Result<Vec<i64>>>, RecvError>`; the `.into_iter().flatten()`
    // is applied to that outer `Result`, so a `RecvError` (a flush task
    // dropped its sender) is silently discarded and surfaces as `NoMaxValue`
    // rather than a channel error — confirm this is intended.
    async fn wait(self) -> Result<u64> {
        Ok(try_join_all(self.receivers)
            .await
            .into_iter()
            .flatten()
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .max()
            .context(NoMaxValueSnafu)? as u64)
    }
}
142
/// A batch of records, possibly aggregated from several [`ProduceRequest`]s,
/// waiting to be flushed to Kafka.
pub(crate) struct PendingRequest {
    /// The records to send in a single flush.
    batch: Vec<Record>,
    /// The regions that contributed records to `batch`.
    region_ids: Vec<RegionId>,
    // Size of the batch — presumably in bytes, matching `max_batch_bytes`;
    // TODO(review): confirm against `aggregate_records`.
    size: usize,
    /// Receives the committed offsets of this flush, or an error.
    sender: oneshot::Sender<Result<Vec<i64>>>,
}
149
/// Background worker that receives [`WorkerRequest`]s over a channel and
/// handles produce and index operations for a Kafka topic.
pub(crate) struct BackgroundProducerWorker {
    /// The provider of the topic this worker serves.
    pub(crate) provider: Arc<KafkaProvider>,
    /// The [`ProducerClient`].
    pub(crate) client: Arc<dyn ProducerClient>,
    /// The compression configuration.
    pub(crate) compression: Compression,
    /// Receiver of [WorkerRequest].
    pub(crate) receiver: Receiver<WorkerRequest>,
    /// Max batch size for a worker to handle requests.
    pub(crate) request_batch_size: usize,
    /// Max bytes size for a single flush.
    pub(crate) max_batch_bytes: usize,
    /// Collecting ids of WAL entries.
    pub(crate) index_collector: Box<dyn IndexCollector>,
    /// The stats of each topic.
    pub(crate) topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}
167
168impl BackgroundProducerWorker {
169    pub(crate) async fn run(&mut self) {
170        let mut buffer = Vec::with_capacity(self.request_batch_size);
171        loop {
172            match self.receiver.recv().await {
173                Some(req) => {
174                    buffer.clear();
175                    buffer.push(req);
176                    for _ in 1..self.request_batch_size {
177                        match self.receiver.try_recv() {
178                            Ok(req) => buffer.push(req),
179                            Err(_) => break,
180                        }
181                    }
182                    self.handle_requests(&mut buffer).await;
183                }
184                None => {
185                    debug!("The sender is dropped, BackgroundProducerWorker exited");
186                    // Exits the loop if the `sender` is dropped.
187                    break;
188                }
189            }
190        }
191    }
192
193    async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
194        let mut produce_requests = Vec::with_capacity(buffer.len());
195        for req in buffer.drain(..) {
196            match req {
197                WorkerRequest::Produce(req) => produce_requests.push(req),
198                WorkerRequest::TruncateIndex(TruncateIndexRequest {
199                    region_id,
200                    entry_id,
201                }) => self.index_collector.truncate(region_id, entry_id),
202                WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
203                WorkerRequest::FetchLatestOffset => {
204                    self.fetch_latest_offset().await;
205                }
206            }
207        }
208
209        let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
210        self.try_flush_pending_requests(pending_requests).await;
211    }
212}