1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_meta::DatanodeId;
20use common_meta::key::TableMetadataManagerRef;
21use common_procedure::ProcedureManagerRef;
22use common_telemetry::{error, info};
23use store_api::storage::GcReport;
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::{Mutex, oneshot};
26
27use crate::cluster::MetaPeerClientRef;
28use crate::define_ticker;
29use crate::error::{Error, Result};
30use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
31use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
32use crate::gc::tracker::RegionGcTracker;
33use crate::service::mailbox::MailboxRef;
34
35#[derive(Debug, Default)]
37pub struct GcJobReport {
38 pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
39 pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
40}
41
42pub enum Event {
49 Tick,
50 Manually(oneshot::Sender<GcJobReport>),
51}
52
53#[allow(unused)]
54pub type GcTickerRef = Arc<GcTicker>;
55
56define_ticker!(
57 GcTicker,
59 event_type = Event,
60 event_value = Event::Tick
61);
62
63pub struct GcScheduler {
65 pub(crate) ctx: Arc<dyn SchedulerCtx>,
66 pub(crate) receiver: Receiver<Event>,
68 pub(crate) config: GcSchedulerOptions,
70 pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
72 pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
74}
75
76impl GcScheduler {
77 pub(crate) fn new_with_config(
79 table_metadata_manager: TableMetadataManagerRef,
80 procedure_manager: ProcedureManagerRef,
81 meta_peer_client: MetaPeerClientRef,
82 mailbox: MailboxRef,
83 server_addr: String,
84 config: GcSchedulerOptions,
85 ) -> Result<(Self, GcTicker)> {
86 config.validate()?;
88
89 let (tx, rx) = Self::channel();
90 let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
91 let gc_trigger = Self {
92 ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
93 table_metadata_manager,
94 procedure_manager,
95 meta_peer_client,
96 mailbox,
97 server_addr,
98 )?),
99 receiver: rx,
100 config,
101 region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
102 last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
103 };
104 Ok((gc_trigger, gc_ticker))
105 }
106
107 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
108 tokio::sync::mpsc::channel(8)
109 }
110
111 pub fn try_start(mut self) -> Result<()> {
113 common_runtime::spawn_global(async move { self.run().await });
114 info!("GC trigger started");
115 Ok(())
116 }
117
118 pub(crate) async fn run(&mut self) {
119 while let Some(event) = self.receiver.recv().await {
120 match event {
121 Event::Tick => {
122 info!("Received gc tick");
123 if let Err(e) = self.handle_tick().await {
124 error!(e; "Failed to handle gc tick");
125 }
126 }
127 Event::Manually(sender) => {
128 info!("Received manually gc request");
129 match self.handle_tick().await {
130 Ok(report) => {
131 let _ = sender.send(report);
133 }
134 Err(e) => {
135 error!(e; "Failed to handle gc tick");
136 }
137 };
138 }
139 }
140 }
141 }
142
143 pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
144 info!("Start to trigger gc");
145 let report = self.trigger_gc().await?;
146
147 self.cleanup_tracker_if_needed().await?;
149
150 info!("Finished gc trigger");
151
152 Ok(report)
153 }
154}