// meta_srv/gc/dropped.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dropped region GC handling.
//!
//! This module handles garbage collection for "dropped regions". A dropped region is:
//! 1. Recorded in `table_repart` metadata (the only place that still remembers it)
//! 2. No longer exists in `table_route` metadata (removed from region routing)
//! 3. No longer present in any datanode's heartbeat (physically closed)
//!
//! This differs from "active regions" which exist in both metadata and heartbeats.
//! The `table_repart` entry serves as a tombstone that tracks which regions were
//! merged/split and need their files cleaned up.
26use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
29use common_meta::key::table_repart::TableRepartValue;
30use common_meta::peer::Peer;
31use common_telemetry::{debug, warn};
32use store_api::storage::RegionId;
33use table::metadata::TableId;
34
35use crate::error::Result;
36use crate::gc::Region2Peers;
37use crate::gc::ctx::SchedulerCtx;
38use crate::gc::options::GcSchedulerOptions;
39use crate::gc::tracker::RegionGcTracker;
40
/// Information about a dropped region ready for GC.
#[derive(Debug, Clone)]
pub(crate) struct DroppedRegionInfo {
    /// The dropped region whose files should be cleaned up.
    pub region_id: RegionId,
    /// The table this region belonged to.
    pub table_id: TableId,
    /// The destination regions this region was split into (if any).
    #[allow(unused)]
    pub dst_regions: HashSet<RegionId>,
}
50
/// Result of collecting and assigning dropped regions to peers.
///
/// All three maps are keyed by the peer chosen to perform the GC; a peer
/// present in one map is present in the others for the same regions.
#[derive(Debug, Default)]
pub(crate) struct DroppedRegionAssignment {
    /// Dropped regions grouped by the peer responsible for GC.
    pub regions_by_peer: HashMap<Peer, Vec<DroppedRegionInfo>>,
    /// Regions that require full file listing (always true for dropped regions).
    pub force_full_listing: HashMap<Peer, HashSet<RegionId>>,
    /// Override region routes for dropped regions (they have no active route).
    pub region_routes_override: HashMap<Peer, Region2Peers>,
}
61
/// Collector for dropped regions that need GC.
pub(crate) struct DroppedRegionCollector<'a> {
    /// Scheduler context used to fetch table route metadata.
    ctx: &'a dyn SchedulerCtx,
    /// GC scheduler configuration (provides `gc_cooldown_period`).
    config: &'a GcSchedulerOptions,
    /// Per-region GC history (`last_gc_time`), consulted for cooldown filtering.
    tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
}
68
69impl<'a> DroppedRegionCollector<'a> {
70    pub fn new(
71        ctx: &'a dyn SchedulerCtx,
72        config: &'a GcSchedulerOptions,
73        tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
74    ) -> Self {
75        Self {
76            ctx,
77            config,
78            tracker,
79        }
80    }
81
82    /// Collect and assign dropped regions for GC.
83    ///
84    /// This method:
85    /// 1. Identifies regions in repartition metadata that are no longer active
86    /// 2. Filters out regions still in cooldown period
87    /// 3. Assigns each dropped region to an available peer for cleanup
88    pub async fn collect_and_assign(
89        &self,
90        table_reparts: &[(TableId, TableRepartValue)],
91    ) -> Result<DroppedRegionAssignment> {
92        self.collect_and_assign_with_cooldown(table_reparts, true)
93            .await
94    }
95
96    /// Collect and assign dropped regions for GC with optional cooldown filtering.
97    pub async fn collect_and_assign_with_cooldown(
98        &self,
99        table_reparts: &[(TableId, TableRepartValue)],
100        apply_cooldown: bool,
101    ) -> Result<DroppedRegionAssignment> {
102        // get active region ids for all tables involved in repartitioning
103        let active_region_ids: HashSet<RegionId> = {
104            let table_ids = table_reparts
105                .iter()
106                .map(|(table_id, _)| *table_id)
107                .collect::<Vec<_>>();
108            let mut active_region_ids = HashSet::new();
109
110            let table_routes = self.ctx.batch_get_table_route(&table_ids).await?;
111            for table_route in table_routes.values() {
112                for region in &table_route.region_routes {
113                    active_region_ids.insert(region.region.id);
114                }
115            }
116
117            active_region_ids
118        };
119
120        let dropped_regions = self.identify_dropped_regions(table_reparts, &active_region_ids);
121
122        if dropped_regions.is_empty() {
123            return Ok(DroppedRegionAssignment::default());
124        }
125
126        let dropped_regions = if apply_cooldown {
127            self.filter_by_cooldown(dropped_regions).await
128        } else {
129            dropped_regions
130        };
131
132        if dropped_regions.is_empty() {
133            return Ok(DroppedRegionAssignment::default());
134        }
135
136        self.assign_to_peers(dropped_regions).await
137    }
138
139    /// Identify dropped regions: regions in `table_repart` but not in table routes.
140    /// The `assign_to_peers` step later double check they're also absent from `table_route`.
141    fn identify_dropped_regions(
142        &self,
143        table_reparts: &[(TableId, TableRepartValue)],
144        active_region_ids: &HashSet<RegionId>,
145    ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
146        let mut dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
147            HashMap::new();
148
149        for (table_id, repart) in table_reparts {
150            if repart.src_to_dst.is_empty() {
151                continue;
152            }
153
154            let entry = dropped_regions.entry(*table_id).or_default();
155            for (src_region, dst_regions) in repart.clone().src_to_dst {
156                if !active_region_ids.contains(&src_region) {
157                    entry.insert(src_region, dst_regions.into_iter().collect());
158                }
159            }
160        }
161
162        dropped_regions.retain(|_, regions| !regions.is_empty());
163        dropped_regions
164    }
165
166    /// Filter out dropped regions that are still in their cooldown period.
167    async fn filter_by_cooldown(
168        &self,
169        dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
170    ) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
171        let now = Instant::now();
172        let tracker = self.tracker.lock().await;
173        let mut filtered = HashMap::new();
174
175        for (table_id, regions) in dropped_regions {
176            let mut kept = HashMap::new();
177            for (region_id, dst_regions) in regions {
178                if let Some(gc_info) = tracker.get(&region_id) {
179                    let elapsed = now.saturating_duration_since(gc_info.last_gc_time);
180                    if elapsed < self.config.gc_cooldown_period {
181                        debug!("Skipping dropped region {} due to cooldown", region_id);
182                        continue;
183                    }
184                }
185                kept.insert(region_id, dst_regions);
186            }
187
188            if !kept.is_empty() {
189                filtered.insert(table_id, kept);
190            }
191        }
192
193        filtered
194    }
195
    /// Assign dropped regions to available peers for GC execution.
    ///
    /// For dropped regions, we need to:
    /// 1. Find an available peer from the table's current route
    /// 2. Use modulo-based selection (region_id % peer_count) for load distribution
    /// 3. Create route overrides since dropped regions have no active route
    async fn assign_to_peers(
        &self,
        dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
    ) -> Result<DroppedRegionAssignment> {
        let mut assignment = DroppedRegionAssignment::default();

        for (table_id, regions) in dropped_regions {
            // A missing route is logged and skipped so one bad table does not
            // abort GC assignment for all the others.
            let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await {
                Ok(route) => route,
                Err(e) => {
                    warn!(
                        "Failed to get table route for table {}: {}, skipping dropped regions",
                        table_id, e
                    );
                    continue;
                }
            };

            // Skip tables whose physical table id differs from their own id —
            // presumably logical tables whose storage is GC'ed via the physical
            // table; NOTE(review): confirm against `get_table_route` semantics.
            if phy_table_id != table_id {
                continue;
            }

            // Regions currently present in the freshly fetched route; used below
            // to re-check that each candidate is still truly dropped.
            let active_region_ids: HashSet<RegionId> = table_route
                .region_routes
                .iter()
                .map(|r| r.region.id)
                .collect();

            // Candidate GC executors: leader peers, sorted by id and deduplicated
            // so the modulo selection below is deterministic across runs.
            let mut leader_peers: Vec<Peer> = table_route
                .region_routes
                .iter()
                .filter_map(|r| r.leader_peer.clone())
                .collect();
            leader_peers.sort_by_key(|peer| peer.id);
            leader_peers.dedup_by_key(|peer| peer.id);

            if leader_peers.is_empty() {
                warn!(
                    "No leader peers found for table {}, skipping dropped regions",
                    table_id
                );
                continue;
            }

            for (region_id, dst_regions) in regions {
                // Double-check: the region may have reappeared in the route since
                // it was identified as dropped.
                if active_region_ids.contains(&region_id) {
                    debug!(
                        "Skipping dropped region {} since it still exists in table route",
                        region_id
                    );
                    continue;
                }

                // Deterministic peer choice: region id modulo peer count.
                let selected_idx = (region_id.as_u64() as usize) % leader_peers.len();
                let peer = leader_peers[selected_idx].clone();

                let info = DroppedRegionInfo {
                    region_id,
                    table_id,
                    dst_regions,
                };

                assignment
                    .regions_by_peer
                    .entry(peer.clone())
                    .or_default()
                    .push(info);

                // Dropped regions always require a full file listing (see
                // `DroppedRegionAssignment::force_full_listing`).
                assignment
                    .force_full_listing
                    .entry(peer.clone())
                    .or_default()
                    .insert(region_id);

                // Synthesize a route override (selected peer, no followers) since
                // the dropped region is no longer routed anywhere.
                assignment
                    .region_routes_override
                    .entry(peer.clone())
                    .or_default()
                    .insert(region_id, (peer, Vec::new()));
            }
        }

        Ok(assignment)
    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that `DroppedRegionInfo` fields round-trip as constructed.
    #[test]
    fn test_dropped_region_info() {
        let region_id = RegionId::new(1, 1);
        let info = DroppedRegionInfo {
            region_id,
            table_id: 1,
            dst_regions: HashSet::new(),
        };
        assert_eq!(info.region_id, region_id);
        assert_eq!(info.table_id, 1);
    }
}
302}