// meta_srv/gc/scheduler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::tracing::Instrument as _;
23use common_telemetry::{error, info};
24use store_api::storage::{GcReport, RegionId};
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::sync::{Mutex, oneshot};
27
28use crate::cluster::MetaPeerClientRef;
29use crate::define_ticker;
30use crate::error::{Error, Result};
31use crate::gc::Region2Peers;
32use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
33use crate::gc::dropped::DroppedRegionCollector;
34use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
35use crate::gc::tracker::RegionGcTracker;
36use crate::metrics::{
37    METRIC_META_GC_SCHEDULER_CYCLES_TOTAL, METRIC_META_GC_SCHEDULER_DURATION_SECONDS,
38};
39use crate::service::mailbox::MailboxRef;
40
/// Report for a GC job.
#[derive(Debug)]
pub enum GcJobReport {
    /// Per-datanode results: successful reports keyed by datanode id, plus
    /// the errors collected for datanodes whose GC attempt failed.
    PerDatanode {
        per_datanode_reports: HashMap<DatanodeId, GcReport>,
        failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
    },
    /// A single, already-merged report (produced by manual GC on explicit regions).
    Combined {
        report: GcReport,
    },
}
52
53impl Default for GcJobReport {
54    fn default() -> Self {
55        Self::PerDatanode {
56            per_datanode_reports: HashMap::new(),
57            failed_datanodes: HashMap::new(),
58        }
59    }
60}
61
62impl GcJobReport {
63    pub fn combined(report: GcReport) -> Self {
64        Self::Combined { report }
65    }
66
67    pub fn merge_to_report(self) -> GcReport {
68        match self {
69            GcJobReport::Combined { report } => report,
70            GcJobReport::PerDatanode {
71                per_datanode_reports,
72                ..
73            } => {
74                let mut combined = GcReport::default();
75                for (_datanode_id, report) in per_datanode_reports {
76                    combined.merge(report);
77                }
78                combined
79            }
80        }
81    }
82}
83
/// [`Event`] represents various types of events that can be processed by the gc ticker.
///
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
///   to send back the result for that run.
///   Optional parameters allow specifying target regions and GC behavior.
pub enum Event {
    Tick,
    Manually {
        /// Channel sender to return the GC job report or error
        sender: oneshot::Sender<Result<GcJobReport>>,
        /// Optional specific region IDs to GC. If None, scheduler will select candidates automatically.
        region_ids: Option<Vec<RegionId>>,
        /// Optional override for full file listing. If None, full file listing is
        /// disabled (`handle_manual_gc` defaults it to `false`).
        full_file_listing: Option<bool>,
        /// Optional override for timeout. If None, uses the scheduler's configured
        /// mailbox timeout.
        timeout: Option<Duration>,
    },
}
104
/// Shared handle to the [`GcTicker`].
#[allow(unused)]
pub type GcTickerRef = Arc<GcTicker>;

define_ticker!(
    /// [GcTicker] is used to trigger gc periodically.
    GcTicker,
    event_type = Event,
    event_value = Event::Tick
);
114
/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
pub struct GcScheduler {
    /// Abstraction over the metadata / procedure / mailbox access the scheduler
    /// needs to run GC (mockable in tests).
    pub(crate) ctx: Arc<dyn SchedulerCtx>,
    /// The receiver of events.
    pub(crate) receiver: Receiver<Event>,
    /// GC configuration.
    pub(crate) config: GcSchedulerOptions,
    /// Tracks the last GC time for regions.
    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
    /// Last time the tracker was cleaned up.
    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}
127
impl GcScheduler {
    /// Creates a new [`GcScheduler`] with custom configuration.
    ///
    /// Returns the scheduler together with the [`GcTicker`] that feeds it
    /// periodic [`Event::Tick`]s.
    ///
    /// # Errors
    /// Fails if `config` is invalid or the default scheduler context cannot
    /// be constructed.
    pub(crate) fn new_with_config(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
        config: GcSchedulerOptions,
    ) -> Result<(Self, GcTicker)> {
        // Validate configuration before creating the scheduler
        config.validate()?;

        let (tx, rx) = Self::channel();
        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
        let gc_trigger = Self {
            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
                table_metadata_manager,
                procedure_manager,
                meta_peer_client,
                mailbox,
                server_addr,
            )?),
            receiver: rx,
            config,
            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
        };
        Ok((gc_trigger, gc_ticker))
    }

    /// Creates the bounded event channel (capacity 8) that connects the
    /// ticker / manual triggers to the scheduler loop.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Starts the gc trigger.
    ///
    /// Consumes `self` and spawns the event loop on the global runtime.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("GC trigger started");
        Ok(())
    }

    /// Event loop: processes incoming [`Event`]s until every sender is dropped
    /// and the channel closes.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => {
                    info!("Received gc tick");
                    let span =
                        common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "ticker");
                    // Tick failures are logged but never abort the loop.
                    if let Err(e) = self.handle_tick().instrument(span).await {
                        error!(e; "Failed to handle gc tick");
                    }
                }
                Event::Manually {
                    sender,
                    region_ids,
                    full_file_listing,
                    timeout,
                } => {
                    info!("Received manually gc request");
                    let span =
                        common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
                    let result = self
                        .handle_manual_gc(region_ids, full_file_listing, timeout)
                        .instrument(span)
                        .await;
                    if let Err(e) = &result {
                        error!(e; "Failed to handle manual gc");
                    }
                    // The requester may have given up waiting; ignore send errors.
                    let _ = sender.send(result);
                }
            }
        }
    }

    /// Runs one periodic GC cycle and opportunistically cleans up stale
    /// tracker entries, recording cycle count and duration metrics.
    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
        METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc();
        // Timer observes the whole cycle when dropped at scope exit.
        let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer();
        info!("Start to trigger gc");
        let span = common_telemetry::tracing::info_span!("meta_gc_handle_tick");
        let report = self.trigger_gc().instrument(span).await?;

        // Periodically clean up stale tracker entries
        self.cleanup_tracker_if_needed().await?;

        info!("Finished gc trigger");

        Ok(report)
    }

    /// Handles a manual GC request with optional specific parameters.
    ///
    /// If `region_ids` is specified, GC will be performed only on those regions.
    /// Otherwise, falls back to automatic candidate selection.
    pub(crate) async fn handle_manual_gc(
        &self,
        region_ids: Option<Vec<RegionId>>,
        full_file_listing: Option<bool>,
        timeout: Option<Duration>,
    ) -> Result<GcJobReport> {
        info!("Start to handle manual gc request");

        // No specific regions, use default tick behavior
        let Some(regions) = region_ids else {
            let report = self.trigger_gc().await?;
            info!("Finished manual gc request");
            return Ok(report);
        };

        // Empty regions list, return empty report
        if regions.is_empty() {
            info!("Finished manual gc request");
            return Ok(GcJobReport::combined(GcReport::default()));
        }

        // NOTE(review): defaults to `false` here, but the `Event::Manually`
        // field doc says the scheduler config is used when unset — confirm
        // which behavior is intended.
        let full_listing = full_file_listing.unwrap_or(false);
        let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);

        let region_set: HashSet<RegionId> = regions.iter().copied().collect();
        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector
            .collect_and_assign_with_cooldown(&table_reparts, false)
            .await?;

        // Intersect the requested regions with the collector's dropped-region
        // route overrides, keeping only the overrides we were asked to GC.
        let mut dropped_region_set = HashSet::new();
        let mut dropped_routes_override = Region2Peers::new();
        for overrides in dropped_assignment.region_routes_override.into_values() {
            for (region_id, route) in overrides {
                if region_set.contains(&region_id) {
                    dropped_region_set.insert(region_id);
                    dropped_routes_override.insert(region_id, route);
                }
            }
        }

        // Split the request into dropped regions (need route overrides) and
        // still-active regions (normal routing).
        let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions
            .into_iter()
            .partition(|region_id| dropped_region_set.contains(region_id));

        let mut combined_report = GcReport::default();

        if !active_regions.is_empty() {
            let report = self
                .ctx
                .gc_regions(
                    &active_regions,
                    full_listing,
                    gc_timeout,
                    Region2Peers::new(),
                )
                .await?;
            combined_report.merge(report);
        }

        if !dropped_regions.is_empty() {
            // Dropped regions are always GC'd with full file listing and the
            // collected route overrides.
            let report = self
                .ctx
                .gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override)
                .await?;
            combined_report.merge(report);
        }

        let report = GcJobReport::combined(combined_report);

        info!("Finished manual gc request");
        Ok(report)
    }
}
298
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use common_meta::datanode::RegionStat;
    use common_meta::key::table_repart::TableRepartValue;
    use common_meta::key::table_route::PhysicalTableRouteValue;
    use store_api::storage::RegionId;
    use table::metadata::TableId;

    use super::*;

    /// Scheduler context whose `gc_regions` always fails; metadata queries
    /// return empty results so `handle_manual_gc` reaches the GC call.
    struct ErrorMockSchedulerCtx;

    #[async_trait::async_trait]
    impl SchedulerCtx for ErrorMockSchedulerCtx {
        async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
            Ok(HashMap::new())
        }

        async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
            Ok(vec![])
        }

        async fn get_table_route(
            &self,
            _table_id: TableId,
        ) -> Result<(TableId, PhysicalTableRouteValue)> {
            unreachable!("get_table_route should not be called in this test")
        }

        async fn batch_get_table_route(
            &self,
            _table_ids: &[TableId],
        ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
            Ok(HashMap::new())
        }

        async fn gc_regions(
            &self,
            _region_ids: &[RegionId],
            _full_file_listing: bool,
            _timeout: Duration,
            _region_routes_override: Region2Peers,
        ) -> Result<GcReport> {
            // Simulate a datanode-side GC failure.
            crate::error::UnexpectedSnafu {
                violated: "mock gc failure".to_string(),
            }
            .fail()
        }
    }

    /// A `gc_regions` error must surface through `handle_manual_gc` rather
    /// than being swallowed into an empty report.
    #[tokio::test]
    async fn test_handle_manual_gc_propagates_error() {
        let (tx, rx) = GcScheduler::channel();
        // The event loop is not running; drop the sender so `rx` is inert.
        drop(tx);

        let scheduler = GcScheduler {
            ctx: Arc::new(ErrorMockSchedulerCtx),
            receiver: rx,
            config: GcSchedulerOptions::default(),
            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
        };

        let result = scheduler
            .handle_manual_gc(
                Some(vec![RegionId::new(1, 0)]),
                Some(false),
                Some(Duration::from_secs(1)),
            )
            .await;

        assert!(result.is_err());
    }
}