log_store/kafka/
periodic_offset_fetcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_telemetry::{debug, error, info};
18use tokio::time::{interval, MissedTickBehavior};
19
20use crate::error::Result;
21use crate::kafka::client_manager::ClientManagerRef;
22
23/// PeriodicOffsetFetcher is responsible for periodically updating the offset
24/// for each Kafka topic.
25pub(crate) struct PeriodicOffsetFetcher {
26    /// Interval to fetch the latest offset.
27    interval: Duration,
28    /// Client manager to send requests.
29    client_manager: ClientManagerRef,
30}
31
32impl PeriodicOffsetFetcher {
33    pub(crate) fn new(interval: Duration, client_manager: ClientManagerRef) -> Self {
34        Self {
35            interval,
36            client_manager,
37        }
38    }
39
40    /// Starts the offset fetcher as a background task
41    ///
42    /// This spawns a task that periodically queries Kafka for the latest
43    /// offset values for all registered topics and updates the shared map.
44    pub(crate) async fn run(self) {
45        common_runtime::spawn_global(async move {
46            info!("PeriodicOffsetFetcher started");
47            let mut interval = interval(self.interval);
48            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
49            loop {
50                interval.tick().await;
51                if let Err(e) = self.try_update().await {
52                    error!(e; "Failed to update latest offset");
53                }
54            }
55        });
56    }
57
58    /// Tries to refresh the latest offset for every registered topic.
59    ///
60    /// For each topic in the stats map, retrieves its producer and requests
61    /// an update of the latest offset from Kafka.
62    pub(crate) async fn try_update(&self) -> Result<()> {
63        let topics = self.client_manager.list_topics().await;
64        for topic in topics.iter() {
65            debug!("Fetching latest offset for topic: {}", topic.topic);
66            let producer = self
67                .client_manager
68                .get_or_insert(topic)
69                .await?
70                .producer()
71                .clone();
72            producer.fetch_latest_offset().await?;
73        }
74        Ok(())
75    }
76}
77
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::*;
    use crate::kafka::test_util::{prepare, record};

    /// Upper bound on how long to wait for the background fetcher to observe
    /// newly produced records before failing the test.
    const OFFSET_WAIT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Checks that the background fetcher refreshes `latest_offset` in the
    /// shared topic stats. The offset is expected to be 0 before anything is
    /// produced, then `records.len() - 1` after producing two records.
    #[tokio::test]
    async fn test_try_update_latest_offset() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        let (manager, topics) = prepare("test_try_update_latest_offset", 1, broker_endpoints).await;
        let manager = Arc::new(manager);
        let fetcher = PeriodicOffsetFetcher::new(Duration::from_millis(100), manager.clone());
        let topic_stats = manager.topic_stats().clone();
        fetcher.run().await;

        let topic = topics[0].clone();
        let provider = Arc::new(KafkaProvider::new(topic.to_string()));
        let producer = manager
            .get_or_insert(&provider)
            .await
            .unwrap()
            .producer()
            .clone();
        // Copies the offset out, so no map entry guard is held across an `.await`.
        let latest_offset = || topic_stats.get(&provider).unwrap().latest_offset;

        tokio::time::sleep(Duration::from_millis(150)).await;
        assert_eq!(latest_offset(), 0);

        let records = vec![record(), record()];
        let expected_offset = records.len() as u64 - 1;
        let region = RegionId::new(1, 1);
        producer.produce(region, records).await.unwrap();

        // Poll instead of sleeping a fixed amount, so a slow broker or a busy
        // runtime does not make the test flaky.
        let deadline = Instant::now() + OFFSET_WAIT_TIMEOUT;
        while latest_offset() != expected_offset && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        assert_eq!(latest_offset(), expected_offset);
    }
}
122}