datanode/heartbeat/handler/gc_worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
19use common_meta::key::table_info::TableInfoManager;
20use common_meta::key::table_route::TableRouteManager;
21use common_telemetry::{debug, warn};
22use mito2::access_layer::{AccessLayer, AccessLayerRef};
23use mito2::engine::MitoEngine;
24use mito2::gc::LocalGcWorker;
25use mito2::region::MitoRegionRef;
26use snafu::{OptionExt, ResultExt};
27use store_api::path_utils::table_dir;
28use store_api::region_request::PathType;
29use store_api::storage::{FileRefsManifest, GcReport, RegionId};
30use table::requests::STORAGE_KEY;
31
32use crate::error::{GcMitoEngineSnafu, GetMetadataSnafu, Result, UnexpectedSnafu};
33use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
34
/// Heartbeat instruction handler that performs local file garbage collection
/// for a set of regions; see the [`InstructionHandler`] impl for the protocol.
pub struct GcRegionsHandler;
36
37#[async_trait::async_trait]
38impl InstructionHandler for GcRegionsHandler {
39    type Instruction = GcRegions;
40
41    async fn handle(
42        &self,
43        ctx: &HandlerContext,
44        gc_regions: Self::Instruction,
45    ) -> Option<InstructionReply> {
46        let region_ids = gc_regions.regions.clone();
47        debug!("Received gc regions instruction: {:?}", region_ids);
48
49        if region_ids.is_empty() {
50            return Some(InstructionReply::GcRegions(GcRegionsReply {
51                result: Ok(GcReport::default()),
52            }));
53        }
54
55        // Always use the smallest region id on datanode as the target region id for task tracker
56        let mut sorted_region_ids = gc_regions.regions.clone();
57        sorted_region_ids.sort_by_key(|r| r.region_number());
58        let target_region_id = sorted_region_ids[0];
59
60        // Group regions by table_id
61        let mut table_to_regions: HashMap<u32, Vec<RegionId>> = HashMap::new();
62        for rid in region_ids {
63            table_to_regions
64                .entry(rid.table_id())
65                .or_default()
66                .push(rid);
67        }
68
69        let file_refs_manifest = gc_regions.file_refs_manifest.clone();
70        let full_file_listing = gc_regions.full_file_listing;
71
72        let ctx_clone = ctx.clone();
73        let register_result = ctx
74            .gc_tasks
75            .try_register(
76                target_region_id,
77                Box::pin(async move {
78                    let mut reports = Vec::with_capacity(table_to_regions.len());
79                    for (table_id, regions) in table_to_regions {
80                        debug!(
81                            "Starting gc worker for table {}, regions: {:?}",
82                            table_id, regions
83                        );
84                        let gc_worker = GcRegionsHandler::create_gc_worker(
85                            &ctx_clone,
86                            table_id,
87                            regions,
88                            &file_refs_manifest,
89                            full_file_listing,
90                        )
91                        .await?;
92
93                        let report = gc_worker.run().await.context(GcMitoEngineSnafu {
94                            region_id: target_region_id,
95                        })?;
96                        debug!(
97                            "Gc worker for table {} finished, report: {:?}",
98                            table_id, report
99                        );
100                        reports.push(report);
101                    }
102
103                    // Merge reports
104                    let mut merged_report = GcReport::default();
105                    for report in reports {
106                        merged_report
107                            .deleted_files
108                            .extend(report.deleted_files.into_iter());
109                        merged_report
110                            .deleted_indexes
111                            .extend(report.deleted_indexes.into_iter());
112                    }
113                    Ok(merged_report)
114                }),
115            )
116            .await;
117
118        if register_result.is_busy() {
119            warn!("Another gc task is running for the region: {target_region_id}");
120            return Some(InstructionReply::GcRegions(GcRegionsReply {
121                result: Err(format!(
122                    "Another gc task is running for the region: {target_region_id}"
123                )),
124            }));
125        }
126        let mut watcher = register_result.into_watcher();
127        let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
128        match result {
129            Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
130                result: Ok(report),
131            })),
132            Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
133                result: Err(format!("{err:?}")),
134            })),
135        }
136    }
137}
138
impl GcRegionsHandler {
    /// Create a GC worker for the given table and region IDs.
    ///
    /// Resolves the mito engine from the region server, builds the table's
    /// access layer (validating region placement along the way, see
    /// [`Self::get_access_layer`]) and constructs a [`LocalGcWorker`] over it.
    ///
    /// # Errors
    /// Returns an error if the region server has no mito engine, if access
    /// layer construction or placement validation fails, or if
    /// `LocalGcWorker::try_new` fails.
    async fn create_gc_worker(
        ctx: &HandlerContext,
        table_id: u32,
        region_ids: Vec<RegionId>,
        file_ref_manifest: &FileRefsManifest,
        full_file_listing: bool,
    ) -> Result<LocalGcWorker> {
        // The caller always passes at least one region; `region_ids[0]`
        // below relies on this invariant.
        debug_assert!(!region_ids.is_empty(), "region_ids should not be empty");

        let mito_engine = ctx
            .region_server
            .mito_engine()
            .with_context(|| UnexpectedSnafu {
                violated: "MitoEngine not found".to_string(),
            })?;

        let (access_layer, mito_regions) =
            Self::get_access_layer(ctx, &mito_engine, table_id, &region_ids).await?;

        let cache_manager = mito_engine.cache_manager();

        let gc_worker = LocalGcWorker::try_new(
            access_layer,
            Some(cache_manager),
            mito_regions,
            mito_engine.mito_config().gc.clone(),
            file_ref_manifest.clone(),
            &mito_engine.gc_limiter(),
            full_file_listing,
        )
        .await
        .context(GcMitoEngineSnafu {
            // The first region id is used only for error attribution; the
            // worker itself covers all regions of this table.
            region_id: region_ids[0],
        })?;

        Ok(gc_worker)
    }

    /// Get the access layer for the given table and region IDs.
    /// It also returns the mito regions if they are found in the engine.
    ///
    /// This method validates:
    /// 1. Any found region must be a Leader (not Follower)
    /// 2. Any missing region must not be routed to another datanode
    ///
    /// The AccessLayer is always constructed from table metadata for consistency.
    ///
    /// # Errors
    /// Returns an error if a local region is a follower, if a missing region
    /// is still routed to another datanode, or if building the access layer
    /// from table metadata fails.
    async fn get_access_layer(
        ctx: &HandlerContext,
        mito_engine: &MitoEngine,
        table_id: u32,
        region_ids: &[RegionId],
    ) -> Result<(AccessLayerRef, BTreeMap<RegionId, Option<MitoRegionRef>>)> {
        // 1. Collect mito regions and validate Leader status
        let mut mito_regions = BTreeMap::new();

        for rid in region_ids {
            // `None` means the region is not open on this datanode; it is
            // recorded anyway and validated against the route table below.
            let region = mito_engine.find_region(*rid);

            if let Some(ref r) = region {
                // Validation: Check if region is a leader
                // GC must only run on leaders so followers never mutate files.
                if r.is_follower() {
                    return Err(UnexpectedSnafu {
                        violated: format!(
                            "Region {} is a follower, cannot perform GC on follower regions",
                            rid
                        ),
                    }
                    .build());
                }
            }
            mito_regions.insert(*rid, region);
        }

        // 2. Validate that missing regions are not routed to other datanodes
        let missing_regions: Vec<_> = mito_regions
            .iter()
            .filter(|(_, r)| r.is_none())
            .map(|(rid, _)| *rid)
            .collect();

        if !missing_regions.is_empty() {
            Self::validate_regions_not_routed_elsewhere(ctx, table_id, &missing_regions).await?;
        }

        // 3. Construct AccessLayer directly from table metadata
        // (rather than from any single open region) so the result is the
        // same whether or not the regions are open locally.
        let access_layer = Self::construct_access_layer(ctx, mito_engine, table_id).await?;

        Ok((access_layer, mito_regions))
    }

    /// Manually construct an access layer from table metadata.
    ///
    /// Reads table info from the kv backend to derive the table dir, path
    /// type and object store, then assembles an [`AccessLayer`] from the
    /// engine's puffin/intermediate managers.
    ///
    /// # Errors
    /// Returns an error if table metadata is missing or unreadable, or if a
    /// named custom storage is not registered in the object store manager.
    async fn construct_access_layer(
        ctx: &HandlerContext,
        mito_engine: &MitoEngine,
        table_id: u32,
    ) -> Result<AccessLayerRef> {
        let table_info_manager = TableInfoManager::new(ctx.kv_backend.clone());
        let table_info_value = table_info_manager
            .get(table_id)
            .await
            .context(GetMetadataSnafu)?
            .with_context(|| UnexpectedSnafu {
                violated: format!("Table metadata not found for table {}", table_id),
            })?;

        let table_dir = table_dir(&table_info_value.region_storage_path(), table_id);
        // A table may pin a named custom storage via its options; `None`
        // falls back to the default object store below.
        let storage_name = table_info_value
            .table_info
            .meta
            .options
            .extra_options
            .get(STORAGE_KEY);
        let engine = &table_info_value.table_info.meta.engine;
        // Metric-engine tables use the `Data` path type; mito/mito2 use
        // `Bare`. NOTE(review): unknown engines also default to `Bare` —
        // confirm this is the right fallback if new engines are added.
        let path_type = match engine.as_str() {
            common_catalog::consts::MITO2_ENGINE => PathType::Bare,
            common_catalog::consts::MITO_ENGINE => PathType::Bare,
            common_catalog::consts::METRIC_ENGINE => PathType::Data,
            _ => PathType::Bare,
        };

        let object_store = if let Some(name) = storage_name {
            mito_engine
                .object_store_manager()
                .find(name)
                .cloned()
                .with_context(|| UnexpectedSnafu {
                    violated: format!("Object store {} not found", name),
                })?
        } else {
            mito_engine
                .object_store_manager()
                .default_object_store()
                .clone()
        };

        Ok(Arc::new(AccessLayer::new(
            table_dir,
            path_type,
            object_store,
            mito_engine.puffin_manager_factory().clone(),
            mito_engine.intermediate_manager().clone(),
        )))
    }

    /// Validate that the given regions are not routed to other datanodes.
    ///
    /// If any region is still active on another datanode (has a leader_peer in route table),
    /// this function returns an error to prevent accidental deletion of files
    /// that are still in use.
    ///
    /// # Errors
    /// Returns an error if a missing region is routed to a leader elsewhere,
    /// or if it appears in the route table without a leader (ambiguous state).
    async fn validate_regions_not_routed_elsewhere(
        ctx: &HandlerContext,
        table_id: u32,
        missing_region_ids: &[RegionId],
    ) -> Result<()> {
        if missing_region_ids.is_empty() {
            return Ok(());
        }

        let table_route_manager = TableRouteManager::new(ctx.kv_backend.clone());

        // Get table route
        let table_route = match table_route_manager
            .table_route_storage()
            .get(table_id)
            .await
            .context(GetMetadataSnafu)?
        {
            Some(route) => route,
            None => {
                // Table route not found, all regions are likely deleted
                // (table dropped), so GC is safe to proceed.
                debug!(
                    "Table route not found for table {}, regions {:?} are considered deleted",
                    table_id, missing_region_ids
                );
                return Ok(());
            }
        };

        // Get region routes for physical table
        let region_routes = match table_route.region_routes() {
            Ok(routes) => routes,
            Err(_) => {
                // Logical table, skip validation
                // NOTE(review): assumes `region_routes()` only errs for
                // logical tables — confirm against TableRouteValue semantics.
                debug!(
                    "Table {} is a logical table, skipping region route validation",
                    table_id
                );
                return Ok(());
            }
        };

        // Index routes by region id for O(1) lookups below.
        let region_routes_map: HashMap<RegionId, _> = region_routes
            .iter()
            .map(|route| (route.region.id, route))
            .collect();

        // Check each missing region
        for region_id in missing_region_ids {
            if let Some(route) = region_routes_map.get(region_id) {
                if let Some(leader_peer) = &route.leader_peer {
                    // Region still has a leader on some datanode.
                    // Running GC here could delete files that leader uses.
                    return Err(UnexpectedSnafu {
                        violated: format!(
                            "Region {} is not on this datanode but is routed to datanode {}. \
                             GC request may have been sent to wrong datanode.",
                            region_id, leader_peer.id
                        ),
                    }
                    .build());
                }

                // Routed but leaderless: ambiguous (e.g. mid-migration);
                // refuse rather than risk deleting live files.
                return Err(UnexpectedSnafu {
                    violated: format!(
                        "Region {} has no leader in route table; refusing GC without explicit tombstone/deleted state.",
                        region_id
                    ),
                }
                .build());
            }
            // Region not in route table: treat as deleted and allow GC.
        }

        Ok(())
    }
}