Skip to main content

meta_srv/procedure/wal_prune/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::collections::hash_set::Entry;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, RwLock};
19
20use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
21use common_telemetry::{debug, error, info, warn};
22use futures::future::join_all;
23use snafu::{OptionExt, ResultExt};
24use tokio::sync::Semaphore;
25use tokio::sync::mpsc::{Receiver, Sender};
26
27use crate::define_ticker;
28use crate::error::{self, Result};
29use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
30use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
31use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
32
33pub type WalPruneTickerRef = Arc<WalPruneTicker>;
34
/// Tracks the topics that currently have a running [WalPruneProcedure].
///
/// Every clone of the tracker shares the same underlying set of topic names, so a topic
/// registered through one clone is visible through all the others. A topic can be registered
/// at most once at a time, see [WalPruneProcedureTracker::insert_running_procedure].
///
/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework.
#[derive(Clone)]
pub struct WalPruneProcedureTracker {
    /// Names of the topics that currently have a running [WalPruneProcedure].
    running_procedures: Arc<RwLock<HashSet<String>>>,
}
43
44impl WalPruneProcedureTracker {
45    /// Insert a running [WalPruneProcedure] for the given topic name and
46    /// consume acquire a semaphore permit for the given topic name.
47    pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
48        let mut running_procedures = self.running_procedures.write().unwrap();
49        match running_procedures.entry(topic_name.clone()) {
50            Entry::Occupied(_) => None,
51            Entry::Vacant(entry) => {
52                entry.insert();
53                Some(WalPruneProcedureGuard {
54                    topic_name,
55                    running_procedures: self.running_procedures.clone(),
56                })
57            }
58        }
59    }
60
61    /// Number of running [WalPruneProcedure]s.
62    pub fn len(&self) -> usize {
63        self.running_procedures.read().unwrap().len()
64    }
65}
66
/// [WalPruneProcedureGuard] marks a topic as having a running [WalPruneProcedure].
/// It is handed out by [WalPruneProcedureTracker::insert_running_procedure].
/// When the guard is dropped, it removes the topic name from the shared set of running procedures.
pub struct WalPruneProcedureGuard {
    /// Topic this guard was created for.
    topic_name: String,
    /// Set of running topics, shared with the tracker that created this guard.
    running_procedures: Arc<RwLock<HashSet<String>>>,
}
74
75impl Drop for WalPruneProcedureGuard {
76    fn drop(&mut self) {
77        let mut running_procedures = self.running_procedures.write().unwrap();
78        running_procedures.remove(&self.topic_name);
79    }
80}
81
/// Event is used to notify the [WalPruneManager] to do some work.
///
/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL.
pub enum Event {
    /// Emitted by the [WalPruneTicker]; handled by the [WalPruneManager] main loop.
    Tick,
}
88
89impl Debug for Event {
90    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91        match self {
92            Event::Tick => write!(f, "Tick"),
93        }
94    }
95}
96
define_ticker!(
    /// [WalPruneTicker] is a ticker that periodically sends [Event::Tick] to the [WalPruneManager].
    /// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
    WalPruneTicker,
    event_type = Event,
    event_value = Event::Tick
);
104
/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
///
/// [WalPruneManager] is responsible for:
/// 1. Registering [WalPruneProcedure] loader in the procedure manager.
/// 2. Periodically receiving [Event::Tick] and, for each topic, submitting a [WalPruneProcedure]
///    to prune remote WAL when pruning is needed.
/// 3. Using a semaphore to limit the number of topics that are pruned concurrently.
pub(crate) struct WalPruneManager {
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// Procedure manager.
    procedure_manager: ProcedureManagerRef,
    /// Tracker for running [WalPruneProcedure]s; it admits at most one per topic.
    tracker: WalPruneProcedureTracker,
    /// Semaphore to limit the number of concurrent [WalPruneProcedure]s.
    /// A permit is held for a topic's whole prune attempt, including waiting for the procedure.
    semaphore: Arc<Semaphore>,
    /// Whether pruning only updates metadata and skips Kafka DeleteRecords.
    logical_delete: bool,

    /// Context for [WalPruneProcedure].
    wal_prune_context: WalPruneContext,
}
126
127impl WalPruneManager {
128    /// Returns a new empty [`WalPruneManager`].
129    pub fn new(
130        parallelism: usize,
131        logical_delete: bool,
132        receiver: Receiver<Event>,
133        procedure_manager: ProcedureManagerRef,
134        wal_prune_context: WalPruneContext,
135    ) -> Self {
136        Self {
137            receiver,
138            procedure_manager,
139            wal_prune_context,
140            tracker: WalPruneProcedureTracker {
141                running_procedures: Arc::new(RwLock::new(HashSet::new())),
142            },
143            semaphore: Arc::new(Semaphore::new(parallelism)),
144            logical_delete,
145        }
146    }
147
148    /// Start the [WalPruneManager]. It will register [WalPruneProcedure] loader in the procedure manager.
149    pub async fn try_start(mut self) -> Result<()> {
150        let context = self.wal_prune_context.clone();
151        let tracker = self.tracker.clone();
152        self.procedure_manager
153            .register_loader(
154                WalPruneProcedure::TYPE_NAME,
155                Box::new(move |json| {
156                    let tracker = tracker.clone();
157                    WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
158                }),
159            )
160            .context(error::RegisterProcedureLoaderSnafu {
161                type_name: WalPruneProcedure::TYPE_NAME,
162            })?;
163        common_runtime::spawn_global(async move {
164            self.run().await;
165        });
166        info!("WalPruneProcedureManager Started.");
167        Ok(())
168    }
169
170    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
171    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
172        tokio::sync::mpsc::channel(1024)
173    }
174
175    /// Runs the main loop. Performs actions on received events.
176    ///
177    /// - `Tick`: Submit `limit` [WalPruneProcedure]s to prune remote WAL.
178    pub(crate) async fn run(&mut self) {
179        while let Some(event) = self.receiver.recv().await {
180            match event {
181                Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
182                    error!(e; "Failed to handle tick request");
183                }),
184            }
185        }
186    }
187
188    /// Submits a [WalPruneProcedure] for the given topic name.
189    pub async fn wait_procedure(
190        &self,
191        topic_name: &str,
192        prunable_entry_id: u64,
193    ) -> Result<ProcedureId> {
194        let guard = self
195            .tracker
196            .insert_running_procedure(topic_name.to_string())
197            .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
198
199        let procedure = WalPruneProcedure::new(
200            self.wal_prune_context.clone(),
201            Some(guard),
202            topic_name.to_string(),
203            prunable_entry_id,
204            self.logical_delete,
205        );
206        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
207        let procedure_id = procedure_with_id.id;
208        METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
209            .with_label_values(&[topic_name])
210            .inc();
211        let procedure_manager = self.procedure_manager.clone();
212        let mut watcher = procedure_manager
213            .submit(procedure_with_id)
214            .await
215            .context(error::SubmitProcedureSnafu)?;
216        watcher::wait(&mut watcher)
217            .await
218            .context(error::WaitProcedureSnafu)?;
219
220        Ok(procedure_id)
221    }
222
223    async fn try_prune(&self, topic_name: &str) -> Result<()> {
224        let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
225        let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
226        let prunable_entry_id = find_pruneable_entry_id_for_topic(
227            &table_metadata_manager,
228            &leader_region_registry,
229            topic_name,
230        )
231        .await?;
232        let Some(prunable_entry_id) = prunable_entry_id else {
233            debug!(
234                "No prunable entry id found for topic {}, skipping prune",
235                topic_name
236            );
237            return Ok(());
238        };
239        let current = table_metadata_manager
240            .topic_name_manager()
241            .get(topic_name)
242            .await
243            .context(error::TableMetadataManagerSnafu)?
244            .map(|v| v.into_inner().pruned_entry_id);
245        debug!(
246            "Found prunable entry id {} for topic {}, current pruned entry id: {:?}",
247            prunable_entry_id, topic_name, current
248        );
249        if !should_trigger_prune(current, prunable_entry_id) {
250            debug!(
251                "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
252                topic_name, current, prunable_entry_id
253            );
254            return Ok(());
255        }
256
257        self.wait_procedure(topic_name, prunable_entry_id)
258            .await
259            .map(|_| ())
260    }
261
262    async fn handle_tick_request(&self) -> Result<()> {
263        let topics = self.retrieve_sorted_topics().await?;
264        let mut tasks = Vec::with_capacity(topics.len());
265        for topic_name in topics.iter() {
266            tasks.push(async {
267                let _permit = self.semaphore.acquire().await.unwrap();
268                match self.try_prune(topic_name).await {
269                    Ok(_) => {}
270                    Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
271                        warn!("Prune task for topic {} is already running", topic);
272                    }
273                    Err(err) => {
274                        error!(err; "Failed to prune remote WAL for topic {}", topic_name.as_str());
275                    }
276                }
277            });
278        }
279
280        join_all(tasks).await;
281        Ok(())
282    }
283
284    /// Retrieve topics from the table metadata manager.
285    /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
286    /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
287    async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
288        self.wal_prune_context
289            .table_metadata_manager
290            .topic_name_manager()
291            .range()
292            .await
293            .context(error::TableMetadataManagerSnafu)
294    }
295}
296
297#[cfg(test)]
298mod test {
299    use std::assert_matches;
300    use std::time::Duration;
301
302    use common_meta::key::topic_name::TopicNameKey;
303    use common_meta::leadership_notifier::LeadershipChangeListener;
304    use common_wal::maybe_skip_kafka_integration_test;
305    use common_wal::test_util::get_kafka_endpoints;
306    use tokio::time::{sleep, timeout};
307
308    use super::*;
309    use crate::procedure::test_util::new_wal_prune_metadata;
310    use crate::procedure::wal_prune::test_util::TestEnv;
311
312    #[tokio::test]
313    async fn test_wal_prune_ticker() {
314        common_telemetry::init_default_ut_logging();
315        let (tx, mut rx) = WalPruneManager::channel();
316        let interval = Duration::from_millis(50);
317        let ticker = WalPruneTicker::new(interval, tx);
318        assert_eq!(ticker.name(), "WalPruneTicker");
319
320        for _ in 0..2 {
321            ticker.start();
322            // wait a bit longer to make sure not all ticks are skipped
323            sleep(4 * interval).await;
324            assert!(!rx.is_empty());
325            while let Ok(event) = rx.try_recv() {
326                assert_matches!(event, Event::Tick);
327            }
328        }
329        ticker.stop();
330    }
331
332    #[tokio::test]
333    async fn test_wal_prune_tracker_and_guard() {
334        let tracker = WalPruneProcedureTracker {
335            running_procedures: Arc::new(RwLock::new(HashSet::new())),
336        };
337        let topic_name = uuid::Uuid::new_v4().to_string();
338        {
339            let guard = tracker
340                .insert_running_procedure(topic_name.clone())
341                .unwrap();
342            assert_eq!(guard.topic_name, topic_name);
343            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);
344
345            let result = tracker.insert_running_procedure(topic_name.clone());
346            assert!(result.is_none());
347        }
348        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
349    }
350
351    async fn mock_wal_prune_manager(
352        broker_endpoints: Vec<String>,
353        limit: usize,
354    ) -> (Sender<Event>, WalPruneManager) {
355        let test_env = TestEnv::new();
356        let (tx, rx) = WalPruneManager::channel();
357        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
358        (
359            tx,
360            WalPruneManager::new(
361                limit,
362                false,
363                rx,
364                test_env.procedure_manager.clone(),
365                wal_prune_context,
366            ),
367        )
368    }
369
370    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
371        let topic_name_keys = topics
372            .iter()
373            .map(|topic| TopicNameKey::new(topic))
374            .collect::<Vec<_>>();
375        manager
376            .wal_prune_context
377            .table_metadata_manager
378            .topic_name_manager()
379            .batch_put(topic_name_keys)
380            .await
381            .unwrap();
382    }
383
    /// Smoke test: starts a manager using the endpoints from `get_kafka_endpoints()`, registers
    /// `limit * 2` random topics and sends a single [Event::Tick]. Skipped when
    /// `maybe_skip_kafka_integration_test!` decides so.
    ///
    /// NOTE(review): the final `timeout` wraps a future that evaluates `tracker.len() > 0`
    /// exactly once and completes immediately, and the resulting `bool` is discarded
    /// (`unwrap()` only unwraps the timeout result). As written, the test does not verify
    /// that any procedure was submitted.
    #[tokio::test]
    async fn test_wal_prune_manager() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();
        let limit = 6;
        let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
        let topics = (0..limit * 2)
            .map(|_| uuid::Uuid::new_v4().to_string())
            .collect::<Vec<_>>();
        mock_topics(&manager, &topics).await;

        // Keep a handle to the tracker: `try_start` consumes the manager.
        let tracker = manager.tracker.clone();
        let handler =
            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
        handler.await.unwrap();

        tx.send(Event::Tick).await.unwrap();
        // Wait for at least one procedure to be submitted.
        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
            .await
            .unwrap();
    }
406
407    #[tokio::test]
408    async fn test_find_pruneable_entry_id_for_topic_none() {
409        let test_env = TestEnv::new();
410        let prunable_entry_id = find_pruneable_entry_id_for_topic(
411            &test_env.table_metadata_manager,
412            &test_env.leader_region_registry,
413            "test_topic",
414        )
415        .await
416        .unwrap();
417        assert!(prunable_entry_id.is_none());
418    }
419
420    #[tokio::test]
421    async fn test_find_pruneable_entry_id_for_topic_some() {
422        let test_env = TestEnv::new();
423        let topic = "test_topic";
424        let expected_prunable_entry_id = new_wal_prune_metadata(
425            test_env.table_metadata_manager.clone(),
426            test_env.leader_region_registry.clone(),
427            2,
428            5,
429            &[3, 10, 23, 50, 52, 82, 130],
430            topic.to_string(),
431        )
432        .await;
433        let prunable_entry_id = find_pruneable_entry_id_for_topic(
434            &test_env.table_metadata_manager,
435            &test_env.leader_region_registry,
436            topic,
437        )
438        .await
439        .unwrap()
440        .unwrap();
441        assert_eq!(prunable_entry_id, expected_prunable_entry_id);
442    }
443}