datanode/heartbeat/handler/
gc_worker.rs1use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
16use common_telemetry::{debug, warn};
17use mito2::gc::LocalGcWorker;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::storage::{FileRefsManifest, RegionId};
20
21use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Heartbeat instruction handler for [`GcRegions`] instructions.
///
/// This is a unit struct with no fields; all per-request state comes from the
/// instruction and the [`HandlerContext`] passed to `handle`.
pub struct GcRegionsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for GcRegionsHandler {
28 type Instruction = GcRegions;
29
30 async fn handle(
31 &self,
32 ctx: &HandlerContext,
33 gc_regions: Self::Instruction,
34 ) -> Option<InstructionReply> {
35 let region_ids = gc_regions.regions.clone();
36 debug!("Received gc regions instruction: {:?}", region_ids);
37
38 let (region_id, gc_worker) = match self
39 .create_gc_worker(
40 ctx,
41 region_ids,
42 &gc_regions.file_refs_manifest,
43 gc_regions.full_file_listing,
44 )
45 .await
46 {
47 Ok(worker) => worker,
48 Err(e) => {
49 return Some(InstructionReply::GcRegions(GcRegionsReply {
50 result: Err(format!("Failed to create GC worker: {}", e)),
51 }));
52 }
53 };
54
55 let register_result = ctx
56 .gc_tasks
57 .try_register(
58 region_id,
59 Box::pin(async move {
60 debug!("Starting gc worker for region {}", region_id);
61 let report = gc_worker
62 .run()
63 .await
64 .context(GcMitoEngineSnafu { region_id })?;
65 debug!("Gc worker for region {} finished", region_id);
66 Ok(report)
67 }),
68 )
69 .await;
70 if register_result.is_busy() {
71 warn!("Another gc task is running for the region: {region_id}");
72 return Some(InstructionReply::GcRegions(GcRegionsReply {
73 result: Err(format!(
74 "Another gc task is running for the region: {region_id}"
75 )),
76 }));
77 }
78 let mut watcher = register_result.into_watcher();
79 let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
80 match result {
81 Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
82 result: Ok(report),
83 })),
84 Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
85 result: Err(format!("{err:?}")),
86 })),
87 }
88 }
89}
90
91impl GcRegionsHandler {
92 async fn create_gc_worker(
95 &self,
96 ctx: &HandlerContext,
97 mut region_ids: Vec<RegionId>,
98 file_ref_manifest: &FileRefsManifest,
99 full_file_listing: bool,
100 ) -> Result<(RegionId, LocalGcWorker)> {
101 region_ids.sort_by_key(|r| r.region_number());
103
104 let mito_engine = ctx
105 .region_server
106 .mito_engine()
107 .with_context(|| UnexpectedSnafu {
108 violated: "MitoEngine not found".to_string(),
109 })?;
110
111 let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
112 msg: "No region ids provided".to_string(),
113 })?;
114
115 ensure!(
117 region_ids
118 .iter()
119 .all(|rid| mito_engine.find_region(*rid).is_some()),
120 InvalidGcArgsSnafu {
121 msg: format!(
122 "Some regions are not on current datanode:{:?}",
123 region_ids
124 .iter()
125 .filter(|rid| mito_engine.find_region(**rid).is_none())
126 .collect::<Vec<_>>()
127 ),
128 }
129 );
130
131 let access_layer = mito_engine
133 .find_region(region_id)
134 .with_context(|| InvalidGcArgsSnafu {
135 msg: format!(
136 "None of the regions is on current datanode:{:?}",
137 region_ids
138 ),
139 })?
140 .access_layer();
141
142 let mito_regions = region_ids
147 .iter()
148 .filter_map(|rid| mito_engine.find_region(*rid).map(|r| (*rid, r)))
149 .collect();
150
151 let cache_manager = mito_engine.cache_manager();
152
153 let gc_worker = LocalGcWorker::try_new(
154 access_layer.clone(),
155 Some(cache_manager),
156 mito_regions,
157 mito_engine.mito_config().gc.clone(),
158 file_ref_manifest.clone(),
159 &mito_engine.gc_limiter(),
160 full_file_listing,
161 )
162 .await
163 .context(GcMitoEngineSnafu { region_id })?;
164
165 Ok((region_id, gc_worker))
166 }
167}