meta_srv/procedure/wal_prune/
manager.rs1use std::collections::HashSet;
16use std::collections::hash_set::Entry;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::Semaphore;
25use tokio::sync::mpsc::{Receiver, Sender};
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
33pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
35#[derive(Clone)]
40pub struct WalPruneProcedureTracker {
41 running_procedures: Arc<RwLock<HashSet<String>>>,
42}
43
44impl WalPruneProcedureTracker {
45 pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48 let mut running_procedures = self.running_procedures.write().unwrap();
49 match running_procedures.entry(topic_name.clone()) {
50 Entry::Occupied(_) => None,
51 Entry::Vacant(entry) => {
52 entry.insert();
53 Some(WalPruneProcedureGuard {
54 topic_name,
55 running_procedures: self.running_procedures.clone(),
56 })
57 }
58 }
59 }
60
61 pub fn len(&self) -> usize {
63 self.running_procedures.read().unwrap().len()
64 }
65}
66
67pub struct WalPruneProcedureGuard {
71 topic_name: String,
72 running_procedures: Arc<RwLock<HashSet<String>>>,
73}
74
75impl Drop for WalPruneProcedureGuard {
76 fn drop(&mut self) {
77 let mut running_procedures = self.running_procedures.write().unwrap();
78 running_procedures.remove(&self.topic_name);
79 }
80}
81
/// Events consumed by the WAL prune manager's event loop.
pub enum Event {
    /// Periodic trigger, emitted by `WalPruneTicker`, that starts one pruning round.
    Tick,
}

impl Debug for Event {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match: adding a variant forces a label to be chosen for it.
        let label = match self {
            Event::Tick => "Tick",
        };
        f.write_str(label)
    }
}
96
// Declares the `WalPruneTicker` type via the crate's `define_ticker!` macro.
// As exercised by `test_wal_prune_ticker`, it is built with `WalPruneTicker::new(interval, tx)`
// where `tx` comes from `WalPruneManager::channel()`; after `start()` it sends `Event::Tick`
// on that channel, and `stop()` halts it.
// NOTE(review): exact tick timing/backpressure semantics live in `define_ticker!` — confirm there.
define_ticker!(
    WalPruneTicker,
    event_type = Event,
    event_value = Event::Tick
);
104
105pub(crate) struct WalPruneManager {
112 receiver: Receiver<Event>,
114 procedure_manager: ProcedureManagerRef,
116 tracker: WalPruneProcedureTracker,
118 semaphore: Arc<Semaphore>,
120
121 wal_prune_context: WalPruneContext,
123}
124
125impl WalPruneManager {
126 pub fn new(
128 parallelism: usize,
129 receiver: Receiver<Event>,
130 procedure_manager: ProcedureManagerRef,
131 wal_prune_context: WalPruneContext,
132 ) -> Self {
133 Self {
134 receiver,
135 procedure_manager,
136 wal_prune_context,
137 tracker: WalPruneProcedureTracker {
138 running_procedures: Arc::new(RwLock::new(HashSet::new())),
139 },
140 semaphore: Arc::new(Semaphore::new(parallelism)),
141 }
142 }
143
144 pub async fn try_start(mut self) -> Result<()> {
146 let context = self.wal_prune_context.clone();
147 let tracker = self.tracker.clone();
148 self.procedure_manager
149 .register_loader(
150 WalPruneProcedure::TYPE_NAME,
151 Box::new(move |json| {
152 let tracker = tracker.clone();
153 WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
154 }),
155 )
156 .context(error::RegisterProcedureLoaderSnafu {
157 type_name: WalPruneProcedure::TYPE_NAME,
158 })?;
159 common_runtime::spawn_global(async move {
160 self.run().await;
161 });
162 info!("WalPruneProcedureManager Started.");
163 Ok(())
164 }
165
166 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
168 tokio::sync::mpsc::channel(1024)
169 }
170
171 pub(crate) async fn run(&mut self) {
175 while let Some(event) = self.receiver.recv().await {
176 match event {
177 Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
178 error!(e; "Failed to handle tick request");
179 }),
180 }
181 }
182 }
183
184 pub async fn wait_procedure(
186 &self,
187 topic_name: &str,
188 prunable_entry_id: u64,
189 ) -> Result<ProcedureId> {
190 let guard = self
191 .tracker
192 .insert_running_procedure(topic_name.to_string())
193 .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
194
195 let procedure = WalPruneProcedure::new(
196 self.wal_prune_context.clone(),
197 Some(guard),
198 topic_name.to_string(),
199 prunable_entry_id,
200 );
201 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
202 let procedure_id = procedure_with_id.id;
203 METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
204 .with_label_values(&[topic_name])
205 .inc();
206 let procedure_manager = self.procedure_manager.clone();
207 let mut watcher = procedure_manager
208 .submit(procedure_with_id)
209 .await
210 .context(error::SubmitProcedureSnafu)?;
211 watcher::wait(&mut watcher)
212 .await
213 .context(error::WaitProcedureSnafu)?;
214
215 Ok(procedure_id)
216 }
217
218 async fn try_prune(&self, topic_name: &str) -> Result<()> {
219 let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
220 let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
221 let prunable_entry_id = find_pruneable_entry_id_for_topic(
222 &table_metadata_manager,
223 &leader_region_registry,
224 topic_name,
225 )
226 .await?;
227 let Some(prunable_entry_id) = prunable_entry_id else {
228 debug!(
229 "No prunable entry id found for topic {}, skipping prune",
230 topic_name
231 );
232 return Ok(());
233 };
234 let current = table_metadata_manager
235 .topic_name_manager()
236 .get(topic_name)
237 .await
238 .context(error::TableMetadataManagerSnafu)?
239 .map(|v| v.into_inner().pruned_entry_id);
240 if !should_trigger_prune(current, prunable_entry_id) {
241 debug!(
242 "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
243 topic_name, current, prunable_entry_id
244 );
245 return Ok(());
246 }
247
248 self.wait_procedure(topic_name, prunable_entry_id)
249 .await
250 .map(|_| ())
251 }
252
253 async fn handle_tick_request(&self) -> Result<()> {
254 let topics = self.retrieve_sorted_topics().await?;
255 let mut tasks = Vec::with_capacity(topics.len());
256 for topic_name in topics.iter() {
257 tasks.push(async {
258 let _permit = self.semaphore.acquire().await.unwrap();
259 match self.try_prune(topic_name).await {
260 Ok(_) => {}
261 Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
262 warn!("Prune task for topic {} is already running", topic);
263 }
264 Err(err) => {
265 error!(err; "Failed to prune remote WAL for topic {}", topic_name.as_str());
266 }
267 }
268 });
269 }
270
271 join_all(tasks).await;
272 Ok(())
273 }
274
275 async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
279 self.wal_prune_context
280 .table_metadata_manager
281 .topic_name_manager()
282 .range()
283 .await
284 .context(error::TableMetadataManagerSnafu)
285 }
286}
287
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::key::topic_name::TopicNameKey;
    // NOTE(review): presumably imported for the ticker's `name()` method — confirm.
    use common_meta::leadership_notifier::LeadershipChangeListener;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    // While started, the ticker must deliver only `Event::Tick` values on the channel.
    // `start()` is called on each of the two iterations; every iteration waits four intervals
    // and expects at least one tick to have arrived.
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = WalPruneManager::channel();
        let interval = Duration::from_millis(50);
        let ticker = WalPruneTicker::new(interval, tx);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _ in 0..2 {
            ticker.start();
            sleep(4 * interval).await;
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    // A live guard keeps its topic marked and blocks a second insert for the same topic;
    // dropping it at the end of the inner scope unmarks the topic.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic_name = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker
                .insert_running_procedure(topic_name.clone())
                .unwrap();
            assert_eq!(guard.topic_name, topic_name);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            let result = tracker.insert_running_procedure(topic_name.clone());
            assert!(result.is_none());
        }
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    // Builds a manager with `limit` parallelism over a fresh `TestEnv` and returns it together
    // with the sender side of its event channel.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        (
            tx,
            WalPruneManager::new(
                limit,
                rx,
                test_env.procedure_manager.clone(),
                wal_prune_context,
            ),
        )
    }

    // Registers `topics` in the topic-name table only; no region or WAL metadata is created
    // for them.
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let topic_name_keys = topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();
        manager
            .wal_prune_context
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    // Kafka integration test: starts the manager over `limit * 2` topics and sends one tick.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let limit = 6;
        let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
        let topics = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect::<Vec<_>>();
        mock_topics(&manager, &topics).await;

        let tracker = manager.tracker.clone();
        let handler =
            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
        handler.await.unwrap();

        tx.send(Event::Tick).await.unwrap();
        // NOTE(review): this check is vacuous. The async block returns `tracker.len() > 0`
        // immediately and the resulting bool is discarded, so `.unwrap()` only verifies that
        // the block finished within 100ms; it never asserts that a procedure was tracked.
        // Also, `mock_topics` seeds topic names only, so (as in
        // `test_find_pruneable_entry_id_for_topic_none`) no prunable entry id is likely found
        // and no procedure is submitted. TODO: seed region metadata and poll until
        // `tracker.len() > 0` to make this meaningful.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }

    // Without any region metadata for the topic there is nothing to prune.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_none() {
        let test_env = TestEnv::new();
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            "test_topic",
        )
        .await
        .unwrap();
        assert!(prunable_entry_id.is_none());
    }

    // With metadata seeded by `new_wal_prune_metadata`, the computed prunable entry id must
    // match the value that helper reports as expected.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_some() {
        let test_env = TestEnv::new();
        let topic = "test_topic";
        let expected_prunable_entry_id = new_wal_prune_metadata(
            test_env.table_metadata_manager.clone(),
            test_env.leader_region_registry.clone(),
            2,
            5,
            &[3, 10, 23, 50, 52, 82, 130],
            topic.to_string(),
        )
        .await;
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            topic,
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(prunable_entry_id, expected_prunable_entry_id);
    }
}