meta_srv/procedure/wal_prune/manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_set::Entry;
16use std::collections::HashSet;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::Semaphore;
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
33pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
/// Tracks which topics currently have a running [WalPruneProcedure].
///
/// A topic is reserved through [WalPruneProcedureTracker::insert_running_procedure] and
/// released when the returned [WalPruneProcedureGuard] is dropped, so at most one
/// [WalPruneProcedure] per topic is tracked at a time. Clones share the same underlying set.
///
/// The tracker does not limit overall concurrency; the [WalPruneManager] does that with its
/// own semaphore, whose permits are held by the tick tasks rather than by this tracker.
///
/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework.
#[derive(Clone, Debug, Default)]
pub struct WalPruneProcedureTracker {
    /// Names of topics that currently have an in-flight [WalPruneProcedure].
    running_procedures: Arc<RwLock<HashSet<String>>>,
}
43
44impl WalPruneProcedureTracker {
45    /// Insert a running [WalPruneProcedure] for the given topic name and
46    /// consume acquire a semaphore permit for the given topic name.
47    pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48        let mut running_procedures = self.running_procedures.write().unwrap();
49        match running_procedures.entry(topic_name.clone()) {
50            Entry::Occupied(_) => None,
51            Entry::Vacant(entry) => {
52                entry.insert();
53                Some(WalPruneProcedureGuard {
54                    topic_name,
55                    running_procedures: self.running_procedures.clone(),
56                })
57            }
58        }
59    }
60
61    /// Number of running [WalPruneProcedure]s.
62    pub fn len(&self) -> usize {
63        self.running_procedures.read().unwrap().len()
64    }
65}
66
/// Marks a topic as having a running [WalPruneProcedure].
///
/// Guards are handed out by [WalPruneProcedureTracker::insert_running_procedure]. While a guard
/// is alive, further reservations for the same topic are rejected; when it is dropped, its
/// `topic_name` is removed from the shared set so a new procedure can be started for the topic.
///
/// The guard holds no semaphore permit and releases none on drop — concurrency is limited
/// separately by the [WalPruneManager].
#[derive(Debug)]
pub struct WalPruneProcedureGuard {
    /// Topic reserved by this guard.
    topic_name: String,
    /// Set shared with the [WalPruneProcedureTracker] that issued this guard.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}
74
75impl Drop for WalPruneProcedureGuard {
76    fn drop(&mut self) {
77        let mut running_procedures = self.running_procedures.write().unwrap();
78        running_procedures.remove(&self.topic_name);
79    }
80}
81
/// Event is used to notify the [WalPruneManager] to do some work.
///
/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL.
#[derive(Clone, PartialEq, Eq)]
pub enum Event {
    /// Asks the manager to check every topic and submit [WalPruneProcedure]s where needed.
    Tick,
}
88
89impl Debug for Event {
90    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91        match self {
92            Event::Tick => write!(f, "Tick"),
93        }
94    }
95}
96
97define_ticker!(
98    /// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
99    /// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
100    WalPruneTicker,
101    event_type = Event,
102    event_value = Event::Tick
103);
104
105/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
106///
107/// [WalPruneManager] is responsible for:
108/// 1. Registering [WalPruneProcedure] loader in the procedure manager.
109/// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL.
110/// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s.
111pub(crate) struct WalPruneManager {
112    /// Receives [Event]s.
113    receiver: Receiver<Event>,
114    /// Procedure manager.
115    procedure_manager: ProcedureManagerRef,
116    /// Tracker for running [WalPruneProcedure]s.
117    tracker: WalPruneProcedureTracker,
118    /// Semaphore to limit the number of concurrent [WalPruneProcedure]s.
119    semaphore: Arc<Semaphore>,
120
121    /// Context for [WalPruneProcedure].
122    wal_prune_context: WalPruneContext,
123}
124
125impl WalPruneManager {
126    /// Returns a new empty [`WalPruneManager`].
127    pub fn new(
128        parallelism: usize,
129        receiver: Receiver<Event>,
130        procedure_manager: ProcedureManagerRef,
131        wal_prune_context: WalPruneContext,
132    ) -> Self {
133        Self {
134            receiver,
135            procedure_manager,
136            wal_prune_context,
137            tracker: WalPruneProcedureTracker {
138                running_procedures: Arc::new(RwLock::new(HashSet::new())),
139            },
140            semaphore: Arc::new(Semaphore::new(parallelism)),
141        }
142    }
143
144    /// Start the [WalPruneManager]. It will register [WalPruneProcedure] loader in the procedure manager.
145    pub async fn try_start(mut self) -> Result<()> {
146        let context = self.wal_prune_context.clone();
147        let tracker = self.tracker.clone();
148        self.procedure_manager
149            .register_loader(
150                WalPruneProcedure::TYPE_NAME,
151                Box::new(move |json| {
152                    let tracker = tracker.clone();
153                    WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
154                }),
155            )
156            .context(error::RegisterProcedureLoaderSnafu {
157                type_name: WalPruneProcedure::TYPE_NAME,
158            })?;
159        common_runtime::spawn_global(async move {
160            self.run().await;
161        });
162        info!("WalPruneProcedureManager Started.");
163        Ok(())
164    }
165
166    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
167    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
168        tokio::sync::mpsc::channel(1024)
169    }
170
171    /// Runs the main loop. Performs actions on received events.
172    ///
173    /// - `Tick`: Submit `limit` [WalPruneProcedure]s to prune remote WAL.
174    pub(crate) async fn run(&mut self) {
175        while let Some(event) = self.receiver.recv().await {
176            match event {
177                Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
178                    error!(e; "Failed to handle tick request");
179                }),
180            }
181        }
182    }
183
184    /// Submits a [WalPruneProcedure] for the given topic name.
185    pub async fn wait_procedure(
186        &self,
187        topic_name: &str,
188        prunable_entry_id: u64,
189    ) -> Result<ProcedureId> {
190        let guard = self
191            .tracker
192            .insert_running_procedure(topic_name.to_string())
193            .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
194
195        let procedure = WalPruneProcedure::new(
196            self.wal_prune_context.clone(),
197            Some(guard),
198            topic_name.to_string(),
199            prunable_entry_id,
200        );
201        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
202        let procedure_id = procedure_with_id.id;
203        METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
204            .with_label_values(&[topic_name])
205            .inc();
206        let procedure_manager = self.procedure_manager.clone();
207        let mut watcher = procedure_manager
208            .submit(procedure_with_id)
209            .await
210            .context(error::SubmitProcedureSnafu)?;
211        watcher::wait(&mut watcher)
212            .await
213            .context(error::WaitProcedureSnafu)?;
214
215        Ok(procedure_id)
216    }
217
218    async fn try_prune(&self, topic_name: &str) -> Result<()> {
219        let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
220        let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
221        let prunable_entry_id = find_pruneable_entry_id_for_topic(
222            &table_metadata_manager,
223            &leader_region_registry,
224            topic_name,
225        )
226        .await?;
227        let Some(prunable_entry_id) = prunable_entry_id else {
228            debug!(
229                "No prunable entry id found for topic {}, skipping prune",
230                topic_name
231            );
232            return Ok(());
233        };
234        let current = table_metadata_manager
235            .topic_name_manager()
236            .get(topic_name)
237            .await
238            .context(error::TableMetadataManagerSnafu)?
239            .map(|v| v.into_inner().pruned_entry_id);
240        if !should_trigger_prune(current, prunable_entry_id) {
241            debug!(
242                "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
243                topic_name, current, prunable_entry_id
244            );
245            return Ok(());
246        }
247
248        self.wait_procedure(topic_name, prunable_entry_id)
249            .await
250            .map(|_| ())
251    }
252
253    async fn handle_tick_request(&self) -> Result<()> {
254        let topics = self.retrieve_sorted_topics().await?;
255        let mut tasks = Vec::with_capacity(topics.len());
256        for topic_name in topics.iter() {
257            tasks.push(async {
258                let _permit = self.semaphore.acquire().await.unwrap();
259                match self.try_prune(topic_name).await {
260                    Ok(_) => {}
261                    Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
262                        warn!("Prune task for topic {} is already running", topic);
263                    }
264                    Err(e) => {
265                        error!(
266                            "Failed to submit prune task for topic {}: {}",
267                            topic_name.clone(),
268                            e
269                        );
270                    }
271                }
272            });
273        }
274
275        join_all(tasks).await;
276        Ok(())
277    }
278
279    /// Retrieve topics from the table metadata manager.
280    /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
281    /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
282    async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
283        self.wal_prune_context
284            .table_metadata_manager
285            .topic_name_manager()
286            .range()
287            .await
288            .context(error::TableMetadataManagerSnafu)
289    }
290}
291
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::key::topic_name::TopicNameKey;
    // Presumably needed for the ticker's `name`/`start`/`stop` trait methods — verify.
    use common_meta::leadership_notifier::LeadershipChangeListener;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Starts the ticker in two rounds and checks that each round delivers at least one event
    /// and that every delivered event is an [Event::Tick].
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = WalPruneManager::channel();
        let interval = Duration::from_millis(50);
        let ticker = WalPruneTicker::new(interval, tx);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _ in 0..2 {
            ticker.start();
            // wait a bit longer to make sure not all ticks are skipped
            sleep(4 * interval).await;
            assert!(!rx.is_empty());
            // Drain everything received so far so the next round starts from an empty channel.
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    /// Checks that a second reservation for the same topic is rejected while the guard is
    /// alive, and that the topic is released once the guard drops at the end of the scope.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic_name = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker
                .insert_running_procedure(topic_name.clone())
                .unwrap();
            assert_eq!(guard.topic_name, topic_name);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            let result = tracker.insert_running_procedure(topic_name.clone());
            assert!(result.is_none());
        }
        // `guard` has been dropped here, so the topic must no longer be tracked.
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    /// Builds a [WalPruneManager] over a fresh [TestEnv]; `limit` is passed as `parallelism`.
    /// Returns the sender side of the event channel together with the manager.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        (
            tx,
            WalPruneManager::new(
                limit,
                rx,
                test_env.procedure_manager.clone(),
                wal_prune_context,
            ),
        )
    }

    /// Registers `topics` in the manager's topic name metadata (keys only, no region metadata).
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let topic_name_keys = topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();
        manager
            .wal_prune_context
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    /// Kafka integration test (presumably skipped unless the Kafka test environment is enabled):
    /// starts the manager, registers `2 * limit` topics and sends a single tick.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let limit = 6;
        let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
        let topics = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect::<Vec<_>>();
        mock_topics(&manager, &topics).await;

        let tracker = manager.tracker.clone();
        let handler =
            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
        handler.await.unwrap();

        tx.send(Event::Tick).await.unwrap();
        // Wait for at least one procedure to be submitted.
        // NOTE(review): this check is vacuous. The async block evaluates `tracker.len() > 0`
        // once and completes immediately; `.unwrap()` yields that bool and it is discarded,
        // so nothing is asserted and the 100ms timeout effectively never fires. In addition,
        // `mock_topics` registers topic keys without region metadata, so
        // `find_pruneable_entry_id_for_topic` may return `None` for every topic and no
        // procedure would be submitted at all — TODO confirm, then seed metadata (e.g. via
        // `new_wal_prune_metadata`) and poll the tracker until it is non-empty.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }

    /// A topic with no metadata at all yields no prunable entry id.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_none() {
        let test_env = TestEnv::new();
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            "test_topic",
        )
        .await
        .unwrap();
        assert!(prunable_entry_id.is_none());
    }

    /// Seeds metadata for `topic` with `new_wal_prune_metadata`, which returns the expected
    /// prunable entry id, and checks that the lookup agrees with it.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_some() {
        let test_env = TestEnv::new();
        let topic = "test_topic";
        let expected_prunable_entry_id = new_wal_prune_metadata(
            test_env.table_metadata_manager.clone(),
            test_env.leader_region_registry.clone(),
            2,
            5,
            &[3, 10, 23, 50, 52, 82, 130],
            topic.to_string(),
        )
        .await;
        let prunable_entry_id = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            topic,
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(prunable_entry_id, expected_prunable_entry_id);
    }
}