// log_store/kafka/periodic_offset_fetcher.rs
use std::time::Duration;

use common_telemetry::{debug, error, info};
use tokio::time::{interval, MissedTickBehavior};

use crate::error::Result;
use crate::kafka::client_manager::ClientManagerRef;

/// Periodically fetches the latest Kafka offset for every known topic in the
/// background, so offset statistics stay fresh without per-request lookups.
pub(crate) struct PeriodicOffsetFetcher {
    // How often the background task refreshes offsets.
    interval: Duration,
    // Shared manager used to enumerate topics and obtain per-topic producers.
    client_manager: ClientManagerRef,
}
31
32impl PeriodicOffsetFetcher {
33 pub(crate) fn new(interval: Duration, client_manager: ClientManagerRef) -> Self {
34 Self {
35 interval,
36 client_manager,
37 }
38 }
39
40 pub(crate) async fn run(self) {
45 common_runtime::spawn_global(async move {
46 info!("PeriodicOffsetFetcher started");
47 let mut interval = interval(self.interval);
48 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
49 loop {
50 interval.tick().await;
51 if let Err(e) = self.try_update().await {
52 error!(e; "Failed to update latest offset");
53 }
54 }
55 });
56 }
57
58 pub(crate) async fn try_update(&self) -> Result<()> {
63 let topics = self.client_manager.list_topics().await;
64 for topic in topics.iter() {
65 debug!("Fetching latest offset for topic: {}", topic.topic);
66 let producer = self
67 .client_manager
68 .get_or_insert(topic)
69 .await?
70 .producer()
71 .clone();
72 producer.fetch_latest_offset().await?;
73 }
74 Ok(())
75 }
76}
77
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use super::*;
    use crate::kafka::test_util::{prepare, record};

    /// Integration test requiring a reachable Kafka broker (skipped when the
    /// endpoints are not configured). Verifies the background fetcher observes
    /// the latest offset both before and after records are produced.
    #[tokio::test]
    async fn test_try_update_latest_offset() {
        common_telemetry::init_default_ut_logging();
        // Skips the test when Kafka integration endpoints are unavailable.
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        let (manager, topics) = prepare("test_try_update_latest_offset", 1, broker_endpoints).await;
        let manager = Arc::new(manager);
        // 100ms refresh interval so each 150ms sleep below spans at least one tick.
        // NOTE(review): timing-based — could flake on a heavily loaded machine.
        let fetcher = PeriodicOffsetFetcher::new(Duration::from_millis(100), manager.clone());
        let topic_stats = manager.topic_stats().clone();
        fetcher.run().await;

        let topic = topics[0].clone();
        let provider = Arc::new(KafkaProvider::new(topic.to_string()));
        let producer = manager
            .get_or_insert(&provider)
            .await
            .unwrap()
            .producer()
            .clone();

        // Before any record is produced, the fetched latest offset is expected to be 0.
        tokio::time::sleep(Duration::from_millis(150)).await;
        let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
        assert_eq!(current_latest_offset, 0);

        // Produce two records; after the next tick the fetcher should report
        // the offset of the last record (record count - 1).
        let record = vec![record(), record()];
        let region = RegionId::new(1, 1);
        producer.produce(region, record.clone()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(150)).await;
        let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
        assert_eq!(current_latest_offset, record.len() as u64 - 1);
    }
}