// meta_srv/gc/procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30    Result as ProcedureResult, Status,
31};
32use common_telemetry::tracing::Instrument as _;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, info, warn};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use serde::{Deserialize, Serialize};
38use snafu::ResultExt as _;
39use store_api::storage::{FileRefsManifest, GcReport, RegionId};
40use table::metadata::TableId;
41
42use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
43use crate::gc::util::table_route_to_region;
44use crate::gc::{Peer2Regions, Region2Peers};
45use crate::handler::HeartbeatMailbox;
46use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
47use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
48
49async fn send_get_file_refs_inner(
50    mailbox: &MailboxRef,
51    server_addr: &str,
52    peer: &Peer,
53    instruction: GetFileRefs,
54    timeout: Duration,
55) -> Result<MailboxReceiver> {
56    let instruction = instruction::Instruction::GetFileRefs(instruction);
57    let tracing_ctx = TracingContext::from_current_span();
58    let msg = MailboxMessage::json_message(
59        &format!("Get file references: {}", instruction),
60        &format!("Metasrv@{}", server_addr),
61        &format!("Datanode-{}@{}", peer.id, peer.addr),
62        common_time::util::current_time_millis(),
63        &instruction,
64        Some(tracing_ctx.to_w3c()),
65    )
66    .with_context(|_| SerializeToJsonSnafu {
67        input: instruction.to_string(),
68    })?;
69
70    mailbox
71        .send(&Channel::Datanode(peer.id), msg, timeout)
72        .await
73}
74
75async fn recv_get_file_refs_reply(
76    peer: &Peer,
77    mailbox_rx: MailboxReceiver,
78) -> Result<GetFileRefsReply> {
79    let reply = match mailbox_rx.await {
80        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
81        Err(e) => {
82            error!(
83                e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
84                peer,
85            );
86            return Err(e);
87        }
88    };
89
90    let InstructionReply::GetFileRefs(reply) = reply else {
91        return error::UnexpectedInstructionReplySnafu {
92            mailbox_message: format!("{:?}", reply),
93            reason: "Unexpected reply of the GetFileRefs instruction",
94        }
95        .fail();
96    };
97
98    Ok(reply)
99}
100
101async fn send_gc_regions_inner(
102    mailbox: &MailboxRef,
103    peer: &Peer,
104    gc_regions: &GcRegions,
105    server_addr: &str,
106    timeout: Duration,
107    description: &str,
108) -> Result<MailboxReceiver> {
109    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
110    let tracing_ctx = TracingContext::from_current_span();
111    let msg = MailboxMessage::json_message(
112        &format!("{}: {}", description, instruction),
113        &format!("Metasrv@{}", server_addr),
114        &format!("Datanode-{}@{}", peer.id, peer.addr),
115        common_time::util::current_time_millis(),
116        &instruction,
117        Some(tracing_ctx.to_w3c()),
118    )
119    .with_context(|_| SerializeToJsonSnafu {
120        input: instruction.to_string(),
121    })?;
122
123    mailbox
124        .send(&Channel::Datanode(peer.id), msg, timeout)
125        .await
126}
127
128async fn recv_gc_regions_reply(
129    peer: &Peer,
130    gc_regions: &GcRegions,
131    description: &str,
132    mailbox_rx: MailboxReceiver,
133) -> Result<GcReport> {
134    let reply = match mailbox_rx.await {
135        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
136        Err(e) => {
137            error!(
138                e; "Failed to receive reply from datanode {} for {}",
139                peer, description
140            );
141            return Err(e);
142        }
143    };
144
145    let InstructionReply::GcRegions(reply) = reply else {
146        return error::UnexpectedInstructionReplySnafu {
147            mailbox_message: format!("{:?}", reply),
148            reason: "Unexpected reply of the GcRegions instruction",
149        }
150        .fail();
151    };
152
153    let res = reply.result;
154    match res {
155        Ok(report) => Ok(report),
156        Err(e) => {
157            error!(
158                e; "Datanode {} reported error during GC for regions {:?}",
159                peer, gc_regions
160            );
161            error::UnexpectedSnafu {
162                violated: format!(
163                    "Datanode {} reported error during GC for regions {:?}: {}",
164                    peer, gc_regions, e
165                ),
166            }
167            .fail()
168        }
169    }
170}
171
/// Procedure to perform get file refs then batch GC for multiple regions,
/// it holds locks for all regions during the whole procedure.
pub struct BatchGcProcedure {
    /// Mailbox used to exchange instructions/replies with datanodes.
    mailbox: MailboxRef,
    /// Access to table routes and repartition info in the metadata backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serializable state of the procedure.
    data: BatchGcData,
}
179
/// Serializable state of [`BatchGcProcedure`].
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the GC state machine.
    state: State,
    /// Meta server address
    server_addr: String,
    /// The regions to be GC-ed
    regions: Vec<RegionId>,
    /// Whether datanodes should do a full file listing when GC-ing
    /// (forwarded in the `GcRegions` instruction) — exact datanode-side
    /// semantics live in the datanode implementation.
    full_file_listing: bool,
    /// Region -> (leader, followers) routes resolved from table routes.
    region_routes: Region2Peers,
    /// Routes assigned by the scheduler for regions missing from table routes.
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// Related regions (e.g., for shared files after repartition).
    /// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Acquired file references (Populated in Acquiring state)
    file_refs: FileRefsManifest,
    /// mailbox timeout duration
    timeout: Duration,
    /// Merged GC result, populated in the Gcing state.
    gc_report: Option<GcReport>,
}
201
/// States of the [`BatchGcProcedure`] state machine, executed in order:
/// `Start` -> `Acquiring` -> `Gcing` -> `UpdateRepartition`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state
    Start,
    /// Fetching file references from datanodes
    Acquiring,
    /// Sending GC instruction to the target datanode
    Gcing,
    /// Updating region repartition info in kvbackend after GC based on the GC result
    UpdateRepartition,
}
213
214impl BatchGcProcedure {
215    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
216
217    pub fn new(
218        mailbox: MailboxRef,
219        table_metadata_manager: TableMetadataManagerRef,
220        server_addr: String,
221        regions: Vec<RegionId>,
222        full_file_listing: bool,
223        timeout: Duration,
224        region_routes_override: Region2Peers,
225    ) -> Self {
226        Self {
227            mailbox,
228            table_metadata_manager,
229            data: BatchGcData {
230                state: State::Start,
231                server_addr,
232                regions,
233                full_file_listing,
234                timeout,
235                region_routes: HashMap::new(),
236                region_routes_override,
237                related_regions: HashMap::new(),
238                file_refs: FileRefsManifest::default(),
239                gc_report: None,
240            },
241        }
242    }
243
244    /// Test-only constructor to jump directly into the repartition update state.
245    /// Intended for integration tests that validate `cleanup_region_repartition` without
246    /// running the full batch GC state machine.
247    #[cfg(feature = "mock")]
248    pub fn new_update_repartition_for_test(
249        mailbox: MailboxRef,
250        table_metadata_manager: TableMetadataManagerRef,
251        server_addr: String,
252        regions: Vec<RegionId>,
253        file_refs: FileRefsManifest,
254        timeout: Duration,
255    ) -> Self {
256        Self {
257            mailbox,
258            table_metadata_manager,
259            data: BatchGcData {
260                state: State::UpdateRepartition,
261                server_addr,
262                regions,
263                full_file_listing: false,
264                timeout,
265                region_routes: HashMap::new(),
266                region_routes_override: HashMap::new(),
267                related_regions: HashMap::new(),
268                file_refs,
269                gc_report: Some(GcReport::default()),
270            },
271        }
272    }
273
274    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
275        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
276            error::UnexpectedSnafu {
277                violated: format!(
278                    "Failed to downcast procedure result to GcReport, got {:?}",
279                    std::any::type_name_of_val(&res.as_ref())
280                ),
281            }
282            .build()
283        })
284    }
285
286    async fn get_table_route(
287        &self,
288        table_id: TableId,
289    ) -> Result<(TableId, PhysicalTableRouteValue)> {
290        self.table_metadata_manager
291            .table_route_manager()
292            .get_physical_table_route(table_id)
293            .await
294            .context(TableMetadataManagerSnafu)
295    }
296
297    /// Return related regions for the given regions.
298    /// The returned map uses the input region as key, and all other regions
299    /// from the same table as values (excluding the input region itself).
300    async fn find_related_regions(
301        &self,
302        regions: &[RegionId],
303    ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
304        let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
305        let table_ids = table_ids.into_iter().collect::<Vec<_>>();
306
307        let table_routes = self
308            .table_metadata_manager
309            .table_route_manager()
310            .batch_get_physical_table_routes(&table_ids)
311            .await
312            .context(TableMetadataManagerSnafu)?;
313
314        if table_routes.len() != table_ids.len() {
315            // batch_get_physical_table_routes returns a subset on misses; treat that as error
316            for table_id in &table_ids {
317                if !table_routes.contains_key(table_id) {
318                    // indicate is a logical table id
319                    return error::InvalidArgumentsSnafu {
320                    err_msg: format!(
321                        "Unexpected logical table route: table {} resolved to physical table regions",
322                        table_id
323                    ),
324                }
325                    .fail();
326                }
327            }
328        }
329
330        let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
331        for (table_id, table_route) in table_routes {
332            let all_regions: HashSet<RegionId> = table_route
333                .region_routes
334                .iter()
335                .map(|r| r.region.id)
336                .collect();
337
338            table_all_regions.insert(table_id, all_regions);
339        }
340
341        let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
342        for region_id in regions {
343            let table_id = region_id.table_id();
344            if let Some(all_regions) = table_all_regions.get(&table_id) {
345                let mut related: HashSet<RegionId> = all_regions.clone();
346                related.remove(region_id);
347                related_regions.insert(*region_id, related);
348            } else {
349                related_regions.insert(*region_id, Default::default());
350            }
351        }
352
353        Ok(related_regions)
354    }
355
    /// Clean up region repartition info in kvbackend after GC
    /// according to cross reference in `FileRefsManifest`.
    ///
    /// For each table involved in this GC batch, the `src_to_dst` repartition
    /// mapping is reconciled per batch region: cross-region file references
    /// keep the mapping, lingering file refs keep a tombstone entry, and
    /// regions with neither are removed from the mapping.
    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
        // Group cross-region references by the source region's table:
        // table -> src region -> dst regions still referencing its files.
        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
            cross_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .entry(*src_region)
                .or_default()
                .extend(dst_regions.iter().copied());
        }

        // Group regions that still have non-empty file references by table.
        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (src_region, refs) in &self.data.file_refs.file_refs {
            if refs.is_empty() {
                continue;
            }

            tmp_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .insert(*src_region);
        }

        let repart_mgr = self.table_metadata_manager.table_repart_manager();

        // Visit every table touched either by the manifests or by the batch regions.
        let mut table_ids: HashSet<TableId> = cross_refs_grouped
            .keys()
            .copied()
            .chain(tmp_refs_grouped.keys().copied())
            .collect();
        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));

        for table_id in table_ids {
            // Serialize concurrent updates to this table's repart value.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;

            let cross_refs = cross_refs_grouped
                .get(&table_id)
                .cloned()
                .unwrap_or_default();
            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();

            // Read the current value with raw bytes; it is handed back to
            // `upsert_value` below for a guarded update.
            let current = repart_mgr
                .get_with_raw_bytes(table_id)
                .await
                .context(KvBackendSnafu)?;

            let mut new_value = current
                .as_ref()
                .map(|v| (**v).clone())
                .unwrap_or_else(TableRepartValue::new);

            // We only touch regions involved in this GC batch for the current table to avoid
            // clobbering unrelated repart entries. Start from the batch regions of this table.
            let batch_src_regions: HashSet<RegionId> = self
                .data
                .regions
                .iter()
                .copied()
                .filter(|r| r.table_id() == table_id)
                .collect();

            // Merge targets: only the batch regions of this table. This avoids touching unrelated
            // repart entries; we just reconcile mappings for regions involved in the current GC
            // cycle for this table.
            let all_src_regions: HashSet<RegionId> = batch_src_regions;

            for src_region in all_src_regions {
                let cross_dst = cross_refs.get(&src_region);
                let has_tmp_ref = tmp_refs.contains(&src_region);

                if let Some(dst_regions) = cross_dst {
                    // Files still referenced across regions: record the destinations.
                    let mut set = BTreeSet::new();
                    set.extend(dst_regions.iter().copied());
                    new_value.src_to_dst.insert(src_region, set);
                } else if has_tmp_ref {
                    // Keep a tombstone entry with an empty set so dropped regions that still
                    // have tmp refs are preserved; removing it would lose the repartition trace.
                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
                } else {
                    // No references of any kind remain: drop the mapping entirely.
                    new_value.src_to_dst.remove(&src_region);
                }
            }

            // If there is no repartition info to persist, skip creating/updating the key
            if new_value.src_to_dst.is_empty() && current.is_none() {
                continue;
            }

            repart_mgr
                .upsert_value(table_id, current, &new_value)
                .await
                .context(KvBackendSnafu)?;
        }

        Ok(())
    }
456
457    /// Discover region routes for the given regions.
458    async fn discover_route_for_regions(
459        &self,
460        regions: &[RegionId],
461    ) -> Result<(Region2Peers, Peer2Regions)> {
462        let mut region_to_peer = HashMap::new();
463        let mut peer_to_regions = HashMap::new();
464
465        // Group regions by table ID for batch processing
466        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
467        for region_id in regions {
468            let table_id = region_id.table_id();
469            table_to_regions
470                .entry(table_id)
471                .or_default()
472                .push(*region_id);
473        }
474
475        // Process each table's regions together for efficiency
476        for (table_id, table_regions) in table_to_regions {
477            match self.get_table_route(table_id).await {
478                Ok((_phy_table_id, table_route)) => {
479                    table_route_to_region(
480                        &table_route,
481                        &table_regions,
482                        &mut region_to_peer,
483                        &mut peer_to_regions,
484                    );
485                }
486                Err(e) => {
487                    // Continue with other tables instead of failing completely
488                    // TODO(discord9): consider failing here instead
489                    warn!(
490                        "Failed to get table route for table {}: {}, skipping its regions",
491                        table_id, e
492                    );
493                    continue;
494                }
495            }
496        }
497
498        Ok((region_to_peer, peer_to_regions))
499    }
500
501    /// Set region routes and related regions for GC procedure
502    async fn set_routes_and_related_regions(&mut self) -> Result<()> {
503        let related_regions = self.find_related_regions(&self.data.regions).await?;
504
505        self.data.related_regions = related_regions.clone();
506
507        // Discover routes for all regions involved in GC, including both the
508        // primary GC regions and their related regions.
509        let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
510
511        regions_set.extend(related_regions.keys().cloned());
512        regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
513
514        let regions_to_discover = regions_set.into_iter().collect_vec();
515
516        let (mut region_to_peer, _) = self
517            .discover_route_for_regions(&regions_to_discover)
518            .await?;
519
520        for (region_id, route) in &self.data.region_routes_override {
521            region_to_peer
522                .entry(*region_id)
523                .or_insert_with(|| route.clone());
524        }
525
526        self.data.region_routes = region_to_peer;
527
528        Ok(())
529    }
530
531    /// Get file references from all datanodes that host the regions
532    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
533        let region_count = self.data.regions.len();
534        self.set_routes_and_related_regions()
535            .instrument(common_telemetry::tracing::info_span!(
536                "meta_gc_procedure_prepare_routes",
537                region_count = region_count
538            ))
539            .await?;
540
541        let query_regions = &self.data.regions;
542        let related_regions = &self.data.related_regions;
543        let region_routes = &self.data.region_routes;
544        let timeout = self.data.timeout;
545        let dropped_regions = self
546            .data
547            .region_routes_override
548            .keys()
549            .collect::<HashSet<_>>();
550
551        // Group regions by datanode to minimize RPC calls
552        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
553
554        for region_id in query_regions {
555            if dropped_regions.contains(region_id) {
556                continue;
557            }
558            if let Some((leader, followers)) = region_routes.get(region_id) {
559                datanode2query_regions
560                    .entry(leader.clone())
561                    .or_default()
562                    .push(*region_id);
563                // also need to send for follower regions for file refs in case query is running on follower
564                for follower in followers {
565                    datanode2query_regions
566                        .entry(follower.clone())
567                        .or_default()
568                        .push(*region_id);
569                }
570            } else {
571                return error::UnexpectedSnafu {
572                    violated: format!(
573                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
574                    ),
575                }
576                .fail();
577            }
578        }
579
580        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
581            HashMap::new();
582        for (src_region, dst_regions) in related_regions {
583            for dst_region in dst_regions {
584                if let Some((leader, _followers)) = region_routes.get(dst_region) {
585                    datanode2related_regions
586                        .entry(leader.clone())
587                        .or_default()
588                        .entry(*src_region)
589                        .or_default()
590                        .insert(*dst_region);
591                } // since read from manifest, no need to send to followers
592            }
593        }
594
595        // Send GetFileRefs instructions to each datanode
596        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
597        let mut all_manifest_versions = HashMap::new();
598        let mut all_cross_region_refs = HashMap::new();
599
600        let mut peers = HashSet::new();
601        peers.extend(datanode2query_regions.keys().cloned());
602        peers.extend(datanode2related_regions.keys().cloned());
603
604        let mailbox = &self.mailbox;
605        let server_addr = &self.data.server_addr;
606        let mut tasks = Vec::new();
607
608        for peer in peers {
609            let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
610            let related_regions_for_peer =
611                datanode2related_regions.remove(&peer).unwrap_or_default();
612
613            if regions.is_empty() && related_regions_for_peer.is_empty() {
614                continue;
615            }
616
617            tasks.push(async move {
618                let instruction = GetFileRefs {
619                    query_regions: regions.clone(),
620                    related_regions: related_regions_for_peer.clone(),
621                };
622
623                let reply =
624                    send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout)
625                        .await;
626
627                (peer, regions, related_regions_for_peer, reply)
628            });
629        }
630
631        let mut recv_tasks = Vec::new();
632        // store error to make sure metrics doesn't ignore other peers
633        let mut first_error = None;
634        let mut record_get_file_refs_error = |e| {
635            METRIC_META_GC_DATANODE_CALLS_TOTAL
636                .with_label_values(&["get_file_refs", "error"])
637                .inc();
638            if first_error.is_none() {
639                first_error = Some(e);
640            }
641        };
642        for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await {
643            match reply {
644                Ok(mailbox_rx) => {
645                    recv_tasks.push(async move {
646                        let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await;
647                        (peer, regions, related_regions_for_peer, reply)
648                    });
649                }
650                Err(e) => record_get_file_refs_error(e),
651            }
652        }
653
654        let replies = join_all(recv_tasks).await;
655
656        for (peer, regions, related_regions_for_peer, reply) in replies {
657            let reply = match reply {
658                Ok(reply) => reply,
659                Err(e) => {
660                    record_get_file_refs_error(e);
661                    continue;
662                }
663            };
664            debug!(
665                "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
666                peer, regions, related_regions_for_peer, reply
667            );
668
669            if !reply.success {
670                METRIC_META_GC_DATANODE_CALLS_TOTAL
671                    .with_label_values(&["get_file_refs", "error"])
672                    .inc();
673                let err = error::UnexpectedSnafu {
674                    violated: format!(
675                        "Failed to get file references from datanode {}: {:?}",
676                        peer, reply.error
677                    ),
678                }
679                .build();
680                record_get_file_refs_error(err);
681                continue;
682            }
683            METRIC_META_GC_DATANODE_CALLS_TOTAL
684                .with_label_values(&["get_file_refs", "success"])
685                .inc();
686
687            // Merge the file references from this datanode
688            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
689                all_file_refs
690                    .entry(region_id)
691                    .or_default()
692                    .extend(file_refs);
693            }
694
695            // region manifest version should be the smallest one among all peers, so outdated region can be detected
696            for (region_id, version) in reply.file_refs_manifest.manifest_version {
697                let entry = all_manifest_versions.entry(region_id).or_insert(version);
698                *entry = (*entry).min(version);
699            }
700
701            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
702                let entry = all_cross_region_refs
703                    .entry(region_id)
704                    .or_insert_with(HashSet::new);
705                entry.extend(related_region_ids);
706            }
707        }
708
709        if let Some(e) = first_error {
710            return Err(e);
711        }
712
713        Ok(FileRefsManifest {
714            file_refs: all_file_refs,
715            manifest_version: all_manifest_versions,
716            cross_region_refs: all_cross_region_refs,
717        })
718    }
719
    /// Send GC instruction to all datanodes that host the regions.
    ///
    /// Returns the merged [`GcReport`] from all datanodes; regions that need
    /// retry are carried in the report's `need_retry_regions`. The first
    /// datanode error (if any) is returned only after all replies are processed.
    async fn send_gc_instructions(&self) -> Result<GcReport> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // Group regions by datanode
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
        let mut all_report = GcReport::default();

        for region_id in regions {
            // GC is executed on the leader peer of each region only.
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        let mailbox = &self.mailbox;
        let server_addr = self.data.server_addr.as_str();
        let full_file_listing = self.data.full_file_listing;
        // One send task per datanode; each resolves to the mailbox receiver
        // (or the send error) for that peer.
        let tasks = datanode2regions
            .into_iter()
            .map(|(peer, regions_for_peer)| {
                let gc_regions = GcRegions {
                    regions: regions_for_peer.clone(),
                    // file_refs_manifest could be somewhat large. But still intentionally clone per datanode here:
                    // this path is admin-triggered or scheduler-triggered, peer count is expected to be bounded,
                    // and abnormal manifest growth should be addressed at the source
                    file_refs_manifest: file_refs.clone(),
                    full_file_listing,
                };
                let region_count = gc_regions.regions.len() as u64;

                async move {
                    let report = send_gc_regions_inner(
                        mailbox,
                        &peer,
                        &gc_regions,
                        server_addr,
                        timeout,
                        "Batch GC",
                    )
                    .await;

                    (peer, gc_regions, region_count, report)
                }
            });

        let mut recv_tasks = Vec::new();
        // Remember only the first error; keep processing the rest so metrics
        // account for every failed peer.
        let mut first_error = None;
        let mut record_gc_error = |e, region_count| {
            METRIC_META_GC_DATANODE_CALLS_TOTAL
                .with_label_values(&["gc_regions", "error"])
                .inc();
            if region_count > 0 {
                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count);
            }
            if first_error.is_none() {
                first_error = Some(e);
            }
        };
        for (peer, gc_regions, region_count, report) in join_all(tasks).await {
            match report {
                Ok(mailbox_rx) => {
                    recv_tasks.push(async move {
                        let report =
                            recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await;
                        (peer, region_count, report)
                    });
                }
                Err(e) => record_gc_error(e, region_count),
            }
        }

        for (peer, region_count, report) in join_all(recv_tasks).await {
            let report = match report {
                Ok(report) => {
                    METRIC_META_GC_DATANODE_CALLS_TOTAL
                        .with_label_values(&["gc_regions", "success"])
                        .inc();
                    // Regions the datanode could not GC this round still count
                    // toward the failed-regions metric.
                    let need_retry_count = report.need_retry_regions.len() as u64;
                    if need_retry_count > 0 {
                        METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
                    }
                    report
                }
                Err(e) => {
                    record_gc_error(e, region_count);
                    continue;
                }
            };

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions.clone());
            all_report.merge(report);
        }

        // Surface the first failure only after every reply was processed.
        if let Some(e) = first_error {
            return Err(e);
        }

        if !all_need_retry.is_empty() {
            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
        }

        Ok(all_report)
    }
852}
853
854#[async_trait::async_trait]
855impl Procedure for BatchGcProcedure {
856    fn type_name(&self) -> &str {
857        Self::TYPE_NAME
858    }
859
    /// Drives the batch-GC state machine one step per invocation:
    /// `Start` -> `Acquiring` (collect file references from datanodes) ->
    /// `Gcing` (send GC instructions) -> `UpdateRepartition` (cleanup
    /// repartition metadata), finishing with the merged GC report as the
    /// procedure output.
    ///
    /// Every non-terminal arm returns `Status::executing(false)` so the
    /// framework re-invokes `execute` for the next state.
    /// NOTE(review): the `false` flag presumably skips persisting procedure
    /// state between steps — confirm against `common_procedure` that losing
    /// intermediate state on restart is acceptable here.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                // Enter a span so logs emitted in this arm carry the region list.
                let _regions_span = common_telemetry::tracing::debug_span!(
                    "meta_gc_procedure_regions",
                    state = "start",
                    regions = ?self.data.regions
                )
                .entered();
                info!(
                    "Batch GC procedure transitioning from Start to Acquiring for {} regions",
                    self.data.regions.len()
                );
                // Transition to Acquiring state
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                let region_count = self.data.regions.len();
                let full_file_listing = self.data.full_file_listing;
                // Cloned so the span can capture the region list while `self`
                // is borrowed by `get_file_references()` below.
                let regions = self.data.regions.clone();
                info!(
                    "Batch GC procedure acquiring file references for {} regions",
                    region_count
                );
                // Get file references from all datanodes
                match self
                    .get_file_references()
                    .instrument(common_telemetry::tracing::debug_span!(
                        "meta_gc_procedure_regions",
                        state = "acquiring",
                        regions = ?regions
                    ))
                    .instrument(common_telemetry::tracing::info_span!(
                        "meta_gc_procedure_get_file_references",
                        region_count = region_count,
                        full_file_listing = full_file_listing
                    ))
                    .await
                {
                    Ok(file_refs) => {
                        info!(
                            "Batch GC procedure acquired file references for {} regions",
                            file_refs.file_refs.len()
                        );
                        // Stash the manifest for the Gcing step, then advance.
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                info!(
                    "Batch GC procedure sending GC instructions for {} regions",
                    self.data.regions.len()
                );
                // Send GC instructions to all datanodes
                // TODO(discord9): handle need-retry regions
                match self
                    .send_gc_instructions()
                    .instrument(common_telemetry::tracing::debug_span!(
                        "meta_gc_procedure_regions",
                        state = "gcing",
                        regions = ?self.data.regions
                    ))
                    .instrument(common_telemetry::tracing::info_span!(
                        "meta_gc_procedure_send_gc_instructions",
                        region_count = self.data.regions.len(),
                        full_file_listing = self.data.full_file_listing
                    ))
                    .await
                {
                    Ok(report) => {
                        info!(
                            "Batch GC procedure received GC report, retry region count: {}",
                            report.need_retry_regions.len()
                        );
                        // Keep the report so the terminal state can emit it as output.
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::UpdateRepartition => match self
                .cleanup_region_repartition(ctx)
                .instrument(common_telemetry::tracing::debug_span!(
                    "meta_gc_procedure_regions",
                    state = "update_repartition",
                    regions = ?self.data.regions
                ))
                .instrument(common_telemetry::tracing::info_span!(
                    "meta_gc_procedure_update_repartition",
                    region_count = self.data.regions.len()
                ))
                .await
            {
                Ok(()) => {
                    debug!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    // `take()` moves the report out (leaving `None`); a missing
                    // report here means the Gcing state was skipped — a bug.
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }
989
990    fn dump(&self) -> ProcedureResult<String> {
991        serde_json::to_string(&self.data).context(ToJsonSnafu)
992    }
993
994    /// Read lock all regions involved in this GC procedure.
995    /// So i.e. region migration won't happen during GC and cause race conditions.
996    fn lock_key(&self) -> LockKey {
997        let lock_key: Vec<_> = self
998            .data
999            .regions
1000            .iter()
1001            .sorted() // sort to have a deterministic lock order
1002            .map(|id| RegionLock::Read(*id).into())
1003            .collect();
1004
1005        LockKey::new(lock_key)
1006    }
1007}