meta_srv/gc/
handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::time::Instant;

use common_catalog::consts::MITO_ENGINE;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use ordered_float::OrderedFloat;
use store_api::region_engine::RegionRole;
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;

use crate::error::Result;
use crate::gc::Region2Peers;
use crate::gc::candidate::GcCandidate;
use crate::gc::dropped::DroppedRegionCollector;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;

impl GcScheduler {
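    /// Run a single GC cycle: fetch region stats reported via heartbeat, select GC
    /// candidates per table, fold in dropped regions, and dispatch the work to the
    /// owning datanodes with bounded concurrency.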
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        // Limit the GC scope to regions whose datanodes have reported stats (via heartbeat).
        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        let table_reparts = self.ctx.get_table_reparts().await?;
        let dropped_collector =
            DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
        let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;

        if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let mut datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        let report = self
            .parallel_process_datanodes(
                datanode_to_candidates,
                dropped_assignment.force_full_listing,
                dropped_assignment.region_routes_override,
            )
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }

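    /// Merge dropped regions from the [`DroppedRegionCollector`] assignment into the
    /// per-datanode candidate lists so they are garbage-collected alongside the
    /// regular candidates.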
    fn merge_dropped_regions(
        &self,
        datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        assignment: &crate::gc::dropped::DroppedRegionAssignment,
    ) {
        for (peer, dropped_infos) in &assignment.regions_by_peer {
            let entry = datanode_to_candidates.entry(peer.clone()).or_default();
            for info in dropped_infos {
                entry.push((info.table_id, dropped_candidate(info.region_id)));
            }
        }
    }

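    /// Group per-table GC candidates by the datanode that currently holds each
    /// region's leader peer, based on the table route information. Logical tables
    /// and regions without a known leader peer are skipped.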
    pub(crate) async fn aggregate_candidates_by_datanode(
        &self,
        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();

        for (table_id, candidates) in per_table_candidates {
            if candidates.is_empty() {
                continue;
            }

            // Get table route information to map regions to peers
            let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;

            if phy_table_id != table_id {
                // Skip logical tables
                continue;
            }

            let region_to_peer = table_peer
                .region_routes
                .iter()
                .filter_map(|r| {
                    r.leader_peer
                        .as_ref()
                        .map(|peer| (r.region.id, peer.clone()))
                })
                .collect::<HashMap<RegionId, Peer>>();

            for candidate in candidates {
                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
                    datanode_to_candidates
                        .entry(peer.clone())
                        .or_default()
                        .push((table_id, candidate));
                } else {
                    warn!(
                        "Skipping region {} for table {}: no leader peer found",
                        candidate.region_id, table_id
                    );
                }
            }
        }

        info!(
            "Aggregated GC candidates for {} datanodes",
            datanode_to_candidates.len()
        );
        Ok(datanode_to_candidates)
    }

    /// Process multiple datanodes concurrently with limited parallelism.
    pub(crate) async fn parallel_process_datanodes(
        &self,
        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
        force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
        region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
    ) -> GcJobReport {
        let mut report = GcJobReport::default();

        // Create a stream of datanode GC tasks with limited concurrency
        let results: Vec<_> = futures::stream::iter(
            datanode_to_candidates
                .into_iter()
                .filter(|(_, candidates)| !candidates.is_empty()),
        )
        .map(|(peer, candidates)| {
            let scheduler = self;
            let peer_clone = peer.clone();
            let force_full_listing = force_full_listing_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            let region_routes_override = region_routes_override_by_peer
                .get(&peer)
                .cloned()
                .unwrap_or_default();
            async move {
                (
                    peer,
                    scheduler
                        .process_datanode_gc(
                            peer_clone,
                            candidates,
                            force_full_listing,
                            region_routes_override,
                        )
                        .await,
                )
            }
        })
        .buffer_unordered(self.config.max_concurrent_tables) // Reuse table concurrency limit for datanodes
        .collect()
        .await;

        // Collect each datanode's GC result into the job report. Per-region retry info
        // is carried inside the individual `GcReport`s.
        for (peer, result) in results {
            match result {
                Ok(dn_report) => {
                    report.per_datanode_reports.insert(peer.id, dn_report);
                }
                Err(e) => {
                    error!(e; "Failed to process datanode GC for peer {}", peer);
                    // Note: we don't have a direct way to map the peer back to a table_id
                    // here, so the error is recorded per datanode instead.
                    report.failed_datanodes.entry(peer.id).or_default().push(e);
                }
            }
        }

        report
    }

    /// Process GC for a single datanode with all of its candidate regions.
    /// Returns the combined [`GcReport`] for this datanode.
    pub(crate) async fn process_datanode_gc(
        &self,
        peer: Peer,
        candidates: Vec<(TableId, GcCandidate)>,
        force_full_listing: HashSet<RegionId>,
        region_routes_override: Region2Peers,
    ) -> Result<GcReport> {
        info!(
            "Starting GC for datanode {} with {} candidate regions",
            peer,
            candidates.len()
        );

        if candidates.is_empty() {
            return Ok(Default::default());
        }

        // Step 1: Collect all candidate region ids on this datanode.
        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();

        // Step 2: Run GC for all regions on this datanode in a single batch.
        let (gc_report, fully_listed_regions) = {
            // Partition regions into full listing and fast listing in a single pass

            let batch_full_listing_decisions = self
                .batch_should_use_full_listing(&all_region_ids, &force_full_listing)
                .await;

            let need_full_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();
            let fast_list_regions = batch_full_listing_decisions
                .iter()
                .filter_map(
                    |(&region_id, &need_full)| {
                        if !need_full { Some(region_id) } else { None }
                    },
                )
                .collect_vec();

            let mut combined_report = GcReport::default();

            // First, process the regions that can use fast listing
            if !fast_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &fast_list_regions,
                        false,
                        self.config.mailbox_timeout,
                        region_routes_override.clone(),
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            fast_list_regions, peer,
                        );

                        // Add to need_retry_regions since it failed
                        combined_report
                            .need_retry_regions
                            .extend(fast_list_regions.clone().into_iter());
                    }
                }
            }

            if !need_full_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        &need_full_list_regions,
                        true,
                        self.config.mailbox_timeout,
                        region_routes_override,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            e; "Failed to GC regions {:?} on datanode {}",
                            need_full_list_regions, peer,
                        );

                        // Add to need_retry_regions since it failed
                        combined_report
                            .need_retry_regions
                            .extend(need_full_list_regions.clone());
                    }
                }
            }
            let fully_listed_regions = need_full_list_regions
                .into_iter()
                .filter(|r| !combined_report.need_retry_regions.contains(r))
                .collect::<HashSet<_>>();

            (combined_report, fully_listed_regions)
        };

        // Step 3: Update the full-listing tracking state for the processed regions.
        for region_id in &all_region_ids {
            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
                .await;
        }

        info!(
            "Completed GC for datanode {}: {} regions processed",
            peer,
            all_region_ids.len()
        );

        Ok(gc_report)
    }

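    /// Decide for each region whether the next GC run should perform a full file
    /// listing. Regions in `force_full_listing` always do; otherwise the decision is
    /// based on when the region's last full listing happened, as recorded in the
    /// region GC tracker.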
    async fn batch_should_use_full_listing(
        &self,
        region_ids: &[RegionId],
        force_full_listing: &HashSet<RegionId>,
    ) -> HashMap<RegionId, bool> {
        let mut result = HashMap::new();
        let mut gc_tracker = self.region_gc_tracker.lock().await;
        let now = Instant::now();
        for &region_id in region_ids {
            if force_full_listing.contains(&region_id) {
                gc_tracker
                    .entry(region_id)
                    .and_modify(|info| {
                        info.last_full_listing_time = Some(now);
                        info.last_gc_time = now;
                    })
                    .or_insert_with(|| RegionGcInfo {
                        last_gc_time: now,
                        last_full_listing_time: Some(now),
                    });
                result.insert(region_id, true);
                continue;
            }
            let use_full_listing = {
                if let Some(gc_info) = gc_tracker.get(&region_id) {
                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
                        // Check whether the cooling-down interval has elapsed since the
                        // last full listing.
                        let elapsed = now.saturating_duration_since(last_full_listing);
                        elapsed >= self.config.full_file_listing_interval
                    } else {
                        // Never did full listing for this region, do it now
                        true
                    }
                } else {
                    // First GC for this region: record it in the tracker and skip the
                    // full listing this time.
                    gc_tracker.insert(
                        region_id,
                        RegionGcInfo {
                            last_gc_time: now,
                            last_full_listing_time: Some(now),
                        },
                    );
                    false
                }
            };
            result.insert(region_id, use_full_listing);
        }
        result
    }
}

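/// Build a synthetic [`GcCandidate`] for a dropped region so it can flow through the
/// regular per-datanode GC path.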
fn dropped_candidate(region_id: RegionId) -> GcCandidate {
    GcCandidate {
        region_id,
        score: OrderedFloat(0.0),
        region_stat: dropped_region_stat(region_id),
    }
}

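/// Build a placeholder [`RegionStat`] with zeroed counters for a dropped region.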
fn dropped_region_stat(region_id: RegionId) -> RegionStat {
    RegionStat {
        id: region_id,
        rcus: 0,
        wcus: 0,
        approximate_bytes: 0,
        engine: MITO_ENGINE.to_string(),
        role: RegionRole::Leader,
        num_rows: 0,
        memtable_size: 0,
        manifest_size: 0,
        sst_size: 0,
        sst_num: 0,
        index_size: 0,
        region_manifest: RegionManifestInfo::Mito {
            manifest_version: 0,
            flushed_entry_id: 0,
            file_removed_cnt: 0,
        },
        written_bytes: 0,
        data_topic_latest_entry_id: 0,
        metadata_topic_latest_entry_id: 0,
    }
}