log_store/kafka/
high_watermark_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use common_telemetry::error;
19use dashmap::DashMap;
20use store_api::logstore::provider::KafkaProvider;
21use tokio::time::{interval, MissedTickBehavior};
22
23use crate::error::Result;
24use crate::kafka::client_manager::ClientManagerRef;
25
26/// HighWatermarkManager is responsible for periodically updating the high watermark
27/// (latest existing record offset) for each Kafka topic.
28pub(crate) struct HighWatermarkManager {
29    /// Interval to update high watermark.
30    update_interval: Duration,
31    /// The high watermark for each topic.
32    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
33    /// Client manager to send requests.
34    client_manager: ClientManagerRef,
35}
36
37impl HighWatermarkManager {
38    pub(crate) fn new(
39        update_interval: Duration,
40        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
41        client_manager: ClientManagerRef,
42    ) -> Self {
43        Self {
44            update_interval,
45            high_watermark,
46            client_manager,
47        }
48    }
49
50    /// Starts the high watermark manager as a background task
51    ///
52    /// This spawns a task that periodically queries Kafka for the latest
53    /// high watermark values for all registered topics and updates the shared map.
54    pub(crate) async fn run(self) {
55        common_runtime::spawn_global(async move {
56            let mut interval = interval(self.update_interval);
57            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
58            loop {
59                interval.tick().await;
60                if let Err(e) = self.try_update().await {
61                    error!(e; "Failed to update high watermark");
62                }
63            }
64        });
65    }
66
67    /// Attempts to update the high watermark for all registered topics
68    ///
69    /// Iterates through all topics in the high watermark map, obtains a producer
70    /// for each topic, and requests an update of the high watermark value.
71    pub(crate) async fn try_update(&self) -> Result<()> {
72        for iterator_element in self.high_watermark.iter() {
73            let producer = self
74                .client_manager
75                .get_or_insert(iterator_element.key())
76                .await?
77                .producer()
78                .clone();
79            producer.update_high_watermark().await?;
80        }
81        Ok(())
82    }
83}
84
#[cfg(test)]
mod tests {
    use common_wal::test_util::run_test_with_kafka_wal;
    use store_api::storage::RegionId;

    use super::*;
    use crate::kafka::test_util::{prepare, record};

    /// Upper bound on how long a test waits for an expected high watermark value.
    const WAIT_TIMEOUT: Duration = Duration::from_secs(10);
    /// Delay between two reads of the shared high watermark map while waiting.
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    /// Polls `high_watermark` until the entry for `provider` equals `expected`.
    ///
    /// Polling replaces fixed sleeps, so the test does not depend on how quickly a
    /// background refresh lands in the map.
    ///
    /// # Panics
    ///
    /// Panics if the expected value is not observed within [`WAIT_TIMEOUT`].
    async fn wait_for_high_watermark(
        high_watermark: &DashMap<Arc<KafkaProvider>, u64>,
        provider: &Arc<KafkaProvider>,
        expected: u64,
    ) {
        let poll = async {
            loop {
                // Copy the value out so the map guard is released before sleeping.
                let current = high_watermark.get(provider).map(|entry| *entry);
                if current == Some(expected) {
                    return;
                }
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        };
        if tokio::time::timeout(WAIT_TIMEOUT, poll).await.is_err() {
            panic!(
                "high watermark did not reach {} within {:?}, last value: {:?}",
                expected,
                WAIT_TIMEOUT,
                high_watermark.get(provider).map(|entry| *entry)
            );
        }
    }

    #[tokio::test]
    async fn test_try_update_high_watermark() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let (manager, topics) =
                    prepare("test_try_update_high_watermark", 1, broker_endpoints).await;
                let manager = Arc::new(manager);
                let high_watermark_manager = HighWatermarkManager::new(
                    Duration::from_millis(100),
                    manager.high_watermark().clone(),
                    manager.clone(),
                );
                let high_watermark = high_watermark_manager.high_watermark.clone();
                high_watermark_manager.run().await;

                let topic = topics[0].clone();
                let provider = Arc::new(KafkaProvider::new(topic.to_string()));
                let producer = manager
                    .get_or_insert(&provider)
                    .await
                    .unwrap()
                    .producer()
                    .clone();

                // Nothing has been produced to the freshly prepared topic yet.
                wait_for_high_watermark(&high_watermark, &provider, 0).await;

                let records = vec![record()];
                let region = RegionId::new(1, 1);
                producer.produce(region, records.clone()).await.unwrap();
                // A later refresh must account for every produced record.
                wait_for_high_watermark(&high_watermark, &provider, records.len() as u64).await;
            })
        })
        .await
    }
}