meta_srv/gc/handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::time::Instant;

use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::{Either, Itertools};
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;

use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;

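/// Maps a region to its leader peer and follower peers.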
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

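/// Maps a peer to the set of regions for which it is the leader.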
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;

impl GcScheduler {
    /// Iterates over all region stats, finds regions that may need GC, and sends GC
    /// instructions to the corresponding datanodes, processing datanodes in parallel
    /// and recording failed regions for retry.
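    ///
    /// Illustrative usage (a sketch only, assuming a constructed scheduler):
    ///
    /// ```ignore
    /// let report = scheduler.trigger_gc().await?;
    /// info!("GC ran on {} datanodes", report.per_datanode_reports.len());
    /// ```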
    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
        let start_time = Instant::now();
        info!("Starting GC cycle");

        // Step 1: Get all region statistics
        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
        info!(
            "Fetched region stats for {} tables",
            table_to_region_stats.len()
        );

        // Step 2: Select GC candidates based on our scoring algorithm
        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;

        if per_table_candidates.is_empty() {
            info!("No GC candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        // Step 3: Aggregate candidates by datanode
        let datanode_to_candidates = self
            .aggregate_candidates_by_datanode(per_table_candidates)
            .await?;

        if datanode_to_candidates.is_empty() {
            info!("No valid datanode candidates found, skipping GC cycle");
            return Ok(Default::default());
        }

        // Step 4: Process datanodes concurrently with limited parallelism
        let report = self
            .parallel_process_datanodes(datanode_to_candidates)
            .await;

        let duration = start_time.elapsed();
        info!(
            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
            report.per_datanode_reports.len(),
            report.failed_datanodes.len(),
            duration
        );
        debug!("Detailed GC Job Report: {report:#?}");

        Ok(report)
    }

    /// Find related regions that might share files with the candidate regions.
    /// Currently maps each candidate region only to itself, since repartition is not
    /// implemented yet.
    /// TODO(discord9): When repartition is implemented, this should also find src/dst regions
    /// that might share files with the candidate regions.
    pub(crate) async fn find_related_regions(
        &self,
        candidate_region_ids: &[RegionId],
    ) -> Result<HashMap<RegionId, Vec<RegionId>>> {
        Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
    }

    /// Aggregate GC candidates by their corresponding datanode peer.
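    ///
    /// For example, if table `A` has candidate regions `r1` (leader on peer 1) and `r2`
    /// (leader on peer 2), the result maps peer 1 to `[(A, r1)]` and peer 2 to `[(A, r2)]`.
    /// Logical tables are skipped, since their regions belong to the physical table.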
    pub(crate) async fn aggregate_candidates_by_datanode(
        &self,
        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();

        for (table_id, candidates) in per_table_candidates {
            if candidates.is_empty() {
                continue;
            }

            // Get table route information to map regions to peers
            let (phy_table_id, table_route) = self.ctx.get_table_route(table_id).await?;

            if phy_table_id != table_id {
                // Skip logical tables; their regions belong to the physical table
                continue;
            }

            let region_to_peer = table_route
                .region_routes
                .iter()
                .filter_map(|r| {
                    r.leader_peer
                        .as_ref()
                        .map(|peer| (r.region.id, peer.clone()))
                })
                .collect::<HashMap<RegionId, Peer>>();

            for candidate in candidates {
                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
                    datanode_to_candidates
                        .entry(peer.clone())
                        .or_default()
                        .push((table_id, candidate));
                } else {
                    warn!(
                        "Skipping region {} for table {}: no leader peer found",
                        candidate.region_id, table_id
                    );
                }
            }
        }

        info!(
            "Aggregated GC candidates for {} datanodes",
            datanode_to_candidates.len()
        );
        Ok(datanode_to_candidates)
    }

    /// Process multiple datanodes concurrently with limited parallelism.
    pub(crate) async fn parallel_process_datanodes(
        &self,
        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
    ) -> GcJobReport {
        let mut report = GcJobReport::default();

        // Create a stream of datanode GC tasks with limited concurrency
        let results: Vec<_> = futures::stream::iter(
            datanode_to_candidates
                .into_iter()
                .filter(|(_, candidates)| !candidates.is_empty()),
        )
        .map(|(peer, candidates)| async move {
            let result = self.process_datanode_gc(peer.clone(), candidates).await;
            (peer, result)
        })
        .buffer_unordered(self.config.max_concurrent_tables) // Reuse the table concurrency limit for datanodes
        .collect()
        .await;

        // Record each datanode's GC result; failures are kept per datanode
        for (peer, result) in results {
            match result {
                Ok(dn_report) => {
                    report.per_datanode_reports.insert(peer.id, dn_report);
                }
                Err(e) => {
                    error!("Failed to process datanode GC for peer {}: {:#?}", peer, e);
                    // There is no direct way to map a peer back to a table id here, so the
                    // error is recorded per datanode; per-region failures appear in the
                    // individual datanode reports.
                    report.failed_datanodes.entry(peer.id).or_default().push(e);
                }
            }
        }

        report
    }

    /// Process GC for a single datanode with all its candidate regions.
    /// Returns the merged [`GcReport`] for this datanode.
    pub(crate) async fn process_datanode_gc(
        &self,
        peer: Peer,
        candidates: Vec<(TableId, GcCandidate)>,
    ) -> Result<GcReport> {
        info!(
            "Starting GC for datanode {} with {} candidate regions",
            peer,
            candidates.len()
        );

        if candidates.is_empty() {
            return Ok(Default::default());
        }

        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();

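        // Related regions may share SST files with the candidates (e.g. after a future
        // repartition), so they must be included when collecting file references.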
        let all_related_regions = self.find_related_regions(&all_region_ids).await?;

        let (region_to_peer, _) = self
            .discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
            .await?;

        // Step 1: Get file references for all regions on this datanode
        let file_refs_manifest = self
            .ctx
            .get_file_references(
                &all_region_ids,
                all_related_regions,
                &region_to_peer,
                self.config.mailbox_timeout,
            )
            .await?;

        // Step 2: Create a single GcRegionProcedure for all regions on this datanode
        let (gc_report, fully_listed_regions) = {
            // Partition regions into full listing and fast listing in a single pass
            let batch_full_listing_decisions =
                self.batch_should_use_full_listing(&all_region_ids).await;

            let (need_full_list_regions, fast_list_regions): (Vec<_>, Vec<_>) =
                batch_full_listing_decisions
                    .into_iter()
                    .partition_map(|(region_id, need_full)| {
                        if need_full {
                            Either::Left(region_id)
                        } else {
                            Either::Right(region_id)
                        }
                    });

            let mut combined_report = GcReport::default();

            // First process regions that can use fast listing
            if !fast_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        peer.clone(),
                        &fast_list_regions,
                        &file_refs_manifest,
                        false,
                        self.config.mailbox_timeout,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            "Failed to GC regions {:?} on datanode {}: {}",
                            fast_list_regions, peer, e
                        );

                        // Add to need_retry_regions since the GC call failed
                        combined_report
                            .need_retry_regions
                            .extend(fast_list_regions.clone());
                    }
                }
            }

            if !need_full_list_regions.is_empty() {
                match self
                    .ctx
                    .gc_regions(
                        peer.clone(),
                        &need_full_list_regions,
                        &file_refs_manifest,
                        true,
                        self.config.mailbox_timeout,
                    )
                    .await
                {
                    Ok(report) => combined_report.merge(report),
                    Err(e) => {
                        error!(
                            "Failed to GC regions {:?} on datanode {}: {}",
                            need_full_list_regions, peer, e
                        );

                        // Add to need_retry_regions since the GC call failed
                        combined_report
                            .need_retry_regions
                            .extend(need_full_list_regions.clone());
                    }
                }
            }
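            // Only regions whose full-listing GC succeeded should have their
            // full-listing timestamps refreshed in the tracker below.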
            let fully_listed_regions = need_full_list_regions
                .into_iter()
                .filter(|r| !combined_report.need_retry_regions.contains(r))
                .collect::<HashSet<_>>();

            (combined_report, fully_listed_regions)
        };

        // Step 3: Update the full-listing timestamps for the processed regions
        for region_id in &all_region_ids {
            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
                .await;
        }

        info!(
            "Completed GC for datanode {}: {} regions processed",
            peer,
            all_region_ids.len()
        );

        Ok(gc_report)
    }

    /// Discover datanodes for the given regions (and their related regions) by fetching
    /// table routes in batches.
    /// Returns mappings from region to `(leader, followers)` peers and from peer to regions.
    async fn discover_datanodes_for_regions(
        &self,
        regions: &[RegionId],
    ) -> Result<(Region2Peers, Peer2Regions)> {
        let all_related_regions = self
            .find_related_regions(regions)
            .await?
            .into_iter()
            .flat_map(|(k, mut v)| {
                v.push(k);
                v
            })
            // A region may appear both as a key and in a related list; dedup before grouping
            .unique()
            .collect_vec();
        let mut region_to_peer = HashMap::new();
        let mut peer_to_regions = HashMap::new();

        // Group regions by table ID for batch processing
        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
        for region_id in all_related_regions {
            let table_id = region_id.table_id();
            table_to_regions
                .entry(table_id)
                .or_default()
                .push(region_id);
        }

        // Process each table's regions together for efficiency
        for (table_id, table_regions) in table_to_regions {
            match self.ctx.get_table_route(table_id).await {
                Ok((_phy_table_id, table_route)) => {
                    self.get_table_regions_peer(
                        &table_route,
                        &table_regions,
                        &mut region_to_peer,
                        &mut peer_to_regions,
                    );
                }
                Err(e) => {
                    // Continue with other tables instead of failing completely
                    // TODO(discord9): consider failing here instead
                    warn!(
                        "Failed to get table route for table {}: {}, skipping its regions",
                        table_id, e
                    );
                    continue;
                }
            }
        }

        Ok((region_to_peer, peer_to_regions))
    }

    /// Process regions of a single table to find their current leader peers.
    fn get_table_regions_peer(
        &self,
        table_route: &PhysicalTableRouteValue,
        table_regions: &[RegionId],
        region_to_peer: &mut Region2Peers,
        peer_to_regions: &mut Peer2Regions,
    ) {
        for &region_id in table_regions {
            let mut found = false;

            // Find the region in the table route
            for region_route in &table_route.region_routes {
                if region_route.region.id == region_id
                    && let Some(leader_peer) = &region_route.leader_peer
                {
                    region_to_peer.insert(
                        region_id,
                        (leader_peer.clone(), region_route.follower_peers.clone()),
                    );
                    peer_to_regions
                        .entry(leader_peer.clone())
                        .or_default()
                        .insert(region_id);
                    found = true;
                    break;
                }
            }

            if !found {
                warn!(
                    "Region {} not found in table route, or it has no leader peer",
                    region_id,
                );
            }
        }
    }

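    /// Decide, for each region, whether the next GC should do a full file listing
    /// (`true`) or a fast listing (`false`), based on the per-region GC tracker and the
    /// configured `full_file_listing_interval`.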
    async fn batch_should_use_full_listing(
        &self,
        region_ids: &[RegionId],
    ) -> HashMap<RegionId, bool> {
        let mut result = HashMap::new();
        let mut gc_tracker = self.region_gc_tracker.lock().await;
        let now = Instant::now();
        for &region_id in region_ids {
            let use_full_listing = {
                if let Some(gc_info) = gc_tracker.get(&region_id) {
                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
                        // Check whether the cooldown interval since the last full listing has elapsed
                        let elapsed = now.duration_since(last_full_listing);
                        elapsed >= self.config.full_file_listing_interval
                    } else {
                        // Never did a full listing for this region, so do it now
                        true
                    }
                } else {
                    // First GC for this region: record `now` as the baseline and skip the full
                    // listing this time; the first full listing happens one interval later
                    gc_tracker.insert(
                        region_id,
                        RegionGcInfo {
                            last_gc_time: now,
                            last_full_listing_time: Some(now),
                        },
                    );
                    false
                }
            };
            result.insert(region_id, use_full_listing);
        }
        result
    }
}