log_store/kafka/worker/fetch_latest_offset.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error};
16use rskafka::client::partition::OffsetAt;
17use snafu::ResultExt;
18
19use crate::error;
20use crate::kafka::log_store::TopicStat;
21use crate::kafka::worker::BackgroundProducerWorker;
22
23impl BackgroundProducerWorker {
24 /// Fetches the latest offset for the topic.
25 ///
26 /// This function retrieves the topic's latest offset from Kafka and updates the latest offset
27 /// in the shared map.
28 pub async fn fetch_latest_offset(&mut self) {
29 match self
30 .client
31 .get_offset(OffsetAt::Latest)
32 .await
33 .context(error::GetOffsetSnafu {
34 topic: &self.provider.topic,
35 }) {
36 Ok(highwatermark) => {
37 // The highwatermark is the offset of the last record plus one.
38 let offset = (highwatermark as u64).saturating_sub(1);
39
40 match self.topic_stats.entry(self.provider.clone()) {
41 dashmap::Entry::Occupied(mut occupied_entry) => {
42 let stat = occupied_entry.get_mut();
43 if stat.latest_offset < offset {
44 stat.latest_offset = offset;
45 debug!(
46 "Updated latest offset for topic {} to {}",
47 self.provider.topic, offset
48 );
49 }
50 }
51 dashmap::Entry::Vacant(vacant_entry) => {
52 vacant_entry.insert(TopicStat {
53 latest_offset: offset,
54 record_size: 0,
55 record_num: 0,
56 });
57 debug!(
58 "Inserted latest offset for topic {} to {}",
59 self.provider.topic, offset
60 );
61 }
62 }
63 }
64 Err(err) => {
65 error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
66 }
67 }
68 }
69}