log_store/kafka/worker/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16use snafu::ResultExt;
17
18use crate::error;
19use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
20
21impl BackgroundProducerWorker {
22    async fn do_flush(
23        &mut self,
24        PendingRequest {
25            batch,
26            region_ids,
27            sender,
28            size: _size,
29        }: PendingRequest,
30    ) {
31        let result = self
32            .client
33            .produce(batch, self.compression)
34            .await
35            .context(error::BatchProduceSnafu);
36
37        if let Ok(result) = &result {
38            for (idx, region_id) in result.iter().zip(region_ids) {
39                self.index_collector.append(region_id, *idx as u64);
40            }
41        }
42
43        if let Err(err) = sender.send(result) {
44            warn!(err; "BatchFlushState Receiver is dropped");
45        }
46    }
47
48    pub(crate) async fn try_flush_pending_requests(
49        &mut self,
50        pending_requests: Vec<PendingRequest>,
51    ) {
52        // TODO(weny): Considering merge `PendingRequest`s.
53        for req in pending_requests {
54            self.do_flush(req).await
55        }
56    }
57}