log_store/kafka/worker/flush.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::warn;
16use snafu::ResultExt;
17
18use crate::error;
19use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
20
21impl BackgroundProducerWorker {
22 async fn do_flush(
23 &mut self,
24 PendingRequest {
25 batch,
26 region_ids,
27 sender,
28 size: _size,
29 }: PendingRequest,
30 ) {
31 let result = self
32 .client
33 .produce(batch, self.compression)
34 .await
35 .context(error::BatchProduceSnafu);
36
37 if let Ok(result) = &result {
38 for (idx, region_id) in result.iter().zip(region_ids) {
39 self.index_collector.append(region_id, *idx as u64);
40 }
41 }
42
43 if let Err(err) = sender.send(result) {
44 warn!(err; "BatchFlushState Receiver is dropped");
45 }
46 }
47
48 pub(crate) async fn try_flush_pending_requests(
49 &mut self,
50 pending_requests: Vec<PendingRequest>,
51 ) {
52 // TODO(weny): Considering merge `PendingRequest`s.
53 for req in pending_requests {
54 self.do_flush(req).await
55 }
56 }
57}