meta_srv/procedure/wal_prune/
manager.rs1use std::collections::hash_set::Entry;
16use std::collections::HashSet;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::Duration;
20
21use common_meta::key::TableMetadataManagerRef;
22use common_meta::leadership_notifier::LeadershipChangeListener;
23use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
24use common_runtime::JoinHandle;
25use common_telemetry::{error, info, warn};
26use futures::future::join_all;
27use snafu::{OptionExt, ResultExt};
28use tokio::sync::mpsc::{Receiver, Sender};
29use tokio::sync::Semaphore;
30use tokio::time::{interval_at, Instant, MissedTickBehavior};
31
32use crate::error::{self, Result};
33use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
34use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
35
36pub type WalPruneTickerRef = Arc<WalPruneTicker>;
37
38#[derive(Clone)]
43pub struct WalPruneProcedureTracker {
44 running_procedures: Arc<RwLock<HashSet<String>>>,
45}
46
47impl WalPruneProcedureTracker {
48 pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
51 let mut running_procedures = self.running_procedures.write().unwrap();
52 match running_procedures.entry(topic_name.clone()) {
53 Entry::Occupied(_) => None,
54 Entry::Vacant(entry) => {
55 entry.insert();
56 Some(WalPruneProcedureGuard {
57 topic_name,
58 running_procedures: self.running_procedures.clone(),
59 })
60 }
61 }
62 }
63
64 pub fn len(&self) -> usize {
66 self.running_procedures.read().unwrap().len()
67 }
68}
69
/// Registration handle for one topic in a shared set of running prune procedures.
///
/// Dropping the guard removes `topic_name` from that set.
pub struct WalPruneProcedureGuard {
    /// Topic this guard is responsible for.
    topic_name: String,
    /// Shared set the topic is removed from on drop.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}

impl Drop for WalPruneProcedureGuard {
    /// Removes the guarded topic from the shared set.
    ///
    /// Panics if the lock protecting the set is poisoned.
    fn drop(&mut self) {
        self.running_procedures
            .write()
            .unwrap()
            .remove(&self.topic_name);
    }
}
84
/// Events delivered over the WAL prune channel.
pub enum Event {
    /// Periodic trigger sent by the ticker.
    Tick,
}

impl Debug for Event {
    /// Writes the bare variant name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Event::Tick => "Tick",
        };
        f.write_str(variant)
    }
}
99
100pub(crate) struct WalPruneTicker {
103 pub(crate) tick_handle: Mutex<Option<JoinHandle<()>>>,
105 pub(crate) tick_interval: Duration,
107 pub(crate) sender: Sender<Event>,
109}
110
111#[async_trait::async_trait]
112impl LeadershipChangeListener for WalPruneTicker {
113 fn name(&self) -> &'static str {
114 "WalPruneTicker"
115 }
116
117 async fn on_leader_start(&self) -> common_meta::error::Result<()> {
118 self.start();
119 Ok(())
120 }
121
122 async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
123 self.stop();
124 Ok(())
125 }
126}
127
128impl WalPruneTicker {
130 pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
131 Self {
132 tick_handle: Mutex::new(None),
133 tick_interval,
134 sender,
135 }
136 }
137
138 pub fn start(&self) {
140 let mut handle = self.tick_handle.lock().unwrap();
141 if handle.is_none() {
142 let sender = self.sender.clone();
143 let tick_interval = self.tick_interval;
144 let ticker_loop = tokio::spawn(async move {
145 let mut interval = interval_at(Instant::now() + tick_interval, tick_interval);
146 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
147 loop {
148 interval.tick().await;
149 if sender.send(Event::Tick).await.is_err() {
150 info!("EventReceiver is dropped, tick loop is stopped");
151 break;
152 }
153 }
154 });
155 *handle = Some(ticker_loop);
156 }
157 info!("WalPruneTicker started.");
158 }
159
160 pub fn stop(&self) {
162 let mut handle = self.tick_handle.lock().unwrap();
163 if let Some(handle) = handle.take() {
164 handle.abort();
165 }
166 info!("WalPruneTicker stopped.");
167 }
168}
169
170impl Drop for WalPruneTicker {
171 fn drop(&mut self) {
172 self.stop();
173 }
174}
175
176pub(crate) struct WalPruneManager {
183 table_metadata_manager: TableMetadataManagerRef,
185 receiver: Receiver<Event>,
187 procedure_manager: ProcedureManagerRef,
189 tracker: WalPruneProcedureTracker,
191 semaphore: Arc<Semaphore>,
193
194 wal_prune_context: WalPruneContext,
196 trigger_flush_threshold: u64,
199}
200
201impl WalPruneManager {
202 pub fn new(
204 table_metadata_manager: TableMetadataManagerRef,
205 limit: usize,
206 receiver: Receiver<Event>,
207 procedure_manager: ProcedureManagerRef,
208 wal_prune_context: WalPruneContext,
209 trigger_flush_threshold: u64,
210 ) -> Self {
211 Self {
212 table_metadata_manager,
213 receiver,
214 procedure_manager,
215 wal_prune_context,
216 tracker: WalPruneProcedureTracker {
217 running_procedures: Arc::new(RwLock::new(HashSet::new())),
218 },
219 semaphore: Arc::new(Semaphore::new(limit)),
220 trigger_flush_threshold,
221 }
222 }
223
224 pub async fn try_start(mut self) -> Result<()> {
226 let context = self.wal_prune_context.clone();
227 let tracker = self.tracker.clone();
228 self.procedure_manager
229 .register_loader(
230 WalPruneProcedure::TYPE_NAME,
231 Box::new(move |json| {
232 let tracker = tracker.clone();
233 WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
234 }),
235 )
236 .context(error::RegisterProcedureLoaderSnafu {
237 type_name: WalPruneProcedure::TYPE_NAME,
238 })?;
239 common_runtime::spawn_global(async move {
240 self.run().await;
241 });
242 info!("WalPruneProcedureManager Started.");
243 Ok(())
244 }
245
246 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
248 tokio::sync::mpsc::channel(1024)
249 }
250
251 pub(crate) async fn run(&mut self) {
255 while let Some(event) = self.receiver.recv().await {
256 match event {
257 Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
258 error!(e; "Failed to handle tick request");
259 }),
260 }
261 }
262 }
263
264 pub async fn submit_procedure(&self, topic_name: &str) -> Result<ProcedureId> {
266 let guard = self
267 .tracker
268 .insert_running_procedure(topic_name.to_string())
269 .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
270
271 let procedure = WalPruneProcedure::new(
272 topic_name.to_string(),
273 self.wal_prune_context.clone(),
274 self.trigger_flush_threshold,
275 Some(guard),
276 );
277 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
278 let procedure_id = procedure_with_id.id;
279 METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
280 .with_label_values(&[topic_name])
281 .inc();
282 let procedure_manager = self.procedure_manager.clone();
283 let mut watcher = procedure_manager
284 .submit(procedure_with_id)
285 .await
286 .context(error::SubmitProcedureSnafu)?;
287 watcher::wait(&mut watcher)
288 .await
289 .context(error::WaitProcedureSnafu)?;
290
291 Ok(procedure_id)
292 }
293
294 async fn handle_tick_request(&self) -> Result<()> {
295 let topics = self.retrieve_sorted_topics().await?;
296 let mut tasks = Vec::with_capacity(topics.len());
297 for topic_name in topics.iter() {
298 tasks.push(async {
299 let _permit = self.semaphore.acquire().await.unwrap();
300 match self.submit_procedure(topic_name).await {
301 Ok(_) => {}
302 Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
303 warn!("Prune task for topic {} is already running", topic);
304 }
305 Err(e) => {
306 error!(
307 "Failed to submit prune task for topic {}: {}",
308 topic_name.clone(),
309 e
310 );
311 }
312 }
313 });
314 }
315
316 join_all(tasks).await;
317 Ok(())
318 }
319
320 async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
324 self.table_metadata_manager
325 .topic_name_manager()
326 .range()
327 .await
328 .context(error::TableMetadataManagerSnafu)
329 }
330}
331
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::topic_name::TopicNameKey;
    use common_wal::test_util::run_test_with_kafka_wal;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// The ticker emits `Tick` events after `start`, and a repeated `start` keeps it running.
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        let (tx, mut rx) = WalPruneManager::channel();
        let interval = Duration::from_millis(10);
        let ticker = WalPruneTicker::new(interval, tx);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _ in 0..2 {
            ticker.start();
            sleep(2 * interval).await;
            assert!(!rx.is_empty());
            // Drain everything received so far so the next round starts empty.
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    /// A topic can be registered once, and dropping its guard releases it again.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic_name = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker
                .insert_running_procedure(topic_name.clone())
                .unwrap();
            assert_eq!(guard.topic_name, topic_name);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            let result = tracker.insert_running_procedure(topic_name.clone());
            assert!(result.is_none());
        }
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    /// Builds a manager backed by a fresh [`TestEnv`], plus the sender that drives it.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        (
            tx,
            WalPruneManager::new(
                test_env.table_metadata_manager.clone(),
                limit,
                rx,
                test_env.procedure_manager.clone(),
                wal_prune_context,
                0,
            ),
        )
    }

    /// Stores `topics` through the manager's topic name manager.
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let topic_name_keys = topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();
        manager
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    /// After a tick, the manager registers at least one prune procedure in its tracker.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let limit = 6;
                let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
                let topics = (0..limit * 2)
                    .map(|_| uuid::Uuid::new_v4().to_string())
                    .collect::<Vec<_>>();
                mock_topics(&manager, &topics).await;

                let tracker = manager.tracker.clone();
                let handler =
                    common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
                handler.await.unwrap();

                tx.send(Event::Tick).await.unwrap();
                // Poll until a procedure shows up in the tracker. A future that merely
                // evaluates `tracker.len() > 0` completes at once and its result is
                // discarded, so the timeout could never fire and nothing was checked.
                // Yielding instead of sleeping keeps the polling tight enough to observe
                // short-lived procedures; the deadline is generous because it only
                // matters on failure.
                timeout(Duration::from_secs(1), async move {
                    while tracker.len() == 0 {
                        tokio::task::yield_now().await;
                    }
                })
                .await
                .expect("no prune procedure was registered after the tick");
            })
        })
        .await;
    }
}