use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender};

use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
use crate::error::{Error, Result};
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
use crate::gc::tracker::RegionGcTracker;
use crate::service::mailbox::MailboxRef;

/// Aggregated result of one GC job across datanodes.
#[derive(Debug, Default)]
pub struct GcJobReport {
    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}

impl GcJobReport {
    /// Merges `other` into `self`: per-datanode reports are combined, error
    /// lists are concatenated, and datanodes that now have a successful
    /// report are dropped from `failed_datanodes`.
    ///
    /// See the test module at the bottom of the file for a usage sketch.
    pub fn merge(&mut self, mut other: GcJobReport) {
        // Combine per-datanode reports, merging entries for the same datanode.
        for (dn_id, report) in other.per_datanode_reports {
            let self_report = self.per_datanode_reports.entry(dn_id).or_default();
            self_report.merge(report);
        }
        // Concatenate error lists from both reports, keyed by datanode.
        let all_failed_dn_ids = self
            .failed_datanodes
            .keys()
            .cloned()
            .chain(other.failed_datanodes.keys().cloned())
            .collect::<HashSet<_>>();
        for dn_id in all_failed_dn_ids {
            let entry = self.failed_datanodes.entry(dn_id).or_default();
            if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
                entry.extend(other_errors);
            }
        }
        // A datanode that produced a report is no longer considered failed.
        self.failed_datanodes
            .retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
    }
}

/// Events handled by the [GcScheduler] loop.
pub(crate) enum Event {
    Tick,
}

#[allow(unused)]
pub(crate) type GcTickerRef = Arc<GcTicker>;

define_ticker!(
    GcTicker,
    event_type = Event,
    event_value = Event::Tick
);

/// Scheduler that periodically triggers GC on datanodes.
pub struct GcScheduler {
    /// Context providing access to metadata and datanodes for GC scheduling.
    pub(crate) ctx: Arc<dyn SchedulerCtx>,
    /// Receiver for ticker events.
    pub(crate) receiver: Receiver<Event>,
    /// Scheduler configuration.
    pub(crate) config: GcSchedulerOptions,
    /// Per-region GC state tracker.
    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
    /// When the tracker was last cleaned up.
    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}

impl GcScheduler {
    /// Creates a [GcScheduler] and its paired [GcTicker], validating `config` first.
    pub(crate) fn new_with_config(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
        config: GcSchedulerOptions,
    ) -> Result<(Self, GcTicker)> {
        config.validate()?;

        let (tx, rx) = Self::channel();
        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
        let gc_trigger = Self {
            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
                table_metadata_manager,
                procedure_manager,
                meta_peer_client,
                mailbox,
                server_addr,
            )?),
            receiver: rx,
            config,
            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
        };
        Ok((gc_trigger, gc_ticker))
    }

    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Spawns the scheduler loop onto the global runtime.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("GC trigger started");
        Ok(())
    }

    /// Handles ticker events until the sender side of the channel closes.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => {
                    info!("Received gc tick");
                    if let Err(e) = self.handle_tick().await {
                        error!("Failed to handle gc tick: {}", e);
                    }
                }
            }
        }
    }

    /// Triggers one GC round and cleans up the region GC tracker if it is due.
    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
        info!("Start to trigger gc");
        let report = self.trigger_gc().await?;

        self.cleanup_tracker_if_needed().await?;

        info!("Finished gc trigger");

        Ok(report)
    }
}
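
// A minimal usage sketch of the merge and channel semantics above, written as
// same-crate unit tests. Assumptions not guaranteed by this file alone:
// `GcReport` implements `Default` (implied by the `or_default()` calls in
// `merge`) and tokio's `#[tokio::test]` macro is available as a dev-dependency.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn merge_drops_recovered_datanodes_from_failed_set() {
        let mut acc = GcJobReport::default();
        // Datanode 1 failed in an earlier round (empty error list for brevity).
        acc.failed_datanodes.entry(1).or_default();

        let mut next = GcJobReport::default();
        // The next round produced a successful report for datanode 1.
        next.per_datanode_reports.insert(1, GcReport::default());

        acc.merge(next);
        // The final `retain` in `merge` removes datanode 1 from the failed
        // set because it now has a successful report.
        assert!(acc.failed_datanodes.is_empty());
        assert!(acc.per_datanode_reports.contains_key(&1));
    }

    #[tokio::test]
    async fn tick_flows_through_scheduler_channel() {
        // The ticker side sends `Event::Tick` into the channel that
        // `GcScheduler::run` drains.
        let (tx, mut rx) = GcScheduler::channel();
        tx.send(Event::Tick).await.unwrap();
        assert!(matches!(rx.recv().await, Some(Event::Tick)));
    }
}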