datanode/heartbeat/handler/gc_worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
16use common_telemetry::{debug, warn};
17use mito2::gc::LocalGcWorker;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::storage::{FileRefsManifest, RegionId};
20
21use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Heartbeat instruction handler that runs garbage collection for the regions listed in a
/// [`GcRegions`] instruction on this datanode.
///
/// The handler itself carries no state; everything it reads or updates (the region server
/// and the GC task registry) is taken from the [`HandlerContext`] passed to `handle`.
#[derive(Debug, Clone, Copy, Default)]
pub struct GcRegionsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for GcRegionsHandler {
28    type Instruction = GcRegions;
29
30    async fn handle(
31        &self,
32        ctx: &HandlerContext,
33        gc_regions: Self::Instruction,
34    ) -> Option<InstructionReply> {
35        let region_ids = gc_regions.regions.clone();
36        debug!("Received gc regions instruction: {:?}", region_ids);
37
38        let (region_id, gc_worker) = match self
39            .create_gc_worker(
40                ctx,
41                region_ids,
42                &gc_regions.file_refs_manifest,
43                gc_regions.full_file_listing,
44            )
45            .await
46        {
47            Ok(worker) => worker,
48            Err(e) => {
49                return Some(InstructionReply::GcRegions(GcRegionsReply {
50                    result: Err(format!("Failed to create GC worker: {}", e)),
51                }));
52            }
53        };
54
55        let register_result = ctx
56            .gc_tasks
57            .try_register(
58                region_id,
59                Box::pin(async move {
60                    debug!("Starting gc worker for region {}", region_id);
61                    let report = gc_worker
62                        .run()
63                        .await
64                        .context(GcMitoEngineSnafu { region_id })?;
65                    debug!("Gc worker for region {} finished", region_id);
66                    Ok(report)
67                }),
68            )
69            .await;
70        if register_result.is_busy() {
71            warn!("Another gc task is running for the region: {region_id}");
72            return Some(InstructionReply::GcRegions(GcRegionsReply {
73                result: Err(format!(
74                    "Another gc task is running for the region: {region_id}"
75                )),
76            }));
77        }
78        let mut watcher = register_result.into_watcher();
79        let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
80        match result {
81            Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
82                result: Ok(report),
83            })),
84            Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
85                result: Err(format!("{err:?}")),
86            })),
87        }
88    }
89}
90
91impl GcRegionsHandler {
92    /// Create a GC worker for the given region IDs.
93    /// Return the first region ID(after sort by given region id) and the GC worker.
94    async fn create_gc_worker(
95        &self,
96        ctx: &HandlerContext,
97        mut region_ids: Vec<RegionId>,
98        file_ref_manifest: &FileRefsManifest,
99        full_file_listing: bool,
100    ) -> Result<(RegionId, LocalGcWorker)> {
101        // always use the smallest region id on datanode as the target region id
102        region_ids.sort_by_key(|r| r.region_number());
103
104        let mito_engine = ctx
105            .region_server
106            .mito_engine()
107            .with_context(|| UnexpectedSnafu {
108                violated: "MitoEngine not found".to_string(),
109            })?;
110
111        let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
112            msg: "No region ids provided".to_string(),
113        })?;
114
115        // also need to ensure all regions are on this datanode
116        ensure!(
117            region_ids
118                .iter()
119                .all(|rid| mito_engine.find_region(*rid).is_some()),
120            InvalidGcArgsSnafu {
121                msg: format!(
122                    "Some regions are not on current datanode:{:?}",
123                    region_ids
124                        .iter()
125                        .filter(|rid| mito_engine.find_region(**rid).is_none())
126                        .collect::<Vec<_>>()
127                ),
128            }
129        );
130
131        // Find the access layer from one of the regions that exists on this datanode
132        let access_layer = mito_engine
133            .find_region(region_id)
134            .with_context(|| InvalidGcArgsSnafu {
135                msg: format!(
136                    "None of the regions is on current datanode:{:?}",
137                    region_ids
138                ),
139            })?
140            .access_layer();
141
142        // if region happen to be dropped before this but after gc scheduler send gc instr,
143        // need to deal with it properly(it is ok for region to be dropped after GC worker started)
144        // region not found here can only be drop table/database case, since region migration is prevented by lock in gc procedure
145        // TODO(discord9): add integration test for this drop case
146        let mito_regions = region_ids
147            .iter()
148            .filter_map(|rid| mito_engine.find_region(*rid).map(|r| (*rid, r)))
149            .collect();
150
151        let cache_manager = mito_engine.cache_manager();
152
153        let gc_worker = LocalGcWorker::try_new(
154            access_layer.clone(),
155            Some(cache_manager),
156            mito_regions,
157            mito_engine.mito_config().gc.clone(),
158            file_ref_manifest.clone(),
159            &mito_engine.gc_limiter(),
160            full_file_listing,
161        )
162        .await
163        .context(GcMitoEngineSnafu { region_id })?;
164
165        Ok((region_id, gc_worker))
166    }
167}