log_store/kafka/
high_watermark_manager.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use common_telemetry::error;
19use dashmap::DashMap;
20use store_api::logstore::provider::KafkaProvider;
21use tokio::time::{interval, MissedTickBehavior};
22
23use crate::error::Result;
24use crate::kafka::client_manager::ClientManagerRef;
25
26pub(crate) struct HighWatermarkManager {
29 update_interval: Duration,
31 high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
33 client_manager: ClientManagerRef,
35}
36
37impl HighWatermarkManager {
38 pub(crate) fn new(
39 update_interval: Duration,
40 high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
41 client_manager: ClientManagerRef,
42 ) -> Self {
43 Self {
44 update_interval,
45 high_watermark,
46 client_manager,
47 }
48 }
49
50 pub(crate) async fn run(self) {
55 common_runtime::spawn_global(async move {
56 let mut interval = interval(self.update_interval);
57 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
58 loop {
59 interval.tick().await;
60 if let Err(e) = self.try_update().await {
61 error!(e; "Failed to update high watermark");
62 }
63 }
64 });
65 }
66
67 pub(crate) async fn try_update(&self) -> Result<()> {
72 for iterator_element in self.high_watermark.iter() {
73 let producer = self
74 .client_manager
75 .get_or_insert(iterator_element.key())
76 .await?
77 .producer()
78 .clone();
79 producer.update_high_watermark().await?;
80 }
81 Ok(())
82 }
83}
84
#[cfg(test)]
mod tests {
    use common_wal::test_util::run_test_with_kafka_wal;
    use store_api::storage::RegionId;

    use super::*;
    use crate::kafka::test_util::{prepare, record};

    /// Verifies that the background loop keeps the shared map in sync with the
    /// topic: 0 before anything is produced, and the number of produced records
    /// afterwards.
    #[tokio::test]
    async fn test_try_update_high_watermark() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                // Refresh period of the manager, and a wait long enough for at least
                // one refresh round to have happened.
                let refresh_period = Duration::from_millis(100);
                let settle_time = Duration::from_millis(150);

                let (client_manager, topics) =
                    prepare("test_try_update_high_watermark", 1, broker_endpoints).await;
                let client_manager = Arc::new(client_manager);

                // The manager and the test observe the very same shared map.
                let watermarks = client_manager.high_watermark().clone();
                HighWatermarkManager::new(
                    refresh_period,
                    watermarks.clone(),
                    client_manager.clone(),
                )
                .run()
                .await;

                let provider = Arc::new(KafkaProvider::new(topics[0].clone().to_string()));
                let producer = client_manager
                    .get_or_insert(&provider)
                    .await
                    .unwrap()
                    .producer()
                    .clone();

                // Nothing has been produced yet.
                tokio::time::sleep(settle_time).await;
                let observed = *watermarks.get(&provider).unwrap();
                assert_eq!(observed, 0);

                // After producing, the watermark must advance by the number of records.
                let entries = vec![record()];
                let region_id = RegionId::new(1, 1);
                producer.produce(region_id, entries.clone()).await.unwrap();
                tokio::time::sleep(settle_time).await;
                let observed = *watermarks.get(&provider).unwrap();
                assert_eq!(observed, entries.len() as u64);
            })
        })
        .await
    }
}