// log_store/kafka/worker/flush.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use common_telemetry::warn;
16use snafu::ResultExt;
17
18use crate::error;
19use crate::kafka::log_store::TopicStat;
20use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
21
22impl BackgroundProducerWorker {
23    async fn do_flush(
24        &mut self,
25        PendingRequest {
26            batch,
27            region_ids,
28            sender,
29            size: _size,
30        }: PendingRequest,
31    ) {
32        let record_num = batch.len() as u64;
33        let result = self
34            .client
35            .produce(batch, self.compression)
36            .await
37            .context(error::BatchProduceSnafu);
38
39        if let Ok(result) = &result {
40            let total_record_size = result.encoded_request_size as u64;
41            for (idx, region_id) in result.offsets.iter().zip(region_ids) {
42                self.index_collector.append(region_id, *idx as u64);
43            }
44
45            let max_offset = result.offsets.iter().max().cloned().unwrap_or_default() as u64;
46            self.topic_stats
47                .entry(self.provider.clone())
48                .and_modify(|stat| {
49                    stat.latest_offset = stat.latest_offset.max(max_offset);
50                    stat.record_size += total_record_size;
51                    stat.record_num += record_num;
52                })
53                .or_insert(TopicStat {
54                    latest_offset: max_offset,
55                    record_size: total_record_size,
56                    record_num,
57                });
58        }
59
60        if let Err(err) = sender.send(result.map(|r| r.offsets)) {
61            warn!(err; "BatchFlushState Receiver is dropped");
62        }
63    }
64
65    pub(crate) async fn try_flush_pending_requests(
66        &mut self,
67        pending_requests: Vec<PendingRequest>,
68    ) {
69        // TODO(weny): Considering merge `PendingRequest`s.
70        for req in pending_requests {
71            self.do_flush(req).await
72        }
73    }
74}