meta_srv/gc/
dropped.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dropped region GC handling.
16//!
17//! This module handles garbage collection for "dropped regions". A dropped region is:
18//! 1. Recorded in `table_repart` metadata (the only place that still remembers it)
19//! 2. No longer exists in `table_route` metadata (removed from region routing)
20//! 3. No longer present in any datanode's heartbeat (physically closed)
21//!
22//! This differs from "active regions" which exist in both metadata and heartbeats.
23//! The `table_repart` entry serves as a tombstone that tracks which regions were
24//! merged/split and need their files cleaned up.
25
26use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
29use common_meta::key::table_repart::TableRepartValue;
30use common_meta::peer::Peer;
31use common_telemetry::{debug, warn};
32use store_api::storage::RegionId;
33use table::metadata::TableId;
34
35use crate::error::Result;
36use crate::gc::Region2Peers;
37use crate::gc::ctx::SchedulerCtx;
38use crate::gc::options::GcSchedulerOptions;
39use crate::gc::tracker::RegionGcTracker;
40
/// Information about a dropped region ready for GC.
///
/// A "dropped region" is a source region of a repartition (merge/split) that
/// is still recorded in `table_repart` but no longer present in the table
/// route; its files remain on storage and need cleanup (see module docs).
#[derive(Debug, Clone)]
pub(crate) struct DroppedRegionInfo {
    // Id of the dropped (source) region whose files should be collected.
    pub region_id: RegionId,
    // Table the dropped region belonged to.
    pub table_id: TableId,
    /// The destination regions this region was split into (if any).
    // Not read anywhere yet (hence the `unused` allowance); kept so future
    // GC logic can cross-check destinations before deleting source files.
    #[allow(unused)]
    pub dst_regions: HashSet<RegionId>,
}
50
/// Result of collecting and assigning dropped regions to peers.
///
/// All three maps are keyed by the peer selected to perform the GC, so a
/// caller can issue one GC instruction per peer.
#[derive(Debug, Default)]
pub(crate) struct DroppedRegionAssignment {
    /// Dropped regions grouped by the peer responsible for GC.
    pub regions_by_peer: HashMap<Peer, Vec<DroppedRegionInfo>>,
    /// Regions that require full file listing (always true for dropped regions).
    pub force_full_listing: HashMap<Peer, HashSet<RegionId>>,
    /// Override region routes for dropped regions (they have no active route).
    pub region_routes_override: HashMap<Peer, Region2Peers>,
}
61
/// Collector for dropped regions that need GC.
///
/// Borrows the scheduler context, GC options, and the shared per-region GC
/// tracker for the duration of one collection pass.
pub(crate) struct DroppedRegionCollector<'a> {
    // Access to metadata (table routes etc.) needed to identify dropped regions.
    ctx: &'a dyn SchedulerCtx,
    // GC options; only `gc_cooldown_period` is read here.
    config: &'a GcSchedulerOptions,
    // Shared tracker recording when each region was last GC'ed.
    tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
}
68
69impl<'a> DroppedRegionCollector<'a> {
70    pub fn new(
71        ctx: &'a dyn SchedulerCtx,
72        config: &'a GcSchedulerOptions,
73        tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
74    ) -> Self {
75        Self {
76            ctx,
77            config,
78            tracker,
79        }
80    }
81
82    /// Collect and assign dropped regions for GC.
83    ///
84    /// This method:
85    /// 1. Identifies regions in repartition metadata that are no longer active
86    /// 2. Filters out regions still in cooldown period
87    /// 3. Assigns each dropped region to an available peer for cleanup
88    pub async fn collect_and_assign(
89        &self,
90        table_reparts: &[(TableId, TableRepartValue)],
91    ) -> Result<DroppedRegionAssignment> {
92        // get active region ids for all tables involved in repartitioning
93        let active_region_ids: HashSet<RegionId> = {
94            let table_ids = table_reparts
95                .iter()
96                .map(|(table_id, _)| *table_id)
97                .collect::<Vec<_>>();
98            let mut active_region_ids = HashSet::new();
99
100            let table_routes = self.ctx.batch_get_table_route(&table_ids).await?;
101            for table_route in table_routes.values() {
102                for region in &table_route.region_routes {
103                    active_region_ids.insert(region.region.id);
104                }
105            }
106
107            active_region_ids
108        };
109
110        let dropped_regions = self.identify_dropped_regions(table_reparts, &active_region_ids);
111
112        if dropped_regions.is_empty() {
113            return Ok(DroppedRegionAssignment::default());
114        }
115
116        let dropped_regions = self.filter_by_cooldown(dropped_regions).await;
117
118        if dropped_regions.is_empty() {
119            return Ok(DroppedRegionAssignment::default());
120        }
121
122        self.assign_to_peers(dropped_regions).await
123    }
124
125    /// Identify dropped regions: regions in `table_repart` but not in table routes.
126    /// The `assign_to_peers` step later double check they're also absent from `table_route`.
127    fn identify_dropped_regions(
128        &self,
129        table_reparts: &[(TableId, TableRepartValue)],
130        active_region_ids: &HashSet<RegionId>,
131    ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
132        let mut dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
133            HashMap::new();
134
135        for (table_id, repart) in table_reparts {
136            if repart.src_to_dst.is_empty() {
137                continue;
138            }
139
140            let entry = dropped_regions.entry(*table_id).or_default();
141            for (src_region, dst_regions) in repart.clone().src_to_dst {
142                if !active_region_ids.contains(&src_region) {
143                    entry.insert(src_region, dst_regions.into_iter().collect());
144                }
145            }
146        }
147
148        dropped_regions.retain(|_, regions| !regions.is_empty());
149        dropped_regions
150    }
151
152    /// Filter out dropped regions that are still in their cooldown period.
153    async fn filter_by_cooldown(
154        &self,
155        dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
156    ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
157        let now = Instant::now();
158        let tracker = self.tracker.lock().await;
159        let mut filtered = HashMap::new();
160
161        for (table_id, regions) in dropped_regions {
162            let mut kept = HashMap::new();
163            for (region_id, dst_regions) in regions {
164                if let Some(gc_info) = tracker.get(&region_id) {
165                    let elapsed = now.saturating_duration_since(gc_info.last_gc_time);
166                    if elapsed < self.config.gc_cooldown_period {
167                        debug!("Skipping dropped region {} due to cooldown", region_id);
168                        continue;
169                    }
170                }
171                kept.insert(region_id, dst_regions);
172            }
173
174            if !kept.is_empty() {
175                filtered.insert(table_id, kept);
176            }
177        }
178
179        filtered
180    }
181
182    /// Assign dropped regions to available peers for GC execution.
183    ///
184    /// For dropped regions, we need to:
185    /// 1. Find an available peer from the table's current route
186    /// 2. Use consistent hashing (region_id % peer_count) for load distribution
187    /// 3. Create route overrides since dropped regions have no active route
188    async fn assign_to_peers(
189        &self,
190        dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
191    ) -> Result<DroppedRegionAssignment> {
192        let mut assignment = DroppedRegionAssignment::default();
193
194        for (table_id, regions) in dropped_regions {
195            let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await {
196                Ok(route) => route,
197                Err(e) => {
198                    warn!(
199                        "Failed to get table route for table {}: {}, skipping dropped regions",
200                        table_id, e
201                    );
202                    continue;
203                }
204            };
205
206            if phy_table_id != table_id {
207                continue;
208            }
209
210            let active_region_ids: HashSet<RegionId> = table_route
211                .region_routes
212                .iter()
213                .map(|r| r.region.id)
214                .collect();
215
216            let mut leader_peers: Vec<Peer> = table_route
217                .region_routes
218                .iter()
219                .filter_map(|r| r.leader_peer.clone())
220                .collect();
221            leader_peers.sort_by_key(|peer| peer.id);
222            leader_peers.dedup_by_key(|peer| peer.id);
223
224            if leader_peers.is_empty() {
225                warn!(
226                    "No leader peers found for table {}, skipping dropped regions",
227                    table_id
228                );
229                continue;
230            }
231
232            for (region_id, dst_regions) in regions {
233                if active_region_ids.contains(&region_id) {
234                    debug!(
235                        "Skipping dropped region {} since it still exists in table route",
236                        region_id
237                    );
238                    continue;
239                }
240
241                let selected_idx = (region_id.as_u64() as usize) % leader_peers.len();
242                let peer = leader_peers[selected_idx].clone();
243
244                let info = DroppedRegionInfo {
245                    region_id,
246                    table_id,
247                    dst_regions,
248                };
249
250                assignment
251                    .regions_by_peer
252                    .entry(peer.clone())
253                    .or_default()
254                    .push(info);
255
256                assignment
257                    .force_full_listing
258                    .entry(peer.clone())
259                    .or_default()
260                    .insert(region_id);
261
262                assignment
263                    .region_routes_override
264                    .entry(peer.clone())
265                    .or_default()
266                    .insert(region_id, (peer, Vec::new()));
267            }
268        }
269
270        Ok(assignment)
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// Sanity-check that `DroppedRegionInfo` stores its fields verbatim.
    #[test]
    fn test_dropped_region_info() {
        let region_id = RegionId::new(1, 1);
        let table_id: TableId = 1;
        let info = DroppedRegionInfo {
            region_id,
            table_id,
            dst_regions: HashSet::new(),
        };
        assert_eq!(info.region_id, region_id);
        assert_eq!(info.table_id, table_id);
    }
}