// meta_srv/gc/handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Instant;
17
18use common_catalog::consts::MITO_ENGINE;
19use common_meta::datanode::{RegionManifestInfo, RegionStat};
20use common_meta::peer::Peer;
21use common_telemetry::{debug, error, info, warn};
22use futures::StreamExt;
23use itertools::Itertools;
24use ordered_float::OrderedFloat;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{GcReport, RegionId};
27use table::metadata::TableId;
28
29use crate::error::Result;
30use crate::gc::Region2Peers;
31use crate::gc::candidate::GcCandidate;
32use crate::gc::dropped::DroppedRegionCollector;
33use crate::gc::scheduler::{GcJobReport, GcScheduler};
34use crate::gc::tracker::RegionGcInfo;
35use crate::metrics::METRIC_META_GC_CANDIDATE_REGIONS;
36
impl GcScheduler {
    /// Runs one complete GC cycle and returns the aggregated [`GcJobReport`].
    ///
    /// Steps (all visible below):
    /// 1. Fetch per-table region stats — only regions whose datanodes reported
    ///    stats via heartbeat are in scope.
    /// 2. Select per-table GC candidates and collect/assign dropped-region work.
    /// 3. Group all candidates by the datanode hosting each region's leader.
    /// 4. Process datanodes concurrently and merge results into the report.
    ///
    /// Returns `Ok(GcJobReport::default())` early when there is nothing to do.
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        // limit gc region scope to regions whose datanode have reported stats(by heartbeat)
        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;
        let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum();
        let dropped_count: usize = dropped_assignment
            .regions_by_peer
            .values()
            .map(|regions| regions.len())
            .sum();
        // The gauge counts regular candidates AND dropped regions together.
        METRIC_META_GC_CANDIDATE_REGIONS.set((candidate_count + dropped_count) as i64);

        if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let mut datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        // Dropped regions join the same per-datanode work queues as regular candidates.
        self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let report = self
            .parallel_process_datanodes(
                datanode_to_candidates,
                dropped_assignment.force_full_listing,
                dropped_assignment.region_routes_override,
            )
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }

    /// Appends every dropped region from `assignment` to its peer's candidate
    /// list, wrapping each one in a synthetic zero-score [`GcCandidate`]
    /// (see [`dropped_candidate`]). Peers absent from the map get a fresh entry.
    fn merge_dropped_regions(
        &self,
        datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        assignment: &crate::gc::dropped::DroppedRegionAssignment,
    ) {
        for (peer, dropped_infos) in &assignment.regions_by_peer {
            let entry = datanode_to_candidates.entry(peer.clone()).or_default();
            for info in dropped_infos {
                entry.push((info.table_id, dropped_candidate(info.region_id)));
            }
        }
    }

    /// Regroups per-table candidates by the datanode (leader peer) that hosts
    /// each candidate region.
    ///
    /// Logical tables (where the physical table id differs from the table id)
    /// are skipped, as are candidate regions with no known leader peer
    /// (logged as warnings).
    pub(crate) async fn aggregate_candidates_by_datanode(
        &self,
        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();

        for (table_id, candidates) in per_table_candidates {
            if candidates.is_empty() {
                continue;
            }

            // Get table route information to map regions to peers
            let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;

            if phy_table_id != table_id {
                // Skip logical tables
                continue;
            }

            // region id -> leader peer; routes without a leader are dropped here.
            let region_to_peer = table_peer
                .region_routes
                .iter()
                .filter_map(|r| {
                    r.leader_peer
                        .as_ref()
                        .map(|peer| (r.region.id, peer.clone()))
                })
                .collect::<HashMap<RegionId, Peer>>();

            for candidate in candidates {
                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
                    datanode_to_candidates
                        .entry(peer.clone())
                        .or_default()
                        .push((table_id, candidate));
                } else {
                    warn!(
                        "Skipping region {} for table {}: no leader peer found",
                        candidate.region_id, table_id
                    );
                }
            }
        }

        info!(
            "Aggregated GC candidates for {} datanodes",
            datanode_to_candidates.len()
        );
        Ok(datanode_to_candidates)
    }

    /// Process multiple datanodes concurrently with limited parallelism.
    ///
    /// Each peer's GC runs via [`Self::process_datanode_gc`]; per-peer
    /// `force_full_listing` / `region_routes_override` entries default to
    /// empty when the peer has none. Successes land in
    /// `per_datanode_reports`, errors in `failed_datanodes` — this method
    /// itself never fails.
    pub(crate) async fn parallel_process_datanodes(
        &self,
        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
        region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
    ) -> GcJobReport {
        let mut report = GcJobReport::default();

        // Create a stream of datanode GC tasks with limited concurrency
        let results: Vec<_> = futures::stream::iter(
            datanode_to_candidates
                .into_iter()
                .filter(|(_, candidates)| !candidates.is_empty()),
        )
        .map(|(peer, candidates)| {
            let scheduler = self;
            let peer_clone = peer.clone();
            let force_full_listing = force_full_listing_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            let region_routes_override = region_routes_override_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            async move {
                (
                    peer,
                    scheduler
                        .process_datanode_gc(
                            peer_clone,
                            candidates,
                            force_full_listing,
                            region_routes_override,
                        )
                        .await,
                )
            }
        })
        .buffer_unordered(self.config.max_concurrent_tables) // Reuse table concurrency limit for datanodes
        .collect()
        .await;

        // Process all datanode GC results and collect regions that need retry from table reports
        for (peer, result) in results {
            match result {
                Ok(dn_report) => {
                    report.per_datanode_reports.insert(peer.id, dn_report);
                }
                Err(e) => {
                    error!(e; "Failed to process datanode GC for peer {}", peer);
                    // Note: We don't have a direct way to map peer to table_id here,
                    // so we just log the error. The table_reports will contain individual region failures.
                    report.failed_datanodes.entry(peer.id).or_default().push(e);
                }
            }
        }

        report
    }

    /// Process GC for a single datanode with all its candidate regions.
    /// Returns the table reports for this datanode.
    ///
    /// Candidate regions are partitioned (via
    /// [`Self::batch_should_use_full_listing`]) into a fast-listing batch and
    /// a full-listing batch; each batch goes to `ctx.gc_regions` as one call.
    /// A failed batch is not fatal: its regions are added to
    /// `need_retry_regions` in the combined report. Afterwards the GC tracker
    /// is updated for every candidate, marking a full listing only for
    /// full-listed regions that are not queued for retry.
    pub(crate) async fn process_datanode_gc(
        &self,
        peer: Peer,
        candidates: Vec<(TableId, GcCandidate)>,
        force_full_listing: HashSet<RegionId>,
        region_routes_override: Region2Peers,
    ) -> Result<GcReport> {
        info!(
            "Starting GC for datanode {} with {} candidate regions",
            peer,
            candidates.len()
        );

        if candidates.is_empty() {
            return Ok(Default::default());
        }

        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();

        // Step 2: Run GC for all regions on this datanode in a single batch
        let (gc_report, fully_listed_regions) = {
            // Partition regions into full listing and fast listing in a single pass

            let batch_full_listing_decisions = self
                .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
                .await;

            let need_full_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();
            let fast_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if !need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();

            let mut combined_report = GcReport::default();

            // First process regions that can fast list
            if !fast_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &fast_list_regions,
                        false,
                        self.config.mailbox_timeout,
                        region_routes_override.clone(),
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            fast_list_regions, peer,
                        );

                        // Add to need_retry_regions since it failed
                        combined_report
                            .need_retry_regions
                            .extend(fast_list_regions.clone().into_iter());
                    }
                }
            }

            if !need_full_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &need_full_list_regions,
                        true,
                        self.config.mailbox_timeout,
                        region_routes_override,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            need_full_list_regions, peer,
                        );

                        // Add to need_retry_regions since it failed
                        combined_report
                            .need_retry_regions
                            .extend(need_full_list_regions.clone());
                    }
                }
            }
            // Only regions that actually completed a full listing (i.e. are not
            // queued for retry) count as "fully listed" for tracker bookkeeping.
            let fully_listed_regions = need_full_list_regions
                .into_iter()
                .filter(|r| !combined_report.need_retry_regions.contains(r))
                .collect::<HashSet<_>>();

            (combined_report, fully_listed_regions)
        };

        // Step 3: Process the combined GC report and update table reports
        for region_id in &all_region_ids {
            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
                .await;
        }

        info!(
            "Completed GC for datanode {}: {} regions processed",
            peer,
            all_region_ids.len()
        );

        Ok(gc_report)
    }

    /// Decides, per region, whether the upcoming GC should do a full file
    /// listing (`true`) or a fast listing (`false`).
    ///
    /// Rules, in order:
    /// - Region in `force_full_listing` → `true`, and the tracker entry is
    ///   stamped with `now` up front.
    /// - Tracker has a `last_full_listing_time` → `true` only once
    ///   `full_file_listing_interval` has elapsed since then.
    /// - Tracker entry exists but never full-listed → `true`.
    /// - Region unknown to the tracker (first GC) → `false`, and a tracker
    ///   entry is inserted.
    ///
    /// Holds the async `region_gc_tracker` lock for the duration of the loop.
    async fn batch_should_use_full_listing(
        &self,
        region_ids: &[RegionId],
        force_full_listing: &HashSet<RegionId>,
    ) -> HashMap<RegionId, bool> {
        let mut result = HashMap::new();
        let mut gc_tracker = self.region_gc_tracker.lock().await;
        let now = Instant::now();
        for &region_id in region_ids {
            if force_full_listing.contains(&region_id) {
                gc_tracker
                    .entry(region_id)
                    .and_modify(|info| {
                        info.last_full_listing_time = Some(now);
                        info.last_gc_time = now;
                    })
                    .or_insert_with(|| RegionGcInfo {
                        last_gc_time: now,
                        last_full_listing_time: Some(now),
                    });
                result.insert(region_id, true);
                continue;
            }
            let use_full_listing = {
                if let Some(gc_info) = gc_tracker.get(&region_id) {
                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
                        // check if pass cooling down interval after last full listing
                        let elapsed = now.saturating_duration_since(last_full_listing);
                        elapsed >= self.config.full_file_listing_interval
                    } else {
                        // Never did full listing for this region, do it now
                        true
                    }
                } else {
                    // First time GC for this region, skip doing full listing, for this time
                    // NOTE(review): this records last_full_listing_time = Some(now) even though
                    // no full listing runs this cycle, so the first real full listing is deferred
                    // by a whole interval — confirm this cooldown-on-first-sight is intended.
                    gc_tracker.insert(
                        region_id,
                        RegionGcInfo {
                            last_gc_time: now,
                            last_full_listing_time: Some(now),
                        },
                    );
                    false
                }
            };
            result.insert(region_id, use_full_listing);
        }
        result
    }
}
398
399fn dropped_candidate(region_id: RegionId) -> GcCandidate {
400    GcCandidate {
401        region_id,
402        score: OrderedFloat(0.0),
403        region_stat: dropped_region_stat(region_id),
404    }
405}
406
407fn dropped_region_stat(region_id: RegionId) -> RegionStat {
408    RegionStat {
409        id: region_id,
410        rcus: 0,
411        wcus: 0,
412        approximate_bytes: 0,
413        engine: MITO_ENGINE.to_string(),
414        role: RegionRole::Leader,
415        num_rows: 0,
416        memtable_size: 0,
417        manifest_size: 0,
418        sst_size: 0,
419        sst_num: 0,
420        index_size: 0,
421        region_manifest: RegionManifestInfo::Mito {
422            manifest_version: 0,
423            flushed_entry_id: 0,
424            file_removed_cnt: 0,
425        },
426        written_bytes: 0,
427        data_topic_latest_entry_id: 0,
428        metadata_topic_latest_entry_id: 0,
429    }
430}