log_store/kafka/worker/
fetch_latest_offset.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error};
16use rskafka::client::partition::OffsetAt;
17use snafu::ResultExt;
18
19use crate::error;
20use crate::kafka::log_store::TopicStat;
21use crate::kafka::worker::BackgroundProducerWorker;
22
23impl BackgroundProducerWorker {
24    /// Fetches the latest offset for the topic.
25    ///
26    /// This function retrieves the topic's latest offset from Kafka and updates the latest offset
27    /// in the shared map.
28    pub async fn fetch_latest_offset(&mut self) {
29        match self
30            .client
31            .get_offset(OffsetAt::Latest)
32            .await
33            .context(error::GetOffsetSnafu {
34                topic: &self.provider.topic,
35            }) {
36            Ok(highwatermark) => {
37                // The highwatermark is the offset of the last record plus one.
38                let offset = (highwatermark as u64).saturating_sub(1);
39
40                match self.topic_stats.entry(self.provider.clone()) {
41                    dashmap::Entry::Occupied(mut occupied_entry) => {
42                        let stat = occupied_entry.get_mut();
43                        if stat.latest_offset < offset {
44                            stat.latest_offset = offset;
45                            debug!(
46                                "Updated latest offset for topic {} to {}",
47                                self.provider.topic, offset
48                            );
49                        }
50                    }
51                    dashmap::Entry::Vacant(vacant_entry) => {
52                        vacant_entry.insert(TopicStat {
53                            latest_offset: offset,
54                            record_size: 0,
55                            record_num: 0,
56                        });
57                        debug!(
58                            "Inserted latest offset for topic {} to {}",
59                            self.provider.topic, offset
60                        );
61                    }
62                }
63            }
64            Err(err) => {
65                error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
66            }
67        }
68    }
69}