use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{error, info};
use store_api::storage::{GcReport, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, oneshot};

use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
use crate::error::{Error, Result};
use crate::gc::Region2Peers;
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
use crate::gc::dropped::DroppedRegionCollector;
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
use crate::gc::tracker::RegionGcTracker;
use crate::metrics::{
    METRIC_META_GC_SCHEDULER_CYCLES_TOTAL, METRIC_META_GC_SCHEDULER_DURATION_SECONDS,
};
use crate::service::mailbox::MailboxRef;

#[derive(Debug)]
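/// Report produced by a GC job: either kept per datanode (with any per-datanode failures),
/// or already merged into a single combined [`GcReport`].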
pub enum GcJobReport {
    PerDatanode {
        per_datanode_reports: HashMap<DatanodeId, GcReport>,
        failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
    },
    Combined {
        report: GcReport,
    },
}

impl Default for GcJobReport {
    fn default() -> Self {
        Self::PerDatanode {
            per_datanode_reports: HashMap::new(),
            failed_datanodes: HashMap::new(),
        }
    }
}

impl GcJobReport {
    pub fn combined(report: GcReport) -> Self {
        Self::Combined { report }
    }

    pub fn merge_to_report(self) -> GcReport {
        match self {
            GcJobReport::Combined { report } => report,
            GcJobReport::PerDatanode {
                per_datanode_reports,
                ..
            } => {
                let mut combined = GcReport::default();
                for (_datanode_id, report) in per_datanode_reports {
                    combined.merge(report);
                }
                combined
            }
        }
    }
}

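/// Events handled by the [`GcScheduler`] event loop.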
pub enum Event {
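    /// Periodic tick emitted by the [`GcTicker`].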
    Tick,
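    /// Manually triggered GC request.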
    Manually {
        sender: oneshot::Sender<Result<GcJobReport>>,
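        /// Regions to GC. When `None`, the scheduler falls back to a regular GC cycle.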
        region_ids: Option<Vec<RegionId>>,
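        /// Whether to request a full file listing during GC; defaults to `false` when unset.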
        full_file_listing: Option<bool>,
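        /// Timeout for the GC calls; defaults to the configured mailbox timeout when unset.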
        timeout: Option<Duration>,
    },
}

#[allow(unused)]
pub type GcTickerRef = Arc<GcTicker>;

define_ticker!(
    GcTicker,
    event_type = Event,
    event_value = Event::Tick
);

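/// Scheduler that triggers region GC, either periodically via [`GcTicker`] ticks or on demand
/// through [`Event::Manually`] requests.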
pub struct GcScheduler {
    pub(crate) ctx: Arc<dyn SchedulerCtx>,
    pub(crate) receiver: Receiver<Event>,
    pub(crate) config: GcSchedulerOptions,
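    /// Shared per-region GC tracking state; also passed to the [`DroppedRegionCollector`].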
    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
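    /// Instant of the last region GC tracker cleanup.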
    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}

impl GcScheduler {
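    /// Creates the scheduler together with its ticker, validating `config` first.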
    pub(crate) fn new_with_config(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
        config: GcSchedulerOptions,
    ) -> Result<(Self, GcTicker)> {
        config.validate()?;

        let (tx, rx) = Self::channel();
        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
        let gc_trigger = Self {
            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
                table_metadata_manager,
                procedure_manager,
                meta_peer_client,
                mailbox,
                server_addr,
            )?),
            receiver: rx,
            config,
            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
        };
        Ok((gc_trigger, gc_ticker))
    }

    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

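    /// Spawns the scheduler event loop on the global runtime and returns immediately.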
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("GC trigger started");
        Ok(())
    }

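    /// Event loop: handles ticker and manual GC events until the event channel is closed.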
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => {
                    info!("Received gc tick");
                    let span =
                        common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "ticker");
                    if let Err(e) = self.handle_tick().instrument(span).await {
                        error!(e; "Failed to handle gc tick");
                    }
                }
                Event::Manually {
                    sender,
                    region_ids,
                    full_file_listing,
                    timeout,
                } => {
187 info!("Received manually gc request");
                    let span =
                        common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
                    let result = self
                        .handle_manual_gc(region_ids, full_file_listing, timeout)
                        .instrument(span)
                        .await;
                    if let Err(e) = &result {
                        error!(e; "Failed to handle manual gc");
                    }
                    let _ = sender.send(result);
                }
            }
        }
    }

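    /// Handles a periodic GC tick: runs a GC cycle and cleans up the tracker if needed.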
    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
        METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc();
        let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer();
        info!("Start to trigger gc");
        let span = common_telemetry::tracing::info_span!("meta_gc_handle_tick");
        let report = self.trigger_gc().instrument(span).await?;

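        // Clean up the region GC tracker when due (see `last_tracker_cleanup`).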
        self.cleanup_tracker_if_needed().await?;

        info!("Finished gc trigger");

        Ok(report)
    }

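    /// Handles a manual GC request.
    ///
    /// Without `region_ids`, this falls back to a regular GC cycle. Otherwise the requested
    /// regions are split into dropped and active ones: dropped regions are GC'ed with full
    /// file listing and their route overrides, while active regions use the requested
    /// listing mode.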
    pub(crate) async fn handle_manual_gc(
        &self,
        region_ids: Option<Vec<RegionId>>,
        full_file_listing: Option<bool>,
        timeout: Option<Duration>,
    ) -> Result<GcJobReport> {
        info!("Start to handle manual gc request");

        let Some(regions) = region_ids else {
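            // No regions specified: fall back to a regular GC cycle.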
            let report = self.trigger_gc().await?;
            info!("Finished manual gc request");
            return Ok(report);
        };

        if regions.is_empty() {
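            // An explicit empty region list is a no-op.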
            info!("Finished manual gc request");
            return Ok(GcJobReport::combined(GcReport::default()));
        }

        let full_listing = full_file_listing.unwrap_or(false);
        let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);

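        // Figure out which of the requested regions are dropped regions and collect their
        // route overrides.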
        let region_set: HashSet<RegionId> = regions.iter().copied().collect();
        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector
            .collect_and_assign_with_cooldown(&table_reparts, false)
            .await?;

        let mut dropped_region_set = HashSet::new();
        let mut dropped_routes_override = Region2Peers::new();
        for overrides in dropped_assignment.region_routes_override.into_values() {
            for (region_id, route) in overrides {
                if region_set.contains(&region_id) {
                    dropped_region_set.insert(region_id);
                    dropped_routes_override.insert(region_id, route);
                }
            }
        }

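        // Split the requested regions into dropped vs. still-active regions.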
        let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions
            .into_iter()
            .partition(|region_id| dropped_region_set.contains(region_id));

        let mut combined_report = GcReport::default();

        if !active_regions.is_empty() {
            let report = self
                .ctx
                .gc_regions(
                    &active_regions,
                    full_listing,
                    gc_timeout,
                    Region2Peers::new(),
                )
                .await?;
            combined_report.merge(report);
        }

        if !dropped_regions.is_empty() {
            let report = self
                .ctx
                .gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override)
                .await?;
            combined_report.merge(report);
        }

        let report = GcJobReport::combined(combined_report);

        info!("Finished manual gc request");
        Ok(report)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use common_meta::datanode::RegionStat;
    use common_meta::key::table_repart::TableRepartValue;
    use common_meta::key::table_route::PhysicalTableRouteValue;
    use store_api::storage::RegionId;
    use table::metadata::TableId;

    use super::*;

    struct ErrorMockSchedulerCtx;

    #[async_trait::async_trait]
    impl SchedulerCtx for ErrorMockSchedulerCtx {
        async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
            Ok(HashMap::new())
        }

        async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
            Ok(vec![])
        }

        async fn get_table_route(
            &self,
            _table_id: TableId,
        ) -> Result<(TableId, PhysicalTableRouteValue)> {
            unreachable!("get_table_route should not be called in this test")
        }

        async fn batch_get_table_route(
            &self,
            _table_ids: &[TableId],
        ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
            Ok(HashMap::new())
        }

        async fn gc_regions(
            &self,
            _region_ids: &[RegionId],
            _full_file_listing: bool,
            _timeout: Duration,
            _region_routes_override: Region2Peers,
        ) -> Result<GcReport> {
            crate::error::UnexpectedSnafu {
                violated: "mock gc failure".to_string(),
            }
            .fail()
        }
    }

    #[tokio::test]
    async fn test_handle_manual_gc_propagates_error() {
        let (tx, rx) = GcScheduler::channel();
        drop(tx);

        let scheduler = GcScheduler {
            ctx: Arc::new(ErrorMockSchedulerCtx),
            receiver: rx,
            config: GcSchedulerOptions::default(),
            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
        };

        let result = scheduler
            .handle_manual_gc(
                Some(vec![RegionId::new(1, 0)]),
                Some(false),
                Some(Duration::from_secs(1)),
            )
            .await;

        assert!(result.is_err());
    }
}