meta_srv/procedure/wal_prune/
manager.rs1use std::collections::HashSet;
16use std::collections::hash_set::Entry;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::Semaphore;
25use tokio::sync::mpsc::{Receiver, Sender};
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
/// Shared, reference-counted handle to a [`WalPruneTicker`].
pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
35#[derive(Clone)]
40pub struct WalPruneProcedureTracker {
41 running_procedures: Arc<RwLock<HashSet<String>>>,
42}
43
44impl WalPruneProcedureTracker {
45 pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48 let mut running_procedures = self.running_procedures.write().unwrap();
49 match running_procedures.entry(topic_name.clone()) {
50 Entry::Occupied(_) => None,
51 Entry::Vacant(entry) => {
52 entry.insert();
53 Some(WalPruneProcedureGuard {
54 topic_name,
55 running_procedures: self.running_procedures.clone(),
56 })
57 }
58 }
59 }
60
61 pub fn len(&self) -> usize {
63 self.running_procedures.read().unwrap().len()
64 }
65}
66
/// Guard that ties a topic's "running" registration to the lifetime of a value.
///
/// Dropping the guard removes `topic_name` from the shared set of running
/// procedures.
pub struct WalPruneProcedureGuard {
    /// Topic whose registration this guard owns.
    topic_name: String,
    /// Shared set the topic is removed from when the guard is dropped.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}

impl Drop for WalPruneProcedureGuard {
    /// Removes the guarded topic from the set of running procedures.
    ///
    /// A poisoned lock is recovered rather than unwrapped: panicking inside `drop`
    /// aborts the process when another panic is already unwinding, and skipping
    /// the removal would leave the topic registered as running.
    fn drop(&mut self) {
        let mut running_procedures = self
            .running_procedures
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        running_procedures.remove(&self.topic_name);
    }
}
81
/// Events consumed by the WAL prune manager's event loop.
pub enum Event {
    /// Asks the manager to handle one tick request.
    Tick,
}

impl Debug for Event {
    /// Writes the bare variant name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Event::Tick => "Tick",
        };
        f.write_str(variant)
    }
}
96
// Declares the `WalPruneTicker` type through the crate's `define_ticker!` macro,
// with `Event` as its event type and `Event::Tick` as its event value.
// NOTE(review): presumably the ticker sends `Event::Tick` once per interval —
// confirm against the `define_ticker!` definition.
define_ticker!(
    WalPruneTicker,
    event_type = Event,
    event_value = Event::Tick
);
104
105pub(crate) struct WalPruneManager {
112 receiver: Receiver<Event>,
114 procedure_manager: ProcedureManagerRef,
116 tracker: WalPruneProcedureTracker,
118 semaphore: Arc<Semaphore>,
120 logical_delete: bool,
122
123 wal_prune_context: WalPruneContext,
125}
126
127impl WalPruneManager {
128 pub fn new(
130 parallelism: usize,
131 logical_delete: bool,
132 receiver: Receiver<Event>,
133 procedure_manager: ProcedureManagerRef,
134 wal_prune_context: WalPruneContext,
135 ) -> Self {
136 Self {
137 receiver,
138 procedure_manager,
139 wal_prune_context,
140 tracker: WalPruneProcedureTracker {
141 running_procedures: Arc::new(RwLock::new(HashSet::new())),
142 },
143 semaphore: Arc::new(Semaphore::new(parallelism)),
144 logical_delete,
145 }
146 }
147
148 pub async fn try_start(mut self) -> Result<()> {
150 let context = self.wal_prune_context.clone();
151 let tracker = self.tracker.clone();
152 self.procedure_manager
153 .register_loader(
154 WalPruneProcedure::TYPE_NAME,
155 Box::new(move |json| {
156 let tracker = tracker.clone();
157 WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
158 }),
159 )
160 .context(error::RegisterProcedureLoaderSnafu {
161 type_name: WalPruneProcedure::TYPE_NAME,
162 })?;
163 common_runtime::spawn_global(async move {
164 self.run().await;
165 });
166 info!("WalPruneProcedureManager Started.");
167 Ok(())
168 }
169
170 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
172 tokio::sync::mpsc::channel(1024)
173 }
174
175 pub(crate) async fn run(&mut self) {
179 while let Some(event) = self.receiver.recv().await {
180 match event {
181 Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
182 error!(e; "Failed to handle tick request");
183 }),
184 }
185 }
186 }
187
188 pub async fn wait_procedure(
190 &self,
191 topic_name: &str,
192 prunable_entry_id: u64,
193 ) -> Result<ProcedureId> {
194 let guard = self
195 .tracker
196 .insert_running_procedure(topic_name.to_string())
197 .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
198
199 let procedure = WalPruneProcedure::new(
200 self.wal_prune_context.clone(),
201 Some(guard),
202 topic_name.to_string(),
203 prunable_entry_id,
204 self.logical_delete,
205 );
206 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
207 let procedure_id = procedure_with_id.id;
208 METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
209 .with_label_values(&[topic_name])
210 .inc();
211 let procedure_manager = self.procedure_manager.clone();
212 let mut watcher = procedure_manager
213 .submit(procedure_with_id)
214 .await
215 .context(error::SubmitProcedureSnafu)?;
216 watcher::wait(&mut watcher)
217 .await
218 .context(error::WaitProcedureSnafu)?;
219
220 Ok(procedure_id)
221 }
222
223 async fn try_prune(&self, topic_name: &str) -> Result<()> {
224 let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
225 let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
226 let prunable_entry_id = find_pruneable_entry_id_for_topic(
227 &table_metadata_manager,
228 &leader_region_registry,
229 topic_name,
230 )
231 .await?;
232 let Some(prunable_entry_id) = prunable_entry_id else {
233 debug!(
234 "No prunable entry id found for topic {}, skipping prune",
235 topic_name
236 );
237 return Ok(());
238 };
239 let current = table_metadata_manager
240 .topic_name_manager()
241 .get(topic_name)
242 .await
243 .context(error::TableMetadataManagerSnafu)?
244 .map(|v| v.into_inner().pruned_entry_id);
245 debug!(
246 "Found prunable entry id {} for topic {}, current pruned entry id: {:?}",
247 prunable_entry_id, topic_name, current
248 );
249 if !should_trigger_prune(current, prunable_entry_id) {
250 debug!(
251 "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
252 topic_name, current, prunable_entry_id
253 );
254 return Ok(());
255 }
256
257 self.wait_procedure(topic_name, prunable_entry_id)
258 .await
259 .map(|_| ())
260 }
261
262 async fn handle_tick_request(&self) -> Result<()> {
263 let topics = self.retrieve_sorted_topics().await?;
264 let mut tasks = Vec::with_capacity(topics.len());
265 for topic_name in topics.iter() {
266 tasks.push(async {
267 let _permit = self.semaphore.acquire().await.unwrap();
268 match self.try_prune(topic_name).await {
269 Ok(_) => {}
270 Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
271 warn!("Prune task for topic {} is already running", topic);
272 }
273 Err(err) => {
274 error!(err; "Failed to prune remote WAL for topic {}", topic_name.as_str());
275 }
276 }
277 });
278 }
279
280 join_all(tasks).await;
281 Ok(())
282 }
283
284 async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
288 self.wal_prune_context
289 .table_metadata_manager
290 .topic_name_manager()
291 .range()
292 .await
293 .context(error::TableMetadataManagerSnafu)
294 }
295}
296
// Unit and integration tests for the ticker, the tracker/guard pair, the manager
// and the `find_pruneable_entry_id_for_topic` helper.
#[cfg(test)]
mod test {
    use std::assert_matches;
    use std::time::Duration;

    use common_meta::key::topic_name::TopicNameKey;
    use common_meta::leadership_notifier::LeadershipChangeListener;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    // Starts the ticker twice and checks that `Event::Tick`s arrive each time.
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = WalPruneManager::channel();
        let interval = Duration::from_millis(50);
        let ticker = WalPruneTicker::new(interval, tx);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _ in 0..2 {
            ticker.start();
            // Sleep for several intervals so at least one event should be queued.
            sleep(4 * interval).await;
            assert!(!rx.is_empty());
            // Drain the channel; every queued event must be a tick.
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    // A topic can be registered only once while its guard is alive, and dropping
    // the guard unregisters it.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic_name = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker
                .insert_running_procedure(topic_name.clone())
                .unwrap();
            assert_eq!(guard.topic_name, topic_name);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            // Second registration of the same topic is rejected.
            let result = tracker.insert_running_procedure(topic_name.clone());
            assert!(result.is_none());
        }
        // The guard went out of scope, so the set is empty again.
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    // Builds a `WalPruneManager` on top of a fresh `TestEnv`, with `limit` as its
    // parallelism and `logical_delete` disabled; returns it with the event sender.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        (
            tx,
            WalPruneManager::new(
                limit,
                false,
                rx,
                test_env.procedure_manager.clone(),
                wal_prune_context,
            ),
        )
    }

    // Stores `topics` in the manager's topic name manager via `batch_put`.
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let topic_name_keys = topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();
        manager
            .wal_prune_context
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    // Starts a manager against Kafka (skipped when the integration environment is
    // not configured) and sends it a single tick.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let limit = 6;
        let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
        // Twice as many topics as the parallelism limit.
        let topics = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect::<Vec<_>>();
        mock_topics(&manager, &topics).await;

        let tracker = manager.tracker.clone();
        let handler =
            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
        handler.await.unwrap();

        tx.send(Event::Tick).await.unwrap();
        // NOTE(review): the future below resolves immediately to a `bool` that is
        // discarded after `unwrap()`, so nothing about `tracker.len()` is actually
        // asserted and the timeout can never elapse. TODO: decide whether this
        // should poll until a procedure is tracked (and whether the mocked topics
        // can produce one) or be removed.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }

    // With an empty test environment no prunable entry id is found.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_none() {
        let test_env = TestEnv::new();
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            "test_topic",
        )
        .await
        .unwrap();
        assert!(prunable_entry_id.is_none());
    }

    // After `new_wal_prune_metadata` populates the environment, the lookup returns
    // the entry id that helper reports as expected.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_some() {
        let test_env = TestEnv::new();
        let topic = "test_topic";
        let expected_prunable_entry_id = new_wal_prune_metadata(
            test_env.table_metadata_manager.clone(),
            test_env.leader_region_registry.clone(),
            2,
            5,
            &[3, 10, 23, 50, 52, 82, 130],
            topic.to_string(),
        )
        .await;
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            topic,
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(prunable_entry_id, expected_prunable_entry_id);
    }
}