datanode/heartbeat/handler/
gc_worker.rs1use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
16use common_telemetry::{debug, warn};
17use mito2::gc::LocalGcWorker;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::storage::{FileRefsManifest, RegionId};
20
21use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Stateless handler for the [`GcRegions`] heartbeat instruction.
///
/// All the state it works with is taken from the [`HandlerContext`] passed to
/// [`InstructionHandler::handle`].
#[derive(Debug)]
pub struct GcRegionsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for GcRegionsHandler {
28 type Instruction = GcRegions;
29
30 async fn handle(
31 &self,
32 ctx: &HandlerContext,
33 gc_regions: Self::Instruction,
34 ) -> Option<InstructionReply> {
35 let region_ids = gc_regions.regions.clone();
36 debug!("Received gc regions instruction: {:?}", region_ids);
37
38 let (region_id, gc_worker) = match self
39 .create_gc_worker(
40 ctx,
41 region_ids,
42 &gc_regions.file_refs_manifest,
43 gc_regions.full_file_listing,
44 )
45 .await
46 {
47 Ok(worker) => worker,
48 Err(e) => {
49 return Some(InstructionReply::GcRegions(GcRegionsReply {
50 result: Err(format!("Failed to create GC worker: {}", e)),
51 }));
52 }
53 };
54
55 let register_result = ctx
56 .gc_tasks
57 .try_register(
58 region_id,
59 Box::pin(async move {
60 debug!("Starting gc worker for region {}", region_id);
61 let report = gc_worker
62 .run()
63 .await
64 .context(GcMitoEngineSnafu { region_id })?;
65 debug!("Gc worker for region {} finished", region_id);
66 Ok(report)
67 }),
68 )
69 .await;
70 if register_result.is_busy() {
71 warn!("Another gc task is running for the region: {region_id}");
72 return Some(InstructionReply::GcRegions(GcRegionsReply {
73 result: Err(format!(
74 "Another gc task is running for the region: {region_id}"
75 )),
76 }));
77 }
78 let mut watcher = register_result.into_watcher();
79 let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
80 match result {
81 Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
82 result: Ok(report),
83 })),
84 Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
85 result: Err(format!("{err:?}")),
86 })),
87 }
88 }
89}
90
91impl GcRegionsHandler {
92 async fn create_gc_worker(
95 &self,
96 ctx: &HandlerContext,
97 mut region_ids: Vec<RegionId>,
98 file_ref_manifest: &FileRefsManifest,
99 full_file_listing: bool,
100 ) -> Result<(RegionId, LocalGcWorker)> {
101 region_ids.sort_by_key(|r| r.region_number());
103
104 ensure!(
105 region_ids.windows(2).all(|w| {
106 let t1 = w[0].table_id();
107 let t2 = w[1].table_id();
108 t1 == t2
109 }),
110 InvalidGcArgsSnafu {
111 msg: format!(
112 "Regions to GC should belong to the same table, found: {:?}",
113 region_ids
114 ),
115 }
116 );
117
118 let mito_engine = ctx
119 .region_server
120 .mito_engine()
121 .with_context(|| UnexpectedSnafu {
122 violated: "MitoEngine not found".to_string(),
123 })?;
124
125 let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
126 msg: "No region ids provided".to_string(),
127 })?;
128
129 ensure!(
131 region_ids
132 .iter()
133 .all(|rid| mito_engine.find_region(*rid).is_some()),
134 InvalidGcArgsSnafu {
135 msg: format!(
136 "Some regions are not on current datanode:{:?}",
137 region_ids
138 .iter()
139 .filter(|rid| mito_engine.find_region(**rid).is_none())
140 .collect::<Vec<_>>()
141 ),
142 }
143 );
144
145 let access_layer = mito_engine
147 .find_region(region_id)
148 .with_context(|| InvalidGcArgsSnafu {
149 msg: format!(
150 "None of the regions is on current datanode:{:?}",
151 region_ids
152 ),
153 })?
154 .access_layer();
155
156 let mito_regions = region_ids
157 .iter()
158 .map(|rid| {
159 mito_engine
160 .find_region(*rid)
161 .map(|r| (*rid, r))
162 .with_context(|| InvalidGcArgsSnafu {
163 msg: format!("Region {} not found on datanode", rid),
164 })
165 })
166 .collect::<Result<_>>()?;
167
168 let cache_manager = mito_engine.cache_manager();
169
170 let gc_worker = LocalGcWorker::try_new(
171 access_layer.clone(),
172 Some(cache_manager),
173 mito_regions,
174 mito_engine.mito_config().gc.clone(),
175 file_ref_manifest.clone(),
176 &mito_engine.gc_limiter(),
177 full_file_listing,
178 )
179 .await
180 .context(GcMitoEngineSnafu { region_id })?;
181
182 Ok((region_id, gc_worker))
183 }
184}