// meta_srv/gc/scheduler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::{error, info};
23use store_api::storage::GcReport;
24use tokio::sync::Mutex;
25use tokio::sync::mpsc::{Receiver, Sender};
26
27use crate::cluster::MetaPeerClientRef;
28use crate::define_ticker;
29use crate::error::{Error, Result};
30use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
31use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
32use crate::gc::tracker::RegionGcTracker;
33use crate::service::mailbox::MailboxRef;
34
/// Report for a GC job.
///
/// Aggregates the outcome of one GC round across all participating datanodes.
#[derive(Debug, Default)]
pub struct GcJobReport {
    /// Successful GC reports, keyed by datanode id.
    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
    /// Errors collected per datanode. After [`GcJobReport::merge`], datanodes
    /// that also appear in `per_datanode_reports` are removed from this map.
    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}
41impl GcJobReport {
42    pub fn merge(&mut self, mut other: GcJobReport) {
43        // merge per_datanode_reports&failed_datanodes
44        for (dn_id, report) in other.per_datanode_reports {
45            let self_report = self.per_datanode_reports.entry(dn_id).or_default();
46            self_report.merge(report);
47        }
48        let all_failed_dn_ids = self
49            .failed_datanodes
50            .keys()
51            .cloned()
52            .chain(other.failed_datanodes.keys().cloned())
53            .collect::<HashSet<_>>();
54        for dn_id in all_failed_dn_ids {
55            let entry = self.failed_datanodes.entry(dn_id).or_default();
56            if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
57                entry.extend(other_errors);
58            }
59        }
60        self.failed_datanodes
61            .retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
62    }
63}
64
/// [`Event`] represents various types of events that can be processed by the gc ticker.
///
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
pub(crate) enum Event {
    /// Periodic trigger fired by the ticker; handled in [`GcScheduler::run`]
    /// by starting one GC round.
    Tick,
}
72
// NOTE(review): alias appears unused within this file — presumably consumed by
// tests or other modules; confirm before removing the `allow`.
#[allow(unused)]
pub(crate) type GcTickerRef = Arc<GcTicker>;

// `define_ticker!` generates the `GcTicker` type; it periodically emits
// `Event::Tick` on the sender it is constructed with (see `new_with_config`,
// which pairs it with `TICKER_INTERVAL`).
define_ticker!(
    /// [GcTicker] is used to trigger gc periodically.
    GcTicker,
    event_type = Event,
    event_value = Event::Tick
);
82
/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
pub struct GcScheduler {
    /// Scheduler context; built from the table metadata manager, procedure
    /// manager, peer client and mailbox (see `new_with_config`).
    pub(crate) ctx: Arc<dyn SchedulerCtx>,
    /// The receiver of events.
    pub(crate) receiver: Receiver<Event>,
    /// GC configuration.
    pub(crate) config: GcSchedulerOptions,
    /// Tracks the last GC time for regions.
    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
    /// Last time the tracker was cleaned up.
    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}
95
96impl GcScheduler {
97    /// Creates a new [`GcScheduler`] with custom configuration.
98    pub(crate) fn new_with_config(
99        table_metadata_manager: TableMetadataManagerRef,
100        procedure_manager: ProcedureManagerRef,
101        meta_peer_client: MetaPeerClientRef,
102        mailbox: MailboxRef,
103        server_addr: String,
104        config: GcSchedulerOptions,
105    ) -> Result<(Self, GcTicker)> {
106        // Validate configuration before creating the scheduler
107        config.validate()?;
108
109        let (tx, rx) = Self::channel();
110        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
111        let gc_trigger = Self {
112            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
113                table_metadata_manager,
114                procedure_manager,
115                meta_peer_client,
116                mailbox,
117                server_addr,
118            )?),
119            receiver: rx,
120            config,
121            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
122            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
123        };
124        Ok((gc_trigger, gc_ticker))
125    }
126
127    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
128        tokio::sync::mpsc::channel(8)
129    }
130
131    /// Starts the gc trigger.
132    pub fn try_start(mut self) -> Result<()> {
133        common_runtime::spawn_global(async move { self.run().await });
134        info!("GC trigger started");
135        Ok(())
136    }
137
138    pub(crate) async fn run(&mut self) {
139        while let Some(event) = self.receiver.recv().await {
140            match event {
141                Event::Tick => {
142                    info!("Received gc tick");
143                    if let Err(e) = self.handle_tick().await {
144                        error!("Failed to handle gc tick: {}", e);
145                    }
146                }
147            }
148        }
149    }
150
    /// Runs a single GC round: triggers GC via the scheduler context, then
    /// prunes stale region-tracker entries if the cleanup interval has elapsed.
    ///
    /// # Errors
    ///
    /// Propagates failures from `trigger_gc` or the tracker cleanup.
    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
        info!("Start to trigger gc");
        let report = self.trigger_gc().await?;

        // Periodically clean up stale tracker entries
        self.cleanup_tracker_if_needed().await?;

        info!("Finished gc trigger");

        Ok(report)
    }
162}