// meta_srv/gc/scheduler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::{error, info};
23use store_api::storage::GcReport;
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::{Mutex, oneshot};
26
27use crate::cluster::MetaPeerClientRef;
28use crate::define_ticker;
29use crate::error::{Error, Result};
30use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
31use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
32use crate::gc::tracker::RegionGcTracker;
33use crate::service::mailbox::MailboxRef;
34
35/// Report for a GC job.
36#[derive(Debug, Default)]
37pub struct GcJobReport {
38    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
39    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
40}
41
42/// [`Event`] represents various types of events that can be processed by the gc ticker.
43///
44/// Variants:
45/// - `Tick`: This event is used to trigger gc periodically.
46/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
47///   to send back the [`GcJobReport`] for that run.
48pub enum Event {
49    Tick,
50    Manually(oneshot::Sender<GcJobReport>),
51}
52
53#[allow(unused)]
54pub type GcTickerRef = Arc<GcTicker>;
55
56define_ticker!(
57    /// [GcTicker] is used to trigger gc periodically.
58    GcTicker,
59    event_type = Event,
60    event_value = Event::Tick
61);
62
63/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
64pub struct GcScheduler {
65    pub(crate) ctx: Arc<dyn SchedulerCtx>,
66    /// The receiver of events.
67    pub(crate) receiver: Receiver<Event>,
68    /// GC configuration.
69    pub(crate) config: GcSchedulerOptions,
70    /// Tracks the last GC time for regions.
71    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
72    /// Last time the tracker was cleaned up.
73    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
74}
75
76impl GcScheduler {
77    /// Creates a new [`GcScheduler`] with custom configuration.
78    pub(crate) fn new_with_config(
79        table_metadata_manager: TableMetadataManagerRef,
80        procedure_manager: ProcedureManagerRef,
81        meta_peer_client: MetaPeerClientRef,
82        mailbox: MailboxRef,
83        server_addr: String,
84        config: GcSchedulerOptions,
85    ) -> Result<(Self, GcTicker)> {
86        // Validate configuration before creating the scheduler
87        config.validate()?;
88
89        let (tx, rx) = Self::channel();
90        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
91        let gc_trigger = Self {
92            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
93                table_metadata_manager,
94                procedure_manager,
95                meta_peer_client,
96                mailbox,
97                server_addr,
98            )?),
99            receiver: rx,
100            config,
101            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
102            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
103        };
104        Ok((gc_trigger, gc_ticker))
105    }
106
107    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
108        tokio::sync::mpsc::channel(8)
109    }
110
111    /// Starts the gc trigger.
112    pub fn try_start(mut self) -> Result<()> {
113        common_runtime::spawn_global(async move { self.run().await });
114        info!("GC trigger started");
115        Ok(())
116    }
117
118    pub(crate) async fn run(&mut self) {
119        while let Some(event) = self.receiver.recv().await {
120            match event {
121                Event::Tick => {
122                    info!("Received gc tick");
123                    if let Err(e) = self.handle_tick().await {
124                        error!(e; "Failed to handle gc tick");
125                    }
126                }
127                Event::Manually(sender) => {
128                    info!("Received manually gc request");
129                    match self.handle_tick().await {
130                        Ok(report) => {
131                            // ignore error
132                            let _ = sender.send(report);
133                        }
134                        Err(e) => {
135                            error!(e; "Failed to handle gc tick");
136                        }
137                    };
138                }
139            }
140        }
141    }
142
143    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
144        info!("Start to trigger gc");
145        let report = self.trigger_gc().await?;
146
147        // Periodically clean up stale tracker entries
148        self.cleanup_tracker_if_needed().await?;
149
150        info!("Finished gc trigger");
151
152        Ok(report)
153    }
154}