meta_srv/procedure/wal_prune/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::collections::hash_set::Entry;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::Semaphore;
25use tokio::sync::mpsc::{Receiver, Sender};
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
/// Shared, reference-counted handle to a [`WalPruneTicker`].
pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
35/// Tracks running [WalPruneProcedure]s and the resources they hold.
36/// A [WalPruneProcedure] is holding a semaphore permit to limit the number of concurrent procedures.
37///
38/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework.
39#[derive(Clone)]
40pub struct WalPruneProcedureTracker {
41    running_procedures: Arc<RwLock<HashSet<String>>>,
42}
43
44impl WalPruneProcedureTracker {
45    /// Insert a running [WalPruneProcedure] for the given topic name and
46    /// consume acquire a semaphore permit for the given topic name.
47    pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48        let mut running_procedures = self.running_procedures.write().unwrap();
49        match running_procedures.entry(topic_name.clone()) {
50            Entry::Occupied(_) => None,
51            Entry::Vacant(entry) => {
52                entry.insert();
53                Some(WalPruneProcedureGuard {
54                    topic_name,
55                    running_procedures: self.running_procedures.clone(),
56                })
57            }
58        }
59    }
60
61    /// Number of running [WalPruneProcedure]s.
62    pub fn len(&self) -> usize {
63        self.running_procedures.read().unwrap().len()
64    }
65}
66
/// Drop guard that marks one topic as "being pruned" for as long as it is alive.
///
/// The guard shares the set of running topic names with the [WalPruneProcedureTracker] that
/// created it. When the guard is dropped, its topic name is removed from that set so a new
/// [WalPruneProcedure] can be registered for the topic. The guard holds nothing else: it does
/// not own a semaphore permit.
pub struct WalPruneProcedureGuard {
    /// Topic this guard was created for.
    topic_name: String,
    /// Set of running topic names shared with the tracker.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}

impl Drop for WalPruneProcedureGuard {
    /// Removes the guarded topic from the shared set.
    ///
    /// This never panics: a poisoned lock is recovered instead of unwrapped.
    fn drop(&mut self) {
        // Destructors must not fail. If another thread panicked while holding the lock, a
        // plain `unwrap()` here would panic as well, and a panic raised while already
        // unwinding aborts the whole process. The set is still usable after poisoning, so
        // take the inner guard and clean up regardless.
        let mut running_procedures = self
            .running_procedures
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        running_procedures.remove(&self.topic_name);
    }
}
81
/// Messages that drive the [WalPruneManager].
///
/// - `Tick`: ask the manager to run one round of remote WAL pruning.
pub enum Event {
    Tick,
}

impl Debug for Event {
    /// Writes the bare variant name (for example `Tick`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Event::Tick => "Tick",
        };
        f.write_str(label)
    }
}
96
// Declares the `WalPruneTicker` type through the crate-level `define_ticker!` macro, with
// `Event` as the event type and `Event::Tick` as the value it sends.
// NOTE(review): the code generated by the macro is not visible in this file. The tests in this
// file call `WalPruneTicker::new(interval, sender)`, `name()`, `start()` and `stop()` on it —
// confirm the full API against the macro definition.
define_ticker!(
    /// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
    /// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
    WalPruneTicker,
    event_type = Event,
    event_value = Event::Tick
);
104
105/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
106///
107/// [WalPruneManager] is responsible for:
108/// 1. Registering [WalPruneProcedure] loader in the procedure manager.
109/// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL.
110/// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s.
111pub(crate) struct WalPruneManager {
112    /// Receives [Event]s.
113    receiver: Receiver<Event>,
114    /// Procedure manager.
115    procedure_manager: ProcedureManagerRef,
116    /// Tracker for running [WalPruneProcedure]s.
117    tracker: WalPruneProcedureTracker,
118    /// Semaphore to limit the number of concurrent [WalPruneProcedure]s.
119    semaphore: Arc<Semaphore>,
120
121    /// Context for [WalPruneProcedure].
122    wal_prune_context: WalPruneContext,
123}
124
125impl WalPruneManager {
126    /// Returns a new empty [`WalPruneManager`].
127    pub fn new(
128        parallelism: usize,
129        receiver: Receiver<Event>,
130        procedure_manager: ProcedureManagerRef,
131        wal_prune_context: WalPruneContext,
132    ) -> Self {
133        Self {
134            receiver,
135            procedure_manager,
136            wal_prune_context,
137            tracker: WalPruneProcedureTracker {
138                running_procedures: Arc::new(RwLock::new(HashSet::new())),
139            },
140            semaphore: Arc::new(Semaphore::new(parallelism)),
141        }
142    }
143
144    /// Start the [WalPruneManager]. It will register [WalPruneProcedure] loader in the procedure manager.
145    pub async fn try_start(mut self) -> Result<()> {
146        let context = self.wal_prune_context.clone();
147        let tracker = self.tracker.clone();
148        self.procedure_manager
149            .register_loader(
150                WalPruneProcedure::TYPE_NAME,
151                Box::new(move |json| {
152                    let tracker = tracker.clone();
153                    WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
154                }),
155            )
156            .context(error::RegisterProcedureLoaderSnafu {
157                type_name: WalPruneProcedure::TYPE_NAME,
158            })?;
159        common_runtime::spawn_global(async move {
160            self.run().await;
161        });
162        info!("WalPruneProcedureManager Started.");
163        Ok(())
164    }
165
166    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
167    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
168        tokio::sync::mpsc::channel(1024)
169    }
170
171    /// Runs the main loop. Performs actions on received events.
172    ///
173    /// - `Tick`: Submit `limit` [WalPruneProcedure]s to prune remote WAL.
174    pub(crate) async fn run(&mut self) {
175        while let Some(event) = self.receiver.recv().await {
176            match event {
177                Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
178                    error!(e; "Failed to handle tick request");
179                }),
180            }
181        }
182    }
183
184    /// Submits a [WalPruneProcedure] for the given topic name.
185    pub async fn wait_procedure(
186        &self,
187        topic_name: &str,
188        prunable_entry_id: u64,
189    ) -> Result<ProcedureId> {
190        let guard = self
191            .tracker
192            .insert_running_procedure(topic_name.to_string())
193            .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
194
195        let procedure = WalPruneProcedure::new(
196            self.wal_prune_context.clone(),
197            Some(guard),
198            topic_name.to_string(),
199            prunable_entry_id,
200        );
201        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
202        let procedure_id = procedure_with_id.id;
203        METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
204            .with_label_values(&[topic_name])
205            .inc();
206        let procedure_manager = self.procedure_manager.clone();
207        let mut watcher = procedure_manager
208            .submit(procedure_with_id)
209            .await
210            .context(error::SubmitProcedureSnafu)?;
211        watcher::wait(&mut watcher)
212            .await
213            .context(error::WaitProcedureSnafu)?;
214
215        Ok(procedure_id)
216    }
217
218    async fn try_prune(&self, topic_name: &str) -> Result<()> {
219        let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
220        let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
221        let prunable_entry_id = find_pruneable_entry_id_for_topic(
222            &table_metadata_manager,
223            &leader_region_registry,
224            topic_name,
225        )
226        .await?;
227        let Some(prunable_entry_id) = prunable_entry_id else {
228            debug!(
229                "No prunable entry id found for topic {}, skipping prune",
230                topic_name
231            );
232            return Ok(());
233        };
234        let current = table_metadata_manager
235            .topic_name_manager()
236            .get(topic_name)
237            .await
238            .context(error::TableMetadataManagerSnafu)?
239            .map(|v| v.into_inner().pruned_entry_id);
240        if !should_trigger_prune(current, prunable_entry_id) {
241            debug!(
242                "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
243                topic_name, current, prunable_entry_id
244            );
245            return Ok(());
246        }
247
248        self.wait_procedure(topic_name, prunable_entry_id)
249            .await
250            .map(|_| ())
251    }
252
253    async fn handle_tick_request(&self) -> Result<()> {
254        let topics = self.retrieve_sorted_topics().await?;
255        let mut tasks = Vec::with_capacity(topics.len());
256        for topic_name in topics.iter() {
257            tasks.push(async {
258                let _permit = self.semaphore.acquire().await.unwrap();
259                match self.try_prune(topic_name).await {
260                    Ok(_) => {}
261                    Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
262                        warn!("Prune task for topic {} is already running", topic);
263                    }
264                    Err(err) => {
265                        error!(err; "Failed to prune remote WAL for topic {}", topic_name.as_str());
266                    }
267                }
268            });
269        }
270
271        join_all(tasks).await;
272        Ok(())
273    }
274
275    /// Retrieve topics from the table metadata manager.
276    /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
277    /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
278    async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
279        self.wal_prune_context
280            .table_metadata_manager
281            .topic_name_manager()
282            .range()
283            .await
284            .context(error::TableMetadataManagerSnafu)
285    }
286}
287
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::key::topic_name::TopicNameKey;
    use common_meta::leadership_notifier::LeadershipChangeListener;
    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// The ticker reports its name and delivers `Tick` events after being started.
    #[tokio::test]
    async fn test_wal_prune_ticker() {
        common_telemetry::init_default_ut_logging();
        let tick_interval = Duration::from_millis(50);
        let (sender, mut receiver) = WalPruneManager::channel();
        let ticker = WalPruneTicker::new(tick_interval, sender);
        assert_eq!(ticker.name(), "WalPruneTicker");

        // Two rounds; `start` is called again in the second round without a `stop` between.
        for _round in 0..2 {
            ticker.start();
            // Sleep for several intervals so that at least one tick has been delivered.
            sleep(4 * tick_interval).await;
            assert!(!receiver.is_empty());
            // Drain the channel; everything in it must be a `Tick`.
            while let Ok(event) = receiver.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    /// A topic can be registered once; dropping the guard unregisters it.
    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let running_procedures = Arc::new(RwLock::new(HashSet::new()));
        let tracker = WalPruneProcedureTracker { running_procedures };
        let topic_name = uuid::Uuid::new_v4().to_string();

        let guard = tracker
            .insert_running_procedure(topic_name.clone())
            .unwrap();
        assert_eq!(guard.topic_name, topic_name);
        assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

        // While the guard is alive, the same topic cannot be registered again.
        assert!(
            tracker
                .insert_running_procedure(topic_name.clone())
                .is_none()
        );

        drop(guard);
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    /// Builds a [WalPruneManager] on top of a fresh [TestEnv], together with the sender side
    /// of its event channel. `limit` is passed as the manager's parallelism.
    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        let manager = WalPruneManager::new(
            limit,
            rx,
            test_env.procedure_manager.clone(),
            wal_prune_context,
        );
        (tx, manager)
    }

    /// Stores `topics` through the topic name manager of the manager's context.
    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let mut topic_name_keys = Vec::with_capacity(topics.len());
        for topic in topics {
            topic_name_keys.push(TopicNameKey::new(topic));
        }
        manager
            .wal_prune_context
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    /// Starts a manager against Kafka and sends it one tick.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let limit = 6;
        let (tx, manager) = mock_wal_prune_manager(get_kafka_endpoints(), limit).await;
        let topics: Vec<String> = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect();
        mock_topics(&manager, &topics).await;

        let tracker = manager.tracker.clone();
        let handler =
            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
        handler.await.unwrap();

        tx.send(Event::Tick).await.unwrap();
        // Wait for at least one procedure to be submitted.
        // NOTE(review): the future below evaluates `tracker.len() > 0` exactly once and
        // resolves immediately, and the resulting boolean is discarded. As written this
        // neither waits nor asserts that a procedure was submitted; it only checks that the
        // timeout did not elapse.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }

    /// With an empty test environment there is nothing to prune for a topic.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_none() {
        let test_env = TestEnv::new();
        let lookup = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            "test_topic",
        )
        .await;
        assert!(lookup.unwrap().is_none());
    }

    /// After `new_wal_prune_metadata` has populated the environment, the lookup returns the
    /// entry id that helper reports.
    #[tokio::test]
    async fn test_find_pruneable_entry_id_for_topic_some() {
        let test_env = TestEnv::new();
        let topic = "test_topic";
        let expected = new_wal_prune_metadata(
            test_env.table_metadata_manager.clone(),
            test_env.leader_region_registry.clone(),
            2,
            5,
            &[3, 10, 23, 50, 52, 82, 130],
            topic.to_string(),
        )
        .await;

        let actual = find_pruneable_entry_id_for_topic(
            &test_env.table_metadata_manager,
            &test_env.leader_region_registry,
            topic,
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(actual, expected);
    }
}