meta_srv/procedure/wal_prune/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_set::Entry;
16use std::collections::HashSet;
17use std::fmt::{Debug, Formatter};
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::Duration;
20
21use common_meta::key::TableMetadataManagerRef;
22use common_meta::leadership_notifier::LeadershipChangeListener;
23use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
24use common_runtime::JoinHandle;
25use common_telemetry::{error, info, warn};
26use futures::future::join_all;
27use snafu::{OptionExt, ResultExt};
28use tokio::sync::mpsc::{Receiver, Sender};
29use tokio::sync::Semaphore;
30use tokio::time::{interval_at, Instant, MissedTickBehavior};
31
32use crate::error::{self, Result};
33use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
34use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
35
36pub type WalPruneTickerRef = Arc<WalPruneTicker>;
37
38/// Tracks running [WalPruneProcedure]s and the resources they hold.
39/// A [WalPruneProcedure] is holding a semaphore permit to limit the number of concurrent procedures.
40///
41/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework.
42#[derive(Clone)]
43pub struct WalPruneProcedureTracker {
44    running_procedures: Arc<RwLock<HashSet<String>>>,
45}
46
47impl WalPruneProcedureTracker {
48    /// Insert a running [WalPruneProcedure] for the given topic name and
49    /// consume acquire a semaphore permit for the given topic name.
50    pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
51        let mut running_procedures = self.running_procedures.write().unwrap();
52        match running_procedures.entry(topic_name.clone()) {
53            Entry::Occupied(_) => None,
54            Entry::Vacant(entry) => {
55                entry.insert();
56                Some(WalPruneProcedureGuard {
57                    topic_name,
58                    running_procedures: self.running_procedures.clone(),
59                })
60            }
61        }
62    }
63
64    /// Number of running [WalPruneProcedure]s.
65    pub fn len(&self) -> usize {
66        self.running_procedures.read().unwrap().len()
67    }
68}
69
/// [WalPruneProcedureGuard] marks a topic as having a running [WalPruneProcedure].
///
/// It is handed out by [WalPruneProcedureTracker::insert_running_procedure]. When it is dropped,
/// the topic name is removed from the tracker's set of running procedures, so a new procedure
/// may be registered for that topic. The guard does not hold a semaphore permit.
pub struct WalPruneProcedureGuard {
    topic_name: String,
    running_procedures: Arc<RwLock<HashSet<String>>>,
}

impl Drop for WalPruneProcedureGuard {
    fn drop(&mut self) {
        // Recover from a poisoned lock instead of panicking. A panic inside `drop` while
        // already unwinding aborts the process, and skipping the removal would leave the
        // topic marked as running forever. The set stays valid even if another holder
        // panicked, so using the inner value is safe.
        let mut running_procedures = self
            .running_procedures
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        running_procedures.remove(&self.topic_name);
    }
}
84
/// Event is used to notify the [WalPruneManager] to do some work.
///
/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL.
///
/// The derived `Debug` prints the bare variant name (e.g. `Tick`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    Tick,
}
99
100/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
101/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
102pub(crate) struct WalPruneTicker {
103    /// Handle of ticker thread.
104    pub(crate) tick_handle: Mutex<Option<JoinHandle<()>>>,
105    /// The interval of tick.
106    pub(crate) tick_interval: Duration,
107    /// Sends [Event]s.
108    pub(crate) sender: Sender<Event>,
109}
110
111#[async_trait::async_trait]
112impl LeadershipChangeListener for WalPruneTicker {
113    fn name(&self) -> &'static str {
114        "WalPruneTicker"
115    }
116
117    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
118        self.start();
119        Ok(())
120    }
121
122    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
123        self.stop();
124        Ok(())
125    }
126}
127
128/// TODO(CookiePie): Similar to [RegionSupervisorTicker], maybe can refactor to a unified framework.
129impl WalPruneTicker {
130    pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
131        Self {
132            tick_handle: Mutex::new(None),
133            tick_interval,
134            sender,
135        }
136    }
137
138    /// Starts the ticker.
139    pub fn start(&self) {
140        let mut handle = self.tick_handle.lock().unwrap();
141        if handle.is_none() {
142            let sender = self.sender.clone();
143            let tick_interval = self.tick_interval;
144            let ticker_loop = tokio::spawn(async move {
145                let mut interval = interval_at(Instant::now() + tick_interval, tick_interval);
146                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
147                loop {
148                    interval.tick().await;
149                    if sender.send(Event::Tick).await.is_err() {
150                        info!("EventReceiver is dropped, tick loop is stopped");
151                        break;
152                    }
153                }
154            });
155            *handle = Some(ticker_loop);
156        }
157        info!("WalPruneTicker started.");
158    }
159
160    /// Stops the ticker.
161    pub fn stop(&self) {
162        let mut handle = self.tick_handle.lock().unwrap();
163        if let Some(handle) = handle.take() {
164            handle.abort();
165        }
166        info!("WalPruneTicker stopped.");
167    }
168}
169
170impl Drop for WalPruneTicker {
171    fn drop(&mut self) {
172        self.stop();
173    }
174}
175
176/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
177///
178/// [WalPruneManager] is responsible for:
179/// 1. Registering [WalPruneProcedure] loader in the procedure manager.
180/// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL.
181/// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s.
182pub(crate) struct WalPruneManager {
183    /// Table metadata manager to restore topics from kvbackend.
184    table_metadata_manager: TableMetadataManagerRef,
185    /// Receives [Event]s.
186    receiver: Receiver<Event>,
187    /// Procedure manager.
188    procedure_manager: ProcedureManagerRef,
189    /// Tracker for running [WalPruneProcedure]s.
190    tracker: WalPruneProcedureTracker,
191    /// Semaphore to limit the number of concurrent [WalPruneProcedure]s.
192    semaphore: Arc<Semaphore>,
193
194    /// Context for [WalPruneProcedure].
195    wal_prune_context: WalPruneContext,
196    /// Trigger flush threshold for [WalPruneProcedure].
197    /// If `None`, never send flush requests.
198    trigger_flush_threshold: u64,
199}
200
201impl WalPruneManager {
202    /// Returns a new empty [WalPruneManager].
203    pub fn new(
204        table_metadata_manager: TableMetadataManagerRef,
205        limit: usize,
206        receiver: Receiver<Event>,
207        procedure_manager: ProcedureManagerRef,
208        wal_prune_context: WalPruneContext,
209        trigger_flush_threshold: u64,
210    ) -> Self {
211        Self {
212            table_metadata_manager,
213            receiver,
214            procedure_manager,
215            wal_prune_context,
216            tracker: WalPruneProcedureTracker {
217                running_procedures: Arc::new(RwLock::new(HashSet::new())),
218            },
219            semaphore: Arc::new(Semaphore::new(limit)),
220            trigger_flush_threshold,
221        }
222    }
223
224    /// Start the [WalPruneManager]. It will register [WalPruneProcedure] loader in the procedure manager.
225    pub async fn try_start(mut self) -> Result<()> {
226        let context = self.wal_prune_context.clone();
227        let tracker = self.tracker.clone();
228        self.procedure_manager
229            .register_loader(
230                WalPruneProcedure::TYPE_NAME,
231                Box::new(move |json| {
232                    let tracker = tracker.clone();
233                    WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
234                }),
235            )
236            .context(error::RegisterProcedureLoaderSnafu {
237                type_name: WalPruneProcedure::TYPE_NAME,
238            })?;
239        common_runtime::spawn_global(async move {
240            self.run().await;
241        });
242        info!("WalPruneProcedureManager Started.");
243        Ok(())
244    }
245
246    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
247    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
248        tokio::sync::mpsc::channel(1024)
249    }
250
251    /// Runs the main loop. Performs actions on received events.
252    ///
253    /// - `Tick`: Submit `limit` [WalPruneProcedure]s to prune remote WAL.
254    pub(crate) async fn run(&mut self) {
255        while let Some(event) = self.receiver.recv().await {
256            match event {
257                Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
258                    error!(e; "Failed to handle tick request");
259                }),
260            }
261        }
262    }
263
264    /// Submits a [WalPruneProcedure] for the given topic name.
265    pub async fn submit_procedure(&self, topic_name: &str) -> Result<ProcedureId> {
266        let guard = self
267            .tracker
268            .insert_running_procedure(topic_name.to_string())
269            .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
270
271        let procedure = WalPruneProcedure::new(
272            topic_name.to_string(),
273            self.wal_prune_context.clone(),
274            self.trigger_flush_threshold,
275            Some(guard),
276        );
277        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
278        let procedure_id = procedure_with_id.id;
279        METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
280            .with_label_values(&[topic_name])
281            .inc();
282        let procedure_manager = self.procedure_manager.clone();
283        let mut watcher = procedure_manager
284            .submit(procedure_with_id)
285            .await
286            .context(error::SubmitProcedureSnafu)?;
287        watcher::wait(&mut watcher)
288            .await
289            .context(error::WaitProcedureSnafu)?;
290
291        Ok(procedure_id)
292    }
293
294    async fn handle_tick_request(&self) -> Result<()> {
295        let topics = self.retrieve_sorted_topics().await?;
296        let mut tasks = Vec::with_capacity(topics.len());
297        for topic_name in topics.iter() {
298            tasks.push(async {
299                let _permit = self.semaphore.acquire().await.unwrap();
300                match self.submit_procedure(topic_name).await {
301                    Ok(_) => {}
302                    Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
303                        warn!("Prune task for topic {} is already running", topic);
304                    }
305                    Err(e) => {
306                        error!(
307                            "Failed to submit prune task for topic {}: {}",
308                            topic_name.clone(),
309                            e
310                        );
311                    }
312                }
313            });
314        }
315
316        join_all(tasks).await;
317        Ok(())
318    }
319
320    /// Retrieve topics from the table metadata manager.
321    /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
322    /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
323    async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
324        self.table_metadata_manager
325            .topic_name_manager()
326            .range()
327            .await
328            .context(error::TableMetadataManagerSnafu)
329    }
330}
331
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::topic_name::TopicNameKey;
    use common_wal::test_util::run_test_with_kafka_wal;
    use tokio::time::{sleep, timeout};

    use super::*;
    use crate::procedure::wal_prune::test_util::TestEnv;

    #[tokio::test]
    async fn test_wal_prune_ticker() {
        let (tx, mut rx) = WalPruneManager::channel();
        let interval = Duration::from_millis(10);
        let ticker = WalPruneTicker::new(interval, tx);
        assert_eq!(ticker.name(), "WalPruneTicker");

        for _ in 0..2 {
            ticker.start();
            sleep(2 * interval).await;
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick);
            }
        }
        ticker.stop();
    }

    #[tokio::test]
    async fn test_wal_prune_tracker_and_guard() {
        let tracker = WalPruneProcedureTracker {
            running_procedures: Arc::new(RwLock::new(HashSet::new())),
        };
        let topic_name = uuid::Uuid::new_v4().to_string();
        {
            let guard = tracker
                .insert_running_procedure(topic_name.clone())
                .unwrap();
            assert_eq!(guard.topic_name, topic_name);
            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);

            let result = tracker.insert_running_procedure(topic_name.clone());
            assert!(result.is_none());
        }
        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
    }

    async fn mock_wal_prune_manager(
        broker_endpoints: Vec<String>,
        limit: usize,
    ) -> (Sender<Event>, WalPruneManager) {
        let test_env = TestEnv::new();
        let (tx, rx) = WalPruneManager::channel();
        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
        (
            tx,
            WalPruneManager::new(
                test_env.table_metadata_manager.clone(),
                limit,
                rx,
                test_env.procedure_manager.clone(),
                wal_prune_context,
                0,
            ),
        )
    }

    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
        let topic_name_keys = topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();
        manager
            .table_metadata_manager
            .topic_name_manager()
            .batch_put(topic_name_keys)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_wal_prune_manager() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                let limit = 6;
                let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
                let topics = (0..limit * 2)
                    .map(|_| uuid::Uuid::new_v4().to_string())
                    .collect::<Vec<_>>();
                mock_topics(&manager, &topics).await;

                let tracker = manager.tracker.clone();
                let handler =
                    common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
                handler.await.unwrap();

                tx.send(Event::Tick).await.unwrap();
                // Poll until at least one procedure has been registered in the tracker; the
                // timeout turns "nothing was ever submitted" into a test failure.
                timeout(Duration::from_secs(1), async move {
                    loop {
                        // Bind first so the lock guard is released before the `.await` below.
                        let submitted = !tracker.running_procedures.read().unwrap().is_empty();
                        if submitted {
                            break;
                        }
                        sleep(Duration::from_millis(1)).await;
                    }
                })
                .await
                .expect("no WalPruneProcedure was submitted after a tick");
            })
        })
        .await;
    }
}