meta_srv/gc/
scheduler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::{error, info};
23use store_api::storage::GcReport;
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::{Mutex, oneshot};
26
27use crate::cluster::MetaPeerClientRef;
28use crate::define_ticker;
29use crate::error::{Error, Result};
30use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
31use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
32use crate::gc::tracker::RegionGcTracker;
33use crate::metrics::{
34    METRIC_META_GC_SCHEDULER_CYCLES_TOTAL, METRIC_META_GC_SCHEDULER_DURATION_SECONDS,
35};
36use crate::service::mailbox::MailboxRef;
37
38/// Report for a GC job.
39#[derive(Debug, Default)]
40pub struct GcJobReport {
41    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
42    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
43}
44
45/// [`Event`] represents various types of events that can be processed by the gc ticker.
46///
47/// Variants:
48/// - `Tick`: This event is used to trigger gc periodically.
49/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
50///   to send back the [`GcJobReport`] for that run.
51pub enum Event {
52    Tick,
53    Manually(oneshot::Sender<GcJobReport>),
54}
55
56#[allow(unused)]
57pub type GcTickerRef = Arc<GcTicker>;
58
59define_ticker!(
60    /// [GcTicker] is used to trigger gc periodically.
61    GcTicker,
62    event_type = Event,
63    event_value = Event::Tick
64);
65
66/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
67pub struct GcScheduler {
68    pub(crate) ctx: Arc<dyn SchedulerCtx>,
69    /// The receiver of events.
70    pub(crate) receiver: Receiver<Event>,
71    /// GC configuration.
72    pub(crate) config: GcSchedulerOptions,
73    /// Tracks the last GC time for regions.
74    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
75    /// Last time the tracker was cleaned up.
76    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
77}
78
79impl GcScheduler {
80    /// Creates a new [`GcScheduler`] with custom configuration.
81    pub(crate) fn new_with_config(
82        table_metadata_manager: TableMetadataManagerRef,
83        procedure_manager: ProcedureManagerRef,
84        meta_peer_client: MetaPeerClientRef,
85        mailbox: MailboxRef,
86        server_addr: String,
87        config: GcSchedulerOptions,
88    ) -> Result<(Self, GcTicker)> {
89        // Validate configuration before creating the scheduler
90        config.validate()?;
91
92        let (tx, rx) = Self::channel();
93        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
94        let gc_trigger = Self {
95            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
96                table_metadata_manager,
97                procedure_manager,
98                meta_peer_client,
99                mailbox,
100                server_addr,
101            )?),
102            receiver: rx,
103            config,
104            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
105            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
106        };
107        Ok((gc_trigger, gc_ticker))
108    }
109
110    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
111        tokio::sync::mpsc::channel(8)
112    }
113
114    /// Starts the gc trigger.
115    pub fn try_start(mut self) -> Result<()> {
116        common_runtime::spawn_global(async move { self.run().await });
117        info!("GC trigger started");
118        Ok(())
119    }
120
121    pub(crate) async fn run(&mut self) {
122        while let Some(event) = self.receiver.recv().await {
123            match event {
124                Event::Tick => {
125                    info!("Received gc tick");
126                    if let Err(e) = self.handle_tick().await {
127                        error!(e; "Failed to handle gc tick");
128                    }
129                }
130                Event::Manually(sender) => {
131                    info!("Received manually gc request");
132                    match self.handle_tick().await {
133                        Ok(report) => {
134                            // ignore error
135                            let _ = sender.send(report);
136                        }
137                        Err(e) => {
138                            error!(e; "Failed to handle gc tick");
139                        }
140                    };
141                }
142            }
143        }
144    }
145
146    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
147        METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc();
148        let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer();
149        info!("Start to trigger gc");
150        let report = self.trigger_gc().await?;
151
152        // Periodically clean up stale tracker entries
153        self.cleanup_tracker_if_needed().await?;
154
155        info!("Finished gc trigger");
156
157        Ok(report)
158    }
159}