datanode/heartbeat/handler/
gc_worker.rs1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
19use common_meta::key::table_info::TableInfoManager;
20use common_meta::key::table_route::TableRouteManager;
21use common_telemetry::{debug, warn};
22use mito2::access_layer::{AccessLayer, AccessLayerRef};
23use mito2::engine::MitoEngine;
24use mito2::gc::LocalGcWorker;
25use mito2::region::MitoRegionRef;
26use snafu::{OptionExt, ResultExt};
27use store_api::path_utils::table_dir;
28use store_api::region_request::PathType;
29use store_api::storage::{FileRefsManifest, GcReport, RegionId};
30use table::requests::STORAGE_KEY;
31
32use crate::error::{GcMitoEngineSnafu, GetMetadataSnafu, Result, UnexpectedSnafu};
33use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
34
35pub struct GcRegionsHandler;
36
37#[async_trait::async_trait]
38impl InstructionHandler for GcRegionsHandler {
39 type Instruction = GcRegions;
40
41 async fn handle(
42 &self,
43 ctx: &HandlerContext,
44 gc_regions: Self::Instruction,
45 ) -> Option<InstructionReply> {
46 let region_ids = gc_regions.regions.clone();
47 debug!("Received gc regions instruction: {:?}", region_ids);
48
49 if region_ids.is_empty() {
50 return Some(InstructionReply::GcRegions(GcRegionsReply {
51 result: Ok(GcReport::default()),
52 }));
53 }
54
55 let mut sorted_region_ids = gc_regions.regions.clone();
57 sorted_region_ids.sort_by_key(|r| r.region_number());
58 let target_region_id = sorted_region_ids[0];
59
60 let mut table_to_regions: HashMap<u32, Vec<RegionId>> = HashMap::new();
62 for rid in region_ids {
63 table_to_regions
64 .entry(rid.table_id())
65 .or_default()
66 .push(rid);
67 }
68
69 let file_refs_manifest = gc_regions.file_refs_manifest.clone();
70 let full_file_listing = gc_regions.full_file_listing;
71
72 let ctx_clone = ctx.clone();
73 let register_result = ctx
74 .gc_tasks
75 .try_register(
76 target_region_id,
77 Box::pin(async move {
78 let mut reports = Vec::with_capacity(table_to_regions.len());
79 for (table_id, regions) in table_to_regions {
80 debug!(
81 "Starting gc worker for table {}, regions: {:?}",
82 table_id, regions
83 );
84 let gc_worker = GcRegionsHandler::create_gc_worker(
85 &ctx_clone,
86 table_id,
87 regions,
88 &file_refs_manifest,
89 full_file_listing,
90 )
91 .await?;
92
93 let report = gc_worker.run().await.context(GcMitoEngineSnafu {
94 region_id: target_region_id,
95 })?;
96 debug!(
97 "Gc worker for table {} finished, report: {:?}",
98 table_id, report
99 );
100 reports.push(report);
101 }
102
103 let mut merged_report = GcReport::default();
105 for report in reports {
106 merged_report
107 .deleted_files
108 .extend(report.deleted_files.into_iter());
109 merged_report
110 .deleted_indexes
111 .extend(report.deleted_indexes.into_iter());
112 }
113 Ok(merged_report)
114 }),
115 )
116 .await;
117
118 if register_result.is_busy() {
119 warn!("Another gc task is running for the region: {target_region_id}");
120 return Some(InstructionReply::GcRegions(GcRegionsReply {
121 result: Err(format!(
122 "Another gc task is running for the region: {target_region_id}"
123 )),
124 }));
125 }
126 let mut watcher = register_result.into_watcher();
127 let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
128 match result {
129 Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
130 result: Ok(report),
131 })),
132 Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
133 result: Err(format!("{err:?}")),
134 })),
135 }
136 }
137}
138
139impl GcRegionsHandler {
140 async fn create_gc_worker(
142 ctx: &HandlerContext,
143 table_id: u32,
144 region_ids: Vec<RegionId>,
145 file_ref_manifest: &FileRefsManifest,
146 full_file_listing: bool,
147 ) -> Result<LocalGcWorker> {
148 debug_assert!(!region_ids.is_empty(), "region_ids should not be empty");
149
150 let mito_engine = ctx
151 .region_server
152 .mito_engine()
153 .with_context(|| UnexpectedSnafu {
154 violated: "MitoEngine not found".to_string(),
155 })?;
156
157 let (access_layer, mito_regions) =
158 Self::get_access_layer(ctx, &mito_engine, table_id, ®ion_ids).await?;
159
160 let cache_manager = mito_engine.cache_manager();
161
162 let gc_worker = LocalGcWorker::try_new(
163 access_layer,
164 Some(cache_manager),
165 mito_regions,
166 mito_engine.mito_config().gc.clone(),
167 file_ref_manifest.clone(),
168 &mito_engine.gc_limiter(),
169 full_file_listing,
170 )
171 .await
172 .context(GcMitoEngineSnafu {
173 region_id: region_ids[0],
174 })?;
175
176 Ok(gc_worker)
177 }
178
179 async fn get_access_layer(
188 ctx: &HandlerContext,
189 mito_engine: &MitoEngine,
190 table_id: u32,
191 region_ids: &[RegionId],
192 ) -> Result<(AccessLayerRef, BTreeMap<RegionId, Option<MitoRegionRef>>)> {
193 let mut mito_regions = BTreeMap::new();
195
196 for rid in region_ids {
197 let region = mito_engine.find_region(*rid);
198
199 if let Some(ref r) = region {
200 if r.is_follower() {
202 return Err(UnexpectedSnafu {
203 violated: format!(
204 "Region {} is a follower, cannot perform GC on follower regions",
205 rid
206 ),
207 }
208 .build());
209 }
210 }
211 mito_regions.insert(*rid, region);
212 }
213
214 let missing_regions: Vec<_> = mito_regions
216 .iter()
217 .filter(|(_, r)| r.is_none())
218 .map(|(rid, _)| *rid)
219 .collect();
220
221 if !missing_regions.is_empty() {
222 Self::validate_regions_not_routed_elsewhere(ctx, table_id, &missing_regions).await?;
223 }
224
225 let access_layer = Self::construct_access_layer(ctx, mito_engine, table_id).await?;
227
228 Ok((access_layer, mito_regions))
229 }
230
231 async fn construct_access_layer(
233 ctx: &HandlerContext,
234 mito_engine: &MitoEngine,
235 table_id: u32,
236 ) -> Result<AccessLayerRef> {
237 let table_info_manager = TableInfoManager::new(ctx.kv_backend.clone());
238 let table_info_value = table_info_manager
239 .get(table_id)
240 .await
241 .context(GetMetadataSnafu)?
242 .with_context(|| UnexpectedSnafu {
243 violated: format!("Table metadata not found for table {}", table_id),
244 })?;
245
246 let table_dir = table_dir(&table_info_value.region_storage_path(), table_id);
247 let storage_name = table_info_value
248 .table_info
249 .meta
250 .options
251 .extra_options
252 .get(STORAGE_KEY);
253 let engine = &table_info_value.table_info.meta.engine;
254 let path_type = match engine.as_str() {
255 common_catalog::consts::MITO2_ENGINE => PathType::Bare,
256 common_catalog::consts::MITO_ENGINE => PathType::Bare,
257 common_catalog::consts::METRIC_ENGINE => PathType::Data,
258 _ => PathType::Bare,
259 };
260
261 let object_store = if let Some(name) = storage_name {
262 mito_engine
263 .object_store_manager()
264 .find(name)
265 .cloned()
266 .with_context(|| UnexpectedSnafu {
267 violated: format!("Object store {} not found", name),
268 })?
269 } else {
270 mito_engine
271 .object_store_manager()
272 .default_object_store()
273 .clone()
274 };
275
276 Ok(Arc::new(AccessLayer::new(
277 table_dir,
278 path_type,
279 object_store,
280 mito_engine.puffin_manager_factory().clone(),
281 mito_engine.intermediate_manager().clone(),
282 )))
283 }
284
285 async fn validate_regions_not_routed_elsewhere(
291 ctx: &HandlerContext,
292 table_id: u32,
293 missing_region_ids: &[RegionId],
294 ) -> Result<()> {
295 if missing_region_ids.is_empty() {
296 return Ok(());
297 }
298
299 let table_route_manager = TableRouteManager::new(ctx.kv_backend.clone());
300
301 let table_route = match table_route_manager
303 .table_route_storage()
304 .get(table_id)
305 .await
306 .context(GetMetadataSnafu)?
307 {
308 Some(route) => route,
309 None => {
310 debug!(
312 "Table route not found for table {}, regions {:?} are considered deleted",
313 table_id, missing_region_ids
314 );
315 return Ok(());
316 }
317 };
318
319 let region_routes = match table_route.region_routes() {
321 Ok(routes) => routes,
322 Err(_) => {
323 debug!(
325 "Table {} is a logical table, skipping region route validation",
326 table_id
327 );
328 return Ok(());
329 }
330 };
331
332 let region_routes_map: HashMap<RegionId, _> = region_routes
333 .iter()
334 .map(|route| (route.region.id, route))
335 .collect();
336
337 for region_id in missing_region_ids {
339 if let Some(route) = region_routes_map.get(region_id) {
340 if let Some(leader_peer) = &route.leader_peer {
341 return Err(UnexpectedSnafu {
343 violated: format!(
344 "Region {} is not on this datanode but is routed to datanode {}. \
345 GC request may have been sent to wrong datanode.",
346 region_id, leader_peer.id
347 ),
348 }
349 .build());
350 }
351
352 return Err(UnexpectedSnafu {
353 violated: format!(
354 "Region {} has no leader in route table; refusing GC without explicit tombstone/deleted state.",
355 region_id
356 ),
357 }
358 .build());
359 }
360 }
362
363 Ok(())
364 }
365}