1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::{error, info};
23use store_api::storage::GcReport;
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::{Mutex, oneshot};
26
27use crate::cluster::MetaPeerClientRef;
28use crate::define_ticker;
29use crate::error::{Error, Result};
30use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
31use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
32use crate::gc::tracker::RegionGcTracker;
33use crate::metrics::{
34 METRIC_META_GC_SCHEDULER_CYCLES_TOTAL, METRIC_META_GC_SCHEDULER_DURATION_SECONDS,
35};
36use crate::service::mailbox::MailboxRef;
37
/// Report produced by one GC round; this is the value returned by
/// `GcScheduler::handle_tick`.
#[derive(Debug, Default)]
pub struct GcJobReport {
    /// The [`GcReport`] recorded for each datanode, keyed by datanode id.
    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
    /// Errors collected per datanode, keyed by datanode id.
    // NOTE(review): presumably only datanodes whose GC failed appear here — confirm
    // against the code that fills the report (not visible in this file).
    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}
44
/// Events consumed by `GcScheduler::run`.
pub enum Event {
    /// The value `GcTicker` is configured to emit (`event_value = Event::Tick`).
    /// The report of the resulting GC round is dropped; only failures are logged.
    Tick,
    /// On-demand GC request. When the round succeeds, the resulting
    /// [`GcJobReport`] is sent back through the enclosed oneshot sender; when it
    /// fails, the sender is dropped without a value being sent.
    Manually(oneshot::Sender<GcJobReport>),
}
55
/// Reference-counted handle to a [`GcTicker`].
// NOTE(review): `allow(unused)` suggests nothing names this alias yet — confirm
// before removing either the alias or the attribute.
#[allow(unused)]
pub type GcTickerRef = Arc<GcTicker>;
58
// Declares the `GcTicker` type through `define_ticker!`, with `Event` as its event
// type and `Event::Tick` as the value it emits. In this file the ticker is built
// with `GcTicker::new(TICKER_INTERVAL, tx)`.
// NOTE(review): the macro definition is not visible here; presumably the generated
// ticker sends `Event::Tick` once per interval — confirm against `define_ticker!`.
define_ticker!(
    GcTicker,
    event_type = Event,
    event_value = Event::Tick
);
65
/// Receives [`Event`]s from a channel and runs one GC round per event
/// (see `GcScheduler::run`).
pub struct GcScheduler {
    /// Scheduler context; `new_with_config` installs a `DefaultGcSchedulerCtx`.
    pub(crate) ctx: Arc<dyn SchedulerCtx>,
    /// Receiving half of the event channel, polled by `run`.
    pub(crate) receiver: Receiver<Event>,
    /// Scheduler options; `new_with_config` validates them before storing.
    pub(crate) config: GcSchedulerOptions,
    /// GC tracking state shared behind an async mutex; starts out empty.
    // NOTE(review): presumably keyed per region, judging by the type name — confirm.
    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
    /// Initialised to the construction time.
    // NOTE(review): presumably refreshed by `cleanup_tracker_if_needed`, which is
    // defined outside this file — confirm.
    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}
78
79impl GcScheduler {
80 pub(crate) fn new_with_config(
82 table_metadata_manager: TableMetadataManagerRef,
83 procedure_manager: ProcedureManagerRef,
84 meta_peer_client: MetaPeerClientRef,
85 mailbox: MailboxRef,
86 server_addr: String,
87 config: GcSchedulerOptions,
88 ) -> Result<(Self, GcTicker)> {
89 config.validate()?;
91
92 let (tx, rx) = Self::channel();
93 let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
94 let gc_trigger = Self {
95 ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
96 table_metadata_manager,
97 procedure_manager,
98 meta_peer_client,
99 mailbox,
100 server_addr,
101 )?),
102 receiver: rx,
103 config,
104 region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
105 last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
106 };
107 Ok((gc_trigger, gc_ticker))
108 }
109
110 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
111 tokio::sync::mpsc::channel(8)
112 }
113
114 pub fn try_start(mut self) -> Result<()> {
116 common_runtime::spawn_global(async move { self.run().await });
117 info!("GC trigger started");
118 Ok(())
119 }
120
121 pub(crate) async fn run(&mut self) {
122 while let Some(event) = self.receiver.recv().await {
123 match event {
124 Event::Tick => {
125 info!("Received gc tick");
126 if let Err(e) = self.handle_tick().await {
127 error!(e; "Failed to handle gc tick");
128 }
129 }
130 Event::Manually(sender) => {
131 info!("Received manually gc request");
132 match self.handle_tick().await {
133 Ok(report) => {
134 let _ = sender.send(report);
136 }
137 Err(e) => {
138 error!(e; "Failed to handle gc tick");
139 }
140 };
141 }
142 }
143 }
144 }
145
146 pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
147 METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc();
148 let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer();
149 info!("Start to trigger gc");
150 let report = self.trigger_gc().await?;
151
152 self.cleanup_tracker_if_needed().await?;
154
155 info!("Finished gc trigger");
156
157 Ok(report)
158 }
159}