// meta_srv/gc/handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Instant;
17
18use common_catalog::consts::MITO_ENGINE;
19use common_meta::datanode::{RegionManifestInfo, RegionStat};
20use common_meta::peer::Peer;
21use common_telemetry::tracing::Instrument as _;
22use common_telemetry::{debug, error, info, warn};
23use futures::StreamExt;
24use itertools::Itertools;
25use ordered_float::OrderedFloat;
26use store_api::region_engine::RegionRole;
27use store_api::storage::{GcReport, RegionId};
28use table::metadata::TableId;
29
30use crate::error::Result;
31use crate::gc::Region2Peers;
32use crate::gc::candidate::GcCandidate;
33use crate::gc::dropped::DroppedRegionCollector;
34use crate::gc::scheduler::{GcJobReport, GcScheduler};
35use crate::gc::tracker::RegionGcInfo;
36use crate::metrics::METRIC_META_GC_CANDIDATE_REGIONS;
37
38impl GcScheduler {
39    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
40        let start_time = Instant::now();
41        info!("Starting GC cycle");
42
43        // limit gc region scope to regions whose datanode have reported stats(by heartbeat)
44        let table_to_region_stats = self
45            .ctx
46            .get_table_to_region_stats()
47            .instrument(common_telemetry::tracing::info_span!(
48                "meta_gc_get_region_stats"
49            ))
50            .await?;
51        info!(
52            "Fetched region stats for {} tables",
53            table_to_region_stats.len()
54        );
55
56        let per_table_candidates = self
57            .select_gc_candidates(&table_to_region_stats)
58            .instrument(common_telemetry::tracing::info_span!(
59                "meta_gc_select_candidates"
60            ))
61            .await?;
62
63        let table_reparts = self
64            .ctx
65            .get_table_reparts()
66            .instrument(common_telemetry::tracing::info_span!(
67                "meta_gc_get_table_reparts"
68            ))
69            .await?;
70        let dropped_collector =
71            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
72        let dropped_assignment = dropped_collector
73            .collect_and_assign(&table_reparts)
74            .instrument(common_telemetry::tracing::info_span!(
75                "meta_gc_collect_dropped_regions"
76            ))
77            .await?;
78        let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum();
79        let dropped_count: usize = dropped_assignment
80            .regions_by_peer
81            .values()
82            .map(|regions| regions.len())
83            .sum();
84        METRIC_META_GC_CANDIDATE_REGIONS.set((candidate_count + dropped_count) as i64);
85
86        if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
87            info!("No GC candidates found, skipping GC cycle");
88            return Ok(Default::default());
89        }
90
91        let mut datanode_to_candidates = self
92            .aggregate_candidates_by_datanode(per_table_candidates)
93            .instrument(common_telemetry::tracing::info_span!(
94                "meta_gc_aggregate_by_datanode"
95            ))
96            .await?;
97
98        self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);
99
100        if datanode_to_candidates.is_empty() {
101            info!("No valid datanode candidates found, skipping GC cycle");
102            return Ok(Default::default());
103        }
104
105        let report = self
106            .parallel_process_datanodes(
107                datanode_to_candidates,
108                dropped_assignment.force_full_listing,
109                dropped_assignment.region_routes_override,
110            )
111            .instrument(common_telemetry::tracing::info_span!(
112                "meta_gc_dispatch_to_datanodes"
113            ))
114            .await;
115
116        let duration = start_time.elapsed();
117        match &report {
118            GcJobReport::PerDatanode {
119                per_datanode_reports,
120                failed_datanodes,
121            } => {
122                info!(
123                    "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
124                    per_datanode_reports.len(),
125                    failed_datanodes.len(),
126                    duration
127                );
128            }
129            GcJobReport::Combined { report } => {
130                info!(
131                    "Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}",
132                    report.deleted_files.len(),
133                    report.deleted_indexes.len(),
134                    duration
135                );
136            }
137        }
138        debug!("Detailed GC Job Report: {report:#?}");
139
140        Ok(report)
141    }
142
143    fn merge_dropped_regions(
144        &self,
145        datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
146        assignment: &crate::gc::dropped::DroppedRegionAssignment,
147    ) {
148        for (peer, dropped_infos) in &assignment.regions_by_peer {
149            let entry = datanode_to_candidates.entry(peer.clone()).or_default();
150            for info in dropped_infos {
151                entry.push((info.table_id, dropped_candidate(info.region_id)));
152            }
153        }
154    }
155
156    pub(crate) async fn aggregate_candidates_by_datanode(
157        &self,
158        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
159    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
160        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();
161
162        for (table_id, candidates) in per_table_candidates {
163            if candidates.is_empty() {
164                continue;
165            }
166
167            // Get table route information to map regions to peers
168            let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
169
170            if phy_table_id != table_id {
171                // Skip logical tables
172                continue;
173            }
174
175            let region_to_peer = table_peer
176                .region_routes
177                .iter()
178                .filter_map(|r| {
179                    r.leader_peer
180                        .as_ref()
181                        .map(|peer| (r.region.id, peer.clone()))
182                })
183                .collect::<HashMap<RegionId, Peer>>();
184
185            for candidate in candidates {
186                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
187                    datanode_to_candidates
188                        .entry(peer.clone())
189                        .or_default()
190                        .push((table_id, candidate));
191                } else {
192                    warn!(
193                        "Skipping region {} for table {}: no leader peer found",
194                        candidate.region_id, table_id
195                    );
196                }
197            }
198        }
199
200        info!(
201            "Aggregated GC candidates for {} datanodes",
202            datanode_to_candidates.len()
203        );
204        Ok(datanode_to_candidates)
205    }
206
207    /// Process multiple datanodes concurrently with limited parallelism.
208    pub(crate) async fn parallel_process_datanodes(
209        &self,
210        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
211        force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
212        region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
213    ) -> GcJobReport {
214        let mut per_datanode_reports = HashMap::new();
215        let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new();
216
217        // Create a stream of datanode GC tasks with limited concurrency
218        let results: Vec<_> = futures::stream::iter(
219            datanode_to_candidates
220                .into_iter()
221                .filter(|(_, candidates)| !candidates.is_empty()),
222        )
223        .map(|(peer, candidates)| {
224            let scheduler = self;
225            let peer_clone = peer.clone();
226            let force_full_listing = force_full_listing_by_peer
227                .get(&peer)
228                .cloned()
229                .unwrap_or_default();
230            let region_routes_override = region_routes_override_by_peer
231                .get(&peer)
232                .cloned()
233                .unwrap_or_default();
234            async move {
235                (
236                    peer,
237                    scheduler
238                        .process_datanode_gc(
239                            peer_clone,
240                            candidates,
241                            force_full_listing,
242                            region_routes_override,
243                        )
244                        .await,
245                )
246            }
247        })
248        .buffer_unordered(self.config.max_concurrent_tables) // Reuse table concurrency limit for datanodes
249        .collect()
250        .await;
251
252        // Process all datanode GC results and collect regions that need retry from table reports
253        for (peer, result) in results {
254            match result {
255                Ok(dn_report) => {
256                    per_datanode_reports.insert(peer.id, dn_report);
257                }
258                Err(e) => {
259                    error!(e; "Failed to process datanode GC for peer {}", peer);
260                    // Note: We don't have a direct way to map peer to table_id here,
261                    // so we just log the error. The table_reports will contain individual region failures.
262                    failed_datanodes.entry(peer.id).or_default().push(e);
263                }
264            }
265        }
266
267        GcJobReport::PerDatanode {
268            per_datanode_reports,
269            failed_datanodes,
270        }
271    }
272
273    /// Process GC for a single datanode with all its candidate regions.
274    /// Returns the table reports for this datanode.
275    pub(crate) async fn process_datanode_gc(
276        &self,
277        peer: Peer,
278        candidates: Vec<(TableId, GcCandidate)>,
279        force_full_listing: HashSet<RegionId>,
280        region_routes_override: Region2Peers,
281    ) -> Result<GcReport> {
282        info!(
283            "Starting GC for datanode {} with {} candidate regions",
284            peer,
285            candidates.len()
286        );
287
288        if candidates.is_empty() {
289            return Ok(Default::default());
290        }
291
292        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
293
294        // Step 2: Run GC for all regions on this datanode in a single batch
295        let (gc_report, fully_listed_regions) = {
296            // Partition regions into full listing and fast listing in a single pass
297
298            let batch_full_listing_decisions = self
299                .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
300                .await;
301
302            let need_full_list_regions = batch_full_listing_decisions
303                .iter()
304                .filter_map(
305                    |(&region_id, &need_full)| {
306                        if need_full { Some(region_id) } else { None }
307                    },
308                )
309                .collect_vec();
310            let fast_list_regions = batch_full_listing_decisions
311                .iter()
312                .filter_map(
313                    |(&region_id, &need_full)| {
314                        if !need_full { Some(region_id) } else { None }
315                    },
316                )
317                .collect_vec();
318
319            let mut combined_report = GcReport::default();
320
321            // First process regions that can fast list
322            if !fast_list_regions.is_empty() {
323                match self
324                    .ctx
325                    .gc_regions(
326                        &fast_list_regions,
327                        false,
328                        self.config.mailbox_timeout,
329                        region_routes_override.clone(),
330                    )
331                    .instrument(common_telemetry::tracing::info_span!(
332                        "meta_gc_call_datanode",
333                        peer = %peer,
334                        mode = "fast",
335                        region_count = fast_list_regions.len()
336                    ))
337                    .await
338                {
339                    Ok(report) => combined_report.merge(report),
340                    Err(e) => {
341                        error!(
342                            e; "Failed to GC regions {:?} on datanode {}",
343                            fast_list_regions, peer,
344                        );
345
346                        // Add to need_retry_regions since it failed
347                        combined_report
348                            .need_retry_regions
349                            .extend(fast_list_regions.clone().into_iter());
350                    }
351                }
352            }
353
354            if !need_full_list_regions.is_empty() {
355                match self
356                    .ctx
357                    .gc_regions(
358                        &need_full_list_regions,
359                        true,
360                        self.config.mailbox_timeout,
361                        region_routes_override,
362                    )
363                    .instrument(common_telemetry::tracing::info_span!(
364                        "meta_gc_call_datanode",
365                        peer = %peer,
366                        mode = "full",
367                        region_count = need_full_list_regions.len()
368                    ))
369                    .await
370                {
371                    Ok(report) => combined_report.merge(report),
372                    Err(e) => {
373                        error!(
374                            e; "Failed to GC regions {:?} on datanode {}",
375                            need_full_list_regions, peer,
376                        );
377
378                        // Add to need_retry_regions since it failed
379                        combined_report
380                            .need_retry_regions
381                            .extend(need_full_list_regions.clone());
382                    }
383                }
384            }
385            let fully_listed_regions = need_full_list_regions
386                .into_iter()
387                .filter(|r| !combined_report.need_retry_regions.contains(r))
388                .collect::<HashSet<_>>();
389
390            (combined_report, fully_listed_regions)
391        };
392
393        // Step 3: Process the combined GC report and update table reports
394        for region_id in &all_region_ids {
395            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
396                .await;
397        }
398
399        info!(
400            "Completed GC for datanode {}: {} regions processed",
401            peer,
402            all_region_ids.len()
403        );
404
405        Ok(gc_report)
406    }
407
408    async fn batch_should_use_full_listing(
409        &self,
410        region_ids: &[RegionId],
411        force_full_listing: &HashSet<RegionId>,
412    ) -> HashMap<RegionId, bool> {
413        let mut result = HashMap::new();
414        let mut gc_tracker = self.region_gc_tracker.lock().await;
415        let now = Instant::now();
416        for &region_id in region_ids {
417            if force_full_listing.contains(&region_id) {
418                gc_tracker
419                    .entry(region_id)
420                    .and_modify(|info| {
421                        info.last_full_listing_time = Some(now);
422                        info.last_gc_time = now;
423                    })
424                    .or_insert_with(|| RegionGcInfo {
425                        last_gc_time: now,
426                        last_full_listing_time: Some(now),
427                    });
428                result.insert(region_id, true);
429                continue;
430            }
431            let use_full_listing = {
432                if let Some(gc_info) = gc_tracker.get(&region_id) {
433                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
434                        // check if pass cooling down interval after last full listing
435                        let elapsed = now.saturating_duration_since(last_full_listing);
436                        elapsed >= self.config.full_file_listing_interval
437                    } else {
438                        // Never did full listing for this region, do it now
439                        true
440                    }
441                } else {
442                    // First time GC for this region, skip doing full listing, for this time
443                    gc_tracker.insert(
444                        region_id,
445                        RegionGcInfo {
446                            last_gc_time: now,
447                            last_full_listing_time: Some(now),
448                        },
449                    );
450                    false
451                }
452            };
453            result.insert(region_id, use_full_listing);
454        }
455        result
456    }
457}
458
459fn dropped_candidate(region_id: RegionId) -> GcCandidate {
460    GcCandidate {
461        region_id,
462        score: OrderedFloat(0.0),
463        region_stat: dropped_region_stat(region_id),
464    }
465}
466
467fn dropped_region_stat(region_id: RegionId) -> RegionStat {
468    RegionStat {
469        id: region_id,
470        rcus: 0,
471        wcus: 0,
472        approximate_bytes: 0,
473        engine: MITO_ENGINE.to_string(),
474        role: RegionRole::Leader,
475        num_rows: 0,
476        memtable_size: 0,
477        manifest_size: 0,
478        sst_size: 0,
479        sst_num: 0,
480        index_size: 0,
481        region_manifest: RegionManifestInfo::Mito {
482            manifest_version: 0,
483            flushed_entry_id: 0,
484            file_removed_cnt: 0,
485        },
486        written_bytes: 0,
487        data_topic_latest_entry_id: 0,
488        metadata_topic_latest_entry_id: 0,
489    }
490}