meta_srv/procedure/wal_prune/
manager.rs1use std::collections::hash_set::Entry;
16use std::collections::HashSet;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::Semaphore;
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
33pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
35#[derive(Clone)]
40pub struct WalPruneProcedureTracker {
41 running_procedures: Arc<RwLock<HashSet<String>>>,
42}
43
44impl WalPruneProcedureTracker {
45 pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48 let mut running_procedures = self.running_procedures.write().unwrap();
49 match running_procedures.entry(topic_name.clone()) {
50 Entry::Occupied(_) => None,
51 Entry::Vacant(entry) => {
52 entry.insert();
53 Some(WalPruneProcedureGuard {
54 topic_name,
55 running_procedures: self.running_procedures.clone(),
56 })
57 }
58 }
59 }
60
61 pub fn len(&self) -> usize {
63 self.running_procedures.read().unwrap().len()
64 }
65}
66
/// Marks a topic as having a running WAL prune procedure for as long as the
/// guard is alive.
///
/// Dropping the guard removes `topic_name` from the shared set, so the topic can
/// be registered again.
pub struct WalPruneProcedureGuard {
    /// Topic this guard was issued for.
    topic_name: String,
    /// Set of registered topics, shared with whoever issued the guard.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}

impl Drop for WalPruneProcedureGuard {
    fn drop(&mut self) {
        // Recover the set from a poisoned lock instead of unwrapping: a panic
        // inside `drop` while the thread is already unwinding aborts the
        // process, and skipping the removal would leave the topic registered
        // forever.
        let mut running_procedures = self
            .running_procedures
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        running_procedures.remove(&self.topic_name);
    }
}
81
/// Events consumed by the `WalPruneManager` event loop.
pub enum Event {
    /// Asks the manager to run one round of tick handling.
    Tick,
}

impl Debug for Event {
    /// Writes the bare variant name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tick => "Tick",
        };
        f.write_str(label)
    }
}
96
97define_ticker!(
98 WalPruneTicker,
101 event_type = Event,
102 event_value = Event::Tick
103);
104
105pub(crate) struct WalPruneManager {
112 receiver: Receiver<Event>,
114 procedure_manager: ProcedureManagerRef,
116 tracker: WalPruneProcedureTracker,
118 semaphore: Arc<Semaphore>,
120
121 wal_prune_context: WalPruneContext,
123}
124
125impl WalPruneManager {
126 pub fn new(
128 parallelism: usize,
129 receiver: Receiver<Event>,
130 procedure_manager: ProcedureManagerRef,
131 wal_prune_context: WalPruneContext,
132 ) -> Self {
133 Self {
134 receiver,
135 procedure_manager,
136 wal_prune_context,
137 tracker: WalPruneProcedureTracker {
138 running_procedures: Arc::new(RwLock::new(HashSet::new())),
139 },
140 semaphore: Arc::new(Semaphore::new(parallelism)),
141 }
142 }
143
144 pub async fn try_start(mut self) -> Result<()> {
146 let context = self.wal_prune_context.clone();
147 let tracker = self.tracker.clone();
148 self.procedure_manager
149 .register_loader(
150 WalPruneProcedure::TYPE_NAME,
151 Box::new(move |json| {
152 let tracker = tracker.clone();
153 WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
154 }),
155 )
156 .context(error::RegisterProcedureLoaderSnafu {
157 type_name: WalPruneProcedure::TYPE_NAME,
158 })?;
159 common_runtime::spawn_global(async move {
160 self.run().await;
161 });
162 info!("WalPruneProcedureManager Started.");
163 Ok(())
164 }
165
166 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
168 tokio::sync::mpsc::channel(1024)
169 }
170
171 pub(crate) async fn run(&mut self) {
175 while let Some(event) = self.receiver.recv().await {
176 match event {
177 Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
178 error!(e; "Failed to handle tick request");
179 }),
180 }
181 }
182 }
183
184 pub async fn wait_procedure(
186 &self,
187 topic_name: &str,
188 prunable_entry_id: u64,
189 ) -> Result<ProcedureId> {
190 let guard = self
191 .tracker
192 .insert_running_procedure(topic_name.to_string())
193 .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
194
195 let procedure = WalPruneProcedure::new(
196 self.wal_prune_context.clone(),
197 Some(guard),
198 topic_name.to_string(),
199 prunable_entry_id,
200 );
201 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
202 let procedure_id = procedure_with_id.id;
203 METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
204 .with_label_values(&[topic_name])
205 .inc();
206 let procedure_manager = self.procedure_manager.clone();
207 let mut watcher = procedure_manager
208 .submit(procedure_with_id)
209 .await
210 .context(error::SubmitProcedureSnafu)?;
211 watcher::wait(&mut watcher)
212 .await
213 .context(error::WaitProcedureSnafu)?;
214
215 Ok(procedure_id)
216 }
217
218 async fn try_prune(&self, topic_name: &str) -> Result<()> {
219 let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
220 let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
221 let prunable_entry_id = find_pruneable_entry_id_for_topic(
222 &table_metadata_manager,
223 &leader_region_registry,
224 topic_name,
225 )
226 .await?;
227 let Some(prunable_entry_id) = prunable_entry_id else {
228 debug!(
229 "No prunable entry id found for topic {}, skipping prune",
230 topic_name
231 );
232 return Ok(());
233 };
234 let current = table_metadata_manager
235 .topic_name_manager()
236 .get(topic_name)
237 .await
238 .context(error::TableMetadataManagerSnafu)?
239 .map(|v| v.into_inner().pruned_entry_id);
240 if !should_trigger_prune(current, prunable_entry_id) {
241 debug!(
242 "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
243 topic_name, current, prunable_entry_id
244 );
245 return Ok(());
246 }
247
248 self.wait_procedure(topic_name, prunable_entry_id)
249 .await
250 .map(|_| ())
251 }
252
253 async fn handle_tick_request(&self) -> Result<()> {
254 let topics = self.retrieve_sorted_topics().await?;
255 let mut tasks = Vec::with_capacity(topics.len());
256 for topic_name in topics.iter() {
257 tasks.push(async {
258 let _permit = self.semaphore.acquire().await.unwrap();
259 match self.try_prune(topic_name).await {
260 Ok(_) => {}
261 Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
262 warn!("Prune task for topic {} is already running", topic);
263 }
264 Err(e) => {
265 error!(
266 "Failed to submit prune task for topic {}: {}",
267 topic_name.clone(),
268 e
269 );
270 }
271 }
272 });
273 }
274
275 join_all(tasks).await;
276 Ok(())
277 }
278
279 async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
283 self.wal_prune_context
284 .table_metadata_manager
285 .topic_name_manager()
286 .range()
287 .await
288 .context(error::TableMetadataManagerSnafu)
289 }
290}
291
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::key::topic_name::TopicNameKey;
    use common_meta::leadership_notifier::LeadershipChangeListener;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// After each `start`, the ticker delivers at least one `Event::Tick`
    /// within four intervals.
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        common_telemetry::init_default_ut_logging();
        let tick_interval = Duration::from_millis(50);
        let (sender, mut receiver) = WalPruneManager::channel();
        let ticker = WalPruneTicker::new(tick_interval, sender);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _round in 0..2 {
            ticker.start();
            sleep(4 * tick_interval).await;
            assert!(!receiver.is_empty());
            // Drain everything that was queued during the sleep.
            while let Ok(event) = receiver.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    /// A topic can be registered once; dropping its guard frees it again.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker.insert_running_procedure(topic.clone()).unwrap();
            assert_eq!(guard.topic_name, topic);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            // A second registration for the same topic is refused.
            assert!(tracker.insert_running_procedure(topic.clone()).is_none());
        }
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    /// Builds a manager on top of a fresh `TestEnv`, together with the sender
    /// half of its event channel.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let env = TestEnv::new();
        let (sender, receiver) = WalPruneManager::channel();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        let manager = WalPruneManager::new(
            limit,
            receiver,
            env.procedure_manager.clone(),
            context,
        );
        (sender, manager)
    }

    /// Stores `topics` through the manager's topic name manager.
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let keys: Vec<_> = topics
            .iter()
            .map(|name| TopicNameKey::new(name))
            .collect();
        manager
            .wal_prune_context
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(keys)
            .await
            .unwrap();
    }

    /// Starts a manager against Kafka and sends it a single tick.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let limit = 6;
        let (sender, manager) = mock_wal_prune_manager(get_kafka_endpoints(), limit).await;
        let topics = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect::<Vec<_>>();
        mock_topics(&manager, &topics).await;

        let tracker = manager.tracker.clone();
        common_runtime::spawn_global(async move { manager.try_start().await.unwrap() })
            .await
            .unwrap();

        sender.send(Event::Tick).await.unwrap();
        // NOTE(review): the wrapped future evaluates `tracker.len() > 0` once,
        // without waiting, and the resulting bool is discarded, so this only
        // checks that the future itself completes within the timeout.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }

    /// Without any prepared metadata there is nothing to prune.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_none() {
        let env = TestEnv::new();
        let found = find_pruneable_entry_id_for_topic(
            &env.table_metadata_manager,
            &env.leader_region_registry,
            "test_topic",
        )
        .await
        .unwrap();
        assert!(found.is_none());
    }

    /// With prepared metadata the lookup matches the id reported by
    /// `new_wal_prune_metadata`.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_some() {
        let env = TestEnv::new();
        let topic = "test_topic";
        let expected = new_wal_prune_metadata(
            env.table_metadata_manager.clone(),
            env.leader_region_registry.clone(),
            2,
            5,
            &[3, 10, 23, 50, 52, 82, 130],
            topic.to_string(),
        )
        .await;
        let found = find_pruneable_entry_id_for_topic(
            &env.table_metadata_manager,
            &env.leader_region_registry,
            topic,
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(found, expected);
    }
}