log_store/kafka/worker/
update_high_watermark.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error};
16use rskafka::client::partition::OffsetAt;
17use snafu::ResultExt;
18
19use crate::error;
20use crate::kafka::worker::BackgroundProducerWorker;
21
22impl BackgroundProducerWorker {
23    /// Updates the high watermark for the topic.
24    ///
25    /// This function retrieves the latest offset from Kafka and updates the high watermark
26    /// in the shared map.
27    pub async fn update_high_watermark(&mut self) {
28        match self
29            .client
30            .get_offset(OffsetAt::Latest)
31            .await
32            .context(error::GetOffsetSnafu {
33                topic: &self.provider.topic,
34            }) {
35            Ok(offset) => match self.high_watermark.entry(self.provider.clone()) {
36                dashmap::Entry::Occupied(mut occupied_entry) => {
37                    let offset = offset as u64;
38                    if *occupied_entry.get() != offset {
39                        occupied_entry.insert(offset);
40                        debug!(
41                            "Updated high watermark for topic {} to {}",
42                            self.provider.topic, offset
43                        );
44                    }
45                }
46                dashmap::Entry::Vacant(vacant_entry) => {
47                    vacant_entry.insert(offset as u64);
48                    debug!(
49                        "Inserted high watermark for topic {} to {}",
50                        self.provider.topic, offset
51                    );
52                }
53            },
54            Err(err) => {
55                error!(err; "Failed to get offset for topic {}", self.provider.topic);
56            }
57        }
58    }
59}