datanode/heartbeat/handler/
gc_worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
16use common_telemetry::{debug, warn};
17use mito2::gc::LocalGcWorker;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::storage::{FileRefsManifest, RegionId};
20
21use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Heartbeat instruction handler for [`GcRegions`] instructions.
///
/// Builds a mito [`LocalGcWorker`] over the requested regions on this datanode,
/// runs it as a registered GC task and replies with the resulting report or an
/// error message.
pub struct GcRegionsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for GcRegionsHandler {
28    type Instruction = GcRegions;
29
30    async fn handle(
31        &self,
32        ctx: &HandlerContext,
33        gc_regions: Self::Instruction,
34    ) -> Option<InstructionReply> {
35        let region_ids = gc_regions.regions.clone();
36        debug!("Received gc regions instruction: {:?}", region_ids);
37
38        let (region_id, gc_worker) = match self
39            .create_gc_worker(
40                ctx,
41                region_ids,
42                &gc_regions.file_refs_manifest,
43                gc_regions.full_file_listing,
44            )
45            .await
46        {
47            Ok(worker) => worker,
48            Err(e) => {
49                return Some(InstructionReply::GcRegions(GcRegionsReply {
50                    result: Err(format!("Failed to create GC worker: {}", e)),
51                }));
52            }
53        };
54
55        let register_result = ctx
56            .gc_tasks
57            .try_register(
58                region_id,
59                Box::pin(async move {
60                    debug!("Starting gc worker for region {}", region_id);
61                    let report = gc_worker
62                        .run()
63                        .await
64                        .context(GcMitoEngineSnafu { region_id })?;
65                    debug!("Gc worker for region {} finished", region_id);
66                    Ok(report)
67                }),
68            )
69            .await;
70        if register_result.is_busy() {
71            warn!("Another gc task is running for the region: {region_id}");
72            return Some(InstructionReply::GcRegions(GcRegionsReply {
73                result: Err(format!(
74                    "Another gc task is running for the region: {region_id}"
75                )),
76            }));
77        }
78        let mut watcher = register_result.into_watcher();
79        let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
80        match result {
81            Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
82                result: Ok(report),
83            })),
84            Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
85                result: Err(format!("{err:?}")),
86            })),
87        }
88    }
89}
90
91impl GcRegionsHandler {
92    /// Create a GC worker for the given region IDs.
93    /// Return the first region ID(after sort by given region id) and the GC worker.
94    async fn create_gc_worker(
95        &self,
96        ctx: &HandlerContext,
97        mut region_ids: Vec<RegionId>,
98        file_ref_manifest: &FileRefsManifest,
99        full_file_listing: bool,
100    ) -> Result<(RegionId, LocalGcWorker)> {
101        // always use the smallest region id on datanode as the target region id
102        region_ids.sort_by_key(|r| r.region_number());
103
104        ensure!(
105            region_ids.windows(2).all(|w| {
106                let t1 = w[0].table_id();
107                let t2 = w[1].table_id();
108                t1 == t2
109            }),
110            InvalidGcArgsSnafu {
111                msg: format!(
112                    "Regions to GC should belong to the same table, found: {:?}",
113                    region_ids
114                ),
115            }
116        );
117
118        let mito_engine = ctx
119            .region_server
120            .mito_engine()
121            .with_context(|| UnexpectedSnafu {
122                violated: "MitoEngine not found".to_string(),
123            })?;
124
125        let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
126            msg: "No region ids provided".to_string(),
127        })?;
128
129        // also need to ensure all regions are on this datanode
130        ensure!(
131            region_ids
132                .iter()
133                .all(|rid| mito_engine.find_region(*rid).is_some()),
134            InvalidGcArgsSnafu {
135                msg: format!(
136                    "Some regions are not on current datanode:{:?}",
137                    region_ids
138                        .iter()
139                        .filter(|rid| mito_engine.find_region(**rid).is_none())
140                        .collect::<Vec<_>>()
141                ),
142            }
143        );
144
145        // Find the access layer from one of the regions that exists on this datanode
146        let access_layer = mito_engine
147            .find_region(region_id)
148            .with_context(|| InvalidGcArgsSnafu {
149                msg: format!(
150                    "None of the regions is on current datanode:{:?}",
151                    region_ids
152                ),
153            })?
154            .access_layer();
155
156        let mito_regions = region_ids
157            .iter()
158            .map(|rid| {
159                mito_engine
160                    .find_region(*rid)
161                    .map(|r| (*rid, r))
162                    .with_context(|| InvalidGcArgsSnafu {
163                        msg: format!("Region {} not found on datanode", rid),
164                    })
165            })
166            .collect::<Result<_>>()?;
167
168        let cache_manager = mito_engine.cache_manager();
169
170        let gc_worker = LocalGcWorker::try_new(
171            access_layer.clone(),
172            Some(cache_manager),
173            mito_regions,
174            mito_engine.mito_config().gc.clone(),
175            file_ref_manifest.clone(),
176            &mito_engine.gc_limiter(),
177            full_file_listing,
178        )
179        .await
180        .context(GcMitoEngineSnafu { region_id })?;
181
182        Ok((region_id, gc_worker))
183    }
184}