log_store/kafka/
worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod dump_index;
16pub(crate) mod flush;
17pub(crate) mod produce;
18pub(crate) mod update_high_watermark;
19
20use std::sync::Arc;
21
22use common_telemetry::debug;
23use dashmap::DashMap;
24use futures::future::try_join_all;
25use rskafka::client::partition::Compression;
26use rskafka::record::Record;
27use snafu::{OptionExt, ResultExt};
28use store_api::logstore::provider::KafkaProvider;
29use store_api::logstore::EntryId;
30use store_api::storage::RegionId;
31use tokio::sync::mpsc::Receiver;
32use tokio::sync::oneshot::{self};
33
34use crate::error::{self, NoMaxValueSnafu, Result};
35use crate::kafka::index::{IndexCollector, IndexEncoder};
36use crate::kafka::producer::ProducerClient;
37
/// Requests that can be sent to a [`BackgroundProducerWorker`] through its request channel.
pub(crate) enum WorkerRequest {
    /// Produce a batch of records; see [`ProduceRequest`].
    Produce(ProduceRequest),
    /// Truncate the index collected for a region; see [`TruncateIndexRequest`].
    TruncateIndex(TruncateIndexRequest),
    /// Dump the index using the encoder carried by the request; see [`DumpIndexRequest`].
    DumpIndex(DumpIndexRequest),
    /// Ask the worker to run its `update_high_watermark` routine.
    UpdateHighWatermark,
}
44
45impl WorkerRequest {
46    pub(crate) fn new_produce_request(
47        region_id: RegionId,
48        batch: Vec<Record>,
49    ) -> (WorkerRequest, ProduceResultHandle) {
50        let (tx, rx) = oneshot::channel();
51
52        (
53            WorkerRequest::Produce(ProduceRequest {
54                region_id,
55                batch,
56                sender: tx,
57            }),
58            ProduceResultHandle { receiver: rx },
59        )
60    }
61}
62
/// A request asking the worker to dump the index with the given [`IndexEncoder`].
pub(crate) struct DumpIndexRequest {
    /// Encoder handed to the worker together with the request.
    encoder: Arc<dyn IndexEncoder>,
    /// Sending half of the oneshot channel whose receiving half is returned by
    /// [`DumpIndexRequest::new`]. Presumably signalled once the dump is done
    /// (TODO confirm against `dump_index`).
    sender: oneshot::Sender<()>,
}
67
68impl DumpIndexRequest {
69    pub fn new(encoder: Arc<dyn IndexEncoder>) -> (DumpIndexRequest, oneshot::Receiver<()>) {
70        let (tx, rx) = oneshot::channel();
71        (
72            DumpIndexRequest {
73                encoder,
74                sender: tx,
75            },
76            rx,
77        )
78    }
79}
80
/// A request asking the worker to truncate the index collected for a region.
///
/// When handled, both fields are forwarded to `IndexCollector::truncate`.
pub(crate) struct TruncateIndexRequest {
    /// Region whose index is truncated.
    region_id: RegionId,
    /// Entry id passed to the truncation; the exact boundary semantics are defined by the
    /// index collector.
    entry_id: EntryId,
}
85
86impl TruncateIndexRequest {
87    pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
88        Self {
89            region_id,
90            entry_id,
91        }
92    }
93}
94
/// A request asking the worker to produce a batch of records for a region.
pub(crate) struct ProduceRequest {
    /// Region the batch is produced for.
    region_id: RegionId,
    /// Records to produce.
    batch: Vec<Record>,
    /// Sending half of the oneshot channel used to hand a [`ProduceResultReceiver`] back to
    /// the requester; the receiving half is held by the [`ProduceResultHandle`].
    sender: oneshot::Sender<ProduceResultReceiver>,
}
100
/// Receives the committed offsets once the data has been committed to Kafka
/// or an unrecoverable error has been encountered.
pub(crate) struct ProduceResultHandle {
    /// Receiving half of the oneshot channel created by
    /// [`WorkerRequest::new_produce_request`]; yields the [`ProduceResultReceiver`] to wait on.
    receiver: oneshot::Receiver<ProduceResultReceiver>,
}
106
107impl ProduceResultHandle {
108    /// Waits for the data has been committed to Kafka.
109    /// Returns the **max** committed offsets.
110    pub(crate) async fn wait(self) -> Result<u64> {
111        self.receiver
112            .await
113            .context(error::WaitProduceResultReceiverSnafu)?
114            .wait()
115            .await
116    }
117}
118
/// Aggregates the oneshot receivers that report the outcome of a produce request.
///
/// Each receiver yields a `Result<Vec<i64>>`; `wait` resolves to the maximum value across
/// everything that was reported.
#[derive(Default)]
pub(crate) struct ProduceResultReceiver {
    /// Receivers registered through `add_receiver`, awaited together by `wait`.
    receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}
123
124impl ProduceResultReceiver {
125    fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
126        self.receivers.push(receiver)
127    }
128
129    async fn wait(self) -> Result<u64> {
130        Ok(try_join_all(self.receivers)
131            .await
132            .into_iter()
133            .flatten()
134            .collect::<Result<Vec<_>>>()?
135            .into_iter()
136            .flatten()
137            .max()
138            .context(NoMaxValueSnafu)? as u64)
139    }
140}
141
/// A batch of records waiting to be flushed, plus the channel used to report its outcome.
///
/// NOTE(review): construction and consumption happen outside this file (presumably in the
/// `produce`/`flush` submodules); the field notes below should be confirmed there.
pub(crate) struct PendingRequest {
    /// Records contained in this pending request.
    batch: Vec<Record>,
    /// Region ids associated with the records; presumably one per record — TODO confirm.
    region_ids: Vec<RegionId>,
    /// Presumably the accumulated byte size of `batch` — TODO confirm.
    size: usize,
    /// Sending half of a oneshot channel carrying a `Result<Vec<i64>>`, the same payload type
    /// that [`ProduceResultReceiver`] waits on.
    sender: oneshot::Sender<Result<Vec<i64>>>,
}
148
/// Background worker that receives [`WorkerRequest`]s from a channel and handles them in
/// batches.
pub(crate) struct BackgroundProducerWorker {
    /// The Kafka provider this worker is associated with.
    pub(crate) provider: Arc<KafkaProvider>,
    /// The [`ProducerClient`].
    pub(crate) client: Arc<dyn ProducerClient>,
    /// The compression configuration.
    pub(crate) compression: Compression,
    /// Receiver of [`WorkerRequest`]s.
    pub(crate) receiver: Receiver<WorkerRequest>,
    /// Max number of requests the worker takes from `receiver` and handles as one batch.
    pub(crate) request_batch_size: usize,
    /// Max bytes size for a single flush.
    pub(crate) max_batch_bytes: usize,
    /// Collecting ids of WAL entries.
    pub(crate) index_collector: Box<dyn IndexCollector>,
    /// High watermark for all topics, keyed by provider.
    pub(crate) high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}
166
167impl BackgroundProducerWorker {
168    pub(crate) async fn run(&mut self) {
169        let mut buffer = Vec::with_capacity(self.request_batch_size);
170        loop {
171            match self.receiver.recv().await {
172                Some(req) => {
173                    buffer.clear();
174                    buffer.push(req);
175                    for _ in 1..self.request_batch_size {
176                        match self.receiver.try_recv() {
177                            Ok(req) => buffer.push(req),
178                            Err(_) => break,
179                        }
180                    }
181                    self.handle_requests(&mut buffer).await;
182                }
183                None => {
184                    debug!("The sender is dropped, BackgroundProducerWorker exited");
185                    // Exits the loop if the `sender` is dropped.
186                    break;
187                }
188            }
189        }
190    }
191
192    async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
193        let mut produce_requests = Vec::with_capacity(buffer.len());
194        for req in buffer.drain(..) {
195            match req {
196                WorkerRequest::Produce(req) => produce_requests.push(req),
197                WorkerRequest::TruncateIndex(TruncateIndexRequest {
198                    region_id,
199                    entry_id,
200                }) => self.index_collector.truncate(region_id, entry_id),
201                WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
202                WorkerRequest::UpdateHighWatermark => {
203                    self.update_high_watermark().await;
204                }
205            }
206        }
207
208        let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
209        self.try_flush_pending_requests(pending_requests).await;
210    }
211}