// meta_srv/gc/procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30    Result as ProcedureResult, Status,
31};
32use common_telemetry::{debug, error, info, warn};
33use itertools::Itertools as _;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt as _;
36use store_api::storage::{FileRefsManifest, GcReport, RegionId};
37use table::metadata::TableId;
38
39use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
40use crate::gc::util::table_route_to_region;
41use crate::gc::{Peer2Regions, Region2Peers};
42use crate::handler::HeartbeatMailbox;
43use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
44use crate::service::mailbox::{Channel, MailboxRef};
45
46/// Helper function to send GetFileRefs instruction and wait for reply.
47async fn send_get_file_refs(
48    mailbox: &MailboxRef,
49    server_addr: &str,
50    peer: &Peer,
51    instruction: GetFileRefs,
52    timeout: Duration,
53) -> Result<GetFileRefsReply> {
54    let result = send_get_file_refs_inner(mailbox, server_addr, peer, instruction, timeout).await;
55
56    match &result {
57        Ok(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
58            .with_label_values(&["get_file_refs", "success"])
59            .inc(),
60        Err(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
61            .with_label_values(&["get_file_refs", "error"])
62            .inc(),
63    }
64
65    result
66}
67
/// Sends a [`GetFileRefs`] instruction to the datanode `peer` via the mailbox
/// and waits (up to `timeout`) for its [`GetFileRefsReply`].
///
/// Fails if the message cannot be serialized to JSON, if the mailbox
/// send/receive fails, or if the datanode answers with a reply of an
/// unexpected variant.
async fn send_get_file_refs_inner(
    mailbox: &MailboxRef,
    server_addr: &str,
    peer: &Peer,
    instruction: GetFileRefs,
    timeout: Duration,
) -> Result<GetFileRefsReply> {
    // Wrap the payload in the generic instruction envelope before serializing.
    let instruction = instruction::Instruction::GetFileRefs(instruction);
    let msg = MailboxMessage::json_message(
        &format!("Get file references: {}", instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    // The receiver resolves when the datanode replies or the mailbox times out.
    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
                peer,
            );
            return Err(e);
        }
    };

    // Any reply variant other than `GetFileRefs` indicates a protocol mismatch.
    let InstructionReply::GetFileRefs(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GetFileRefs instruction",
        }
        .fail();
    };

    Ok(reply)
}
112
113/// Helper function to send GcRegions instruction and wait for reply.
114async fn send_gc_regions(
115    mailbox: &MailboxRef,
116    peer: &Peer,
117    gc_regions: GcRegions,
118    server_addr: &str,
119    timeout: Duration,
120    description: &str,
121) -> Result<GcReport> {
122    let failed_region_count = gc_regions.regions.len() as u64;
123    let result =
124        send_gc_regions_inner(mailbox, peer, gc_regions, server_addr, timeout, description).await;
125
126    match result {
127        Ok(report) => {
128            METRIC_META_GC_DATANODE_CALLS_TOTAL
129                .with_label_values(&["gc_regions", "success"])
130                .inc();
131
132            let need_retry_count = report.need_retry_regions.len() as u64;
133            if need_retry_count > 0 {
134                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
135            }
136
137            Ok(report)
138        }
139        Err(e) => {
140            METRIC_META_GC_DATANODE_CALLS_TOTAL
141                .with_label_values(&["gc_regions", "error"])
142                .inc();
143            if failed_region_count > 0 {
144                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(failed_region_count);
145            }
146            Err(e)
147        }
148    }
149}
150
/// Sends a [`GcRegions`] instruction to the datanode `peer` and waits (up to
/// `timeout`) for the [`GcReport`] it produces.
///
/// `description` is only used for the mailbox message subject and log lines.
/// Fails on serialization, mailbox transport, unexpected reply variant, or
/// when the datanode itself reports a GC error.
async fn send_gc_regions_inner(
    mailbox: &MailboxRef,
    peer: &Peer,
    gc_regions: GcRegions,
    server_addr: &str,
    timeout: Duration,
    description: &str,
) -> Result<GcReport> {
    // Cloned because `gc_regions` is still needed below for error reporting.
    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
    let msg = MailboxMessage::json_message(
        &format!("{}: {}", description, instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    // The receiver resolves when the datanode replies or the mailbox times out.
    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for {}",
                peer, description
            );
            return Err(e);
        }
    };

    // Any reply variant other than `GcRegions` indicates a protocol mismatch.
    let InstructionReply::GcRegions(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GcRegions instruction",
        }
        .fail();
    };

    // The datanode ran GC but may have failed internally; surface that as an
    // `Unexpected` error with full context.
    let res = reply.result;
    match res {
        Ok(report) => Ok(report),
        Err(e) => {
            error!(
                e; "Datanode {} reported error during GC for regions {:?}",
                peer, gc_regions
            );
            error::UnexpectedSnafu {
                violated: format!(
                    "Datanode {} reported error during GC for regions {:?}: {}",
                    peer, gc_regions, e
                ),
            }
            .fail()
        }
    }
}
212
/// Procedure to perform get file refs then batch GC for multiple regions,
/// it holds locks for all regions during the whole procedure.
pub struct BatchGcProcedure {
    /// Mailbox used to exchange instructions/replies with datanodes.
    mailbox: MailboxRef,
    /// Access to table routes and repartition metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serializable state of the procedure (persisted via `Procedure::dump`).
    data: BatchGcData,
}
220
/// Serializable state of [`BatchGcProcedure`]; this is what `Procedure::dump`
/// persists between execution steps.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the GC state machine.
    state: State,
    /// Meta server address
    server_addr: String,
    /// The regions to be GC-ed
    regions: Vec<RegionId>,
    /// Whether datanodes should perform a full file listing during GC.
    full_file_listing: bool,
    /// Region -> (leader, followers) routes discovered from table metadata.
    region_routes: Region2Peers,
    /// Routes assigned by the scheduler for regions missing from table routes.
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// Related regions (e.g., for shared files after repartition).
    /// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Acquired file references (Populated in Acquiring state)
    file_refs: FileRefsManifest,
    /// mailbox timeout duration
    timeout: Duration,
    /// GC report produced in the `Gcing` state; taken when the procedure finishes.
    gc_report: Option<GcReport>,
}
242
/// Steps of the [`BatchGcProcedure`] state machine, executed in order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state
    Start,
    /// Fetching file references from datanodes
    Acquiring,
    /// Sending GC instruction to the target datanode
    Gcing,
    /// Updating region repartition info in kvbackend after GC based on the GC result
    UpdateRepartition,
}
254
255impl BatchGcProcedure {
256    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
257
258    pub fn new(
259        mailbox: MailboxRef,
260        table_metadata_manager: TableMetadataManagerRef,
261        server_addr: String,
262        regions: Vec<RegionId>,
263        full_file_listing: bool,
264        timeout: Duration,
265        region_routes_override: Region2Peers,
266    ) -> Self {
267        Self {
268            mailbox,
269            table_metadata_manager,
270            data: BatchGcData {
271                state: State::Start,
272                server_addr,
273                regions,
274                full_file_listing,
275                timeout,
276                region_routes: HashMap::new(),
277                region_routes_override,
278                related_regions: HashMap::new(),
279                file_refs: FileRefsManifest::default(),
280                gc_report: None,
281            },
282        }
283    }
284
285    /// Test-only constructor to jump directly into the repartition update state.
286    /// Intended for integration tests that validate `cleanup_region_repartition` without
287    /// running the full batch GC state machine.
288    #[cfg(feature = "mock")]
289    pub fn new_update_repartition_for_test(
290        mailbox: MailboxRef,
291        table_metadata_manager: TableMetadataManagerRef,
292        server_addr: String,
293        regions: Vec<RegionId>,
294        file_refs: FileRefsManifest,
295        timeout: Duration,
296    ) -> Self {
297        Self {
298            mailbox,
299            table_metadata_manager,
300            data: BatchGcData {
301                state: State::UpdateRepartition,
302                server_addr,
303                regions,
304                full_file_listing: false,
305                timeout,
306                region_routes: HashMap::new(),
307                region_routes_override: HashMap::new(),
308                related_regions: HashMap::new(),
309                file_refs,
310                gc_report: Some(GcReport::default()),
311            },
312        }
313    }
314
315    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
316        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
317            error::UnexpectedSnafu {
318                violated: format!(
319                    "Failed to downcast procedure result to GcReport, got {:?}",
320                    std::any::type_name_of_val(&res.as_ref())
321                ),
322            }
323            .build()
324        })
325    }
326
327    async fn get_table_route(
328        &self,
329        table_id: TableId,
330    ) -> Result<(TableId, PhysicalTableRouteValue)> {
331        self.table_metadata_manager
332            .table_route_manager()
333            .get_physical_table_route(table_id)
334            .await
335            .context(TableMetadataManagerSnafu)
336    }
337
338    /// Return related regions for the given regions.
339    /// The returned map uses the input region as key, and all other regions
340    /// from the same table as values (excluding the input region itself).
341    async fn find_related_regions(
342        &self,
343        regions: &[RegionId],
344    ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
345        let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
346        let table_ids = table_ids.into_iter().collect::<Vec<_>>();
347
348        let table_routes = self
349            .table_metadata_manager
350            .table_route_manager()
351            .batch_get_physical_table_routes(&table_ids)
352            .await
353            .context(TableMetadataManagerSnafu)?;
354
355        if table_routes.len() != table_ids.len() {
356            // batch_get_physical_table_routes returns a subset on misses; treat that as error
357            for table_id in &table_ids {
358                if !table_routes.contains_key(table_id) {
359                    // indicate is a logical table id
360                    return error::InvalidArgumentsSnafu {
361                    err_msg: format!(
362                        "Unexpected logical table route: table {} resolved to physical table regions",
363                        table_id
364                    ),
365                }
366                    .fail();
367                }
368            }
369        }
370
371        let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
372        for (table_id, table_route) in table_routes {
373            let all_regions: HashSet<RegionId> = table_route
374                .region_routes
375                .iter()
376                .map(|r| r.region.id)
377                .collect();
378
379            table_all_regions.insert(table_id, all_regions);
380        }
381
382        let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
383        for region_id in regions {
384            let table_id = region_id.table_id();
385            if let Some(all_regions) = table_all_regions.get(&table_id) {
386                let mut related: HashSet<RegionId> = all_regions.clone();
387                related.remove(region_id);
388                related_regions.insert(*region_id, related);
389            } else {
390                related_regions.insert(*region_id, Default::default());
391            }
392        }
393
394        Ok(related_regions)
395    }
396
    /// Clean up region repartition info in kvbackend after GC
    /// according to cross reference in `FileRefsManifest`.
    ///
    /// For every table touched by this GC batch it takes a table write lock,
    /// then reconciles the table's repart mapping (`src_to_dst`) for exactly
    /// the batch regions of that table, and writes the result back with a
    /// compare-and-set style `upsert_value`.
    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
        // Group cross-region references by table: table -> src region -> dst regions.
        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
            cross_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .entry(*src_region)
                .or_default()
                .extend(dst_regions.iter().copied());
        }

        // Per table, the regions that still hold non-empty file references.
        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (src_region, refs) in &self.data.file_refs.file_refs {
            if refs.is_empty() {
                continue;
            }

            tmp_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .insert(*src_region);
        }

        let repart_mgr = self.table_metadata_manager.table_repart_manager();

        // Reconcile every table mentioned by either grouping or by the batch regions.
        let mut table_ids: HashSet<TableId> = cross_refs_grouped
            .keys()
            .copied()
            .chain(tmp_refs_grouped.keys().copied())
            .collect();
        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));

        for table_id in table_ids {
            // Serialize with other writers of this table's metadata.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;

            let cross_refs = cross_refs_grouped
                .get(&table_id)
                .cloned()
                .unwrap_or_default();
            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();

            // Read the current value (with raw bytes) so the later upsert can
            // detect concurrent modification.
            let current = repart_mgr
                .get_with_raw_bytes(table_id)
                .await
                .context(KvBackendSnafu)?;

            let mut new_value = current
                .as_ref()
                .map(|v| (**v).clone())
                .unwrap_or_else(TableRepartValue::new);

            // We only touch regions involved in this GC batch for the current table to avoid
            // clobbering unrelated repart entries. Start from the batch regions of this table.
            let batch_src_regions: HashSet<RegionId> = self
                .data
                .regions
                .iter()
                .copied()
                .filter(|r| r.table_id() == table_id)
                .collect();

            // Merge targets: only the batch regions of this table. This avoids touching unrelated
            // repart entries; we just reconcile mappings for regions involved in the current GC
            // cycle for this table.
            let all_src_regions: HashSet<RegionId> = batch_src_regions;

            for src_region in all_src_regions {
                let cross_dst = cross_refs.get(&src_region);
                let has_tmp_ref = tmp_refs.contains(&src_region);

                if let Some(dst_regions) = cross_dst {
                    // Live cross-region refs: record the current destination set.
                    let mut set = BTreeSet::new();
                    set.extend(dst_regions.iter().copied());
                    new_value.src_to_dst.insert(src_region, set);
                } else if has_tmp_ref {
                    // Keep a tombstone entry with an empty set so dropped regions that still
                    // have tmp refs are preserved; removing it would lose the repartition trace.
                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
                } else {
                    // No refs at all: the mapping for this region can be dropped.
                    new_value.src_to_dst.remove(&src_region);
                }
            }

            // If there is no repartition info to persist, skip creating/updating the key
            if new_value.src_to_dst.is_empty() && current.is_none() {
                continue;
            }

            repart_mgr
                .upsert_value(table_id, current, &new_value)
                .await
                .context(KvBackendSnafu)?;
        }

        Ok(())
    }
497
498    /// Discover region routes for the given regions.
499    async fn discover_route_for_regions(
500        &self,
501        regions: &[RegionId],
502    ) -> Result<(Region2Peers, Peer2Regions)> {
503        let mut region_to_peer = HashMap::new();
504        let mut peer_to_regions = HashMap::new();
505
506        // Group regions by table ID for batch processing
507        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
508        for region_id in regions {
509            let table_id = region_id.table_id();
510            table_to_regions
511                .entry(table_id)
512                .or_default()
513                .push(*region_id);
514        }
515
516        // Process each table's regions together for efficiency
517        for (table_id, table_regions) in table_to_regions {
518            match self.get_table_route(table_id).await {
519                Ok((_phy_table_id, table_route)) => {
520                    table_route_to_region(
521                        &table_route,
522                        &table_regions,
523                        &mut region_to_peer,
524                        &mut peer_to_regions,
525                    );
526                }
527                Err(e) => {
528                    // Continue with other tables instead of failing completely
529                    // TODO(discord9): consider failing here instead
530                    warn!(
531                        "Failed to get table route for table {}: {}, skipping its regions",
532                        table_id, e
533                    );
534                    continue;
535                }
536            }
537        }
538
539        Ok((region_to_peer, peer_to_regions))
540    }
541
542    /// Set region routes and related regions for GC procedure
543    async fn set_routes_and_related_regions(&mut self) -> Result<()> {
544        let related_regions = self.find_related_regions(&self.data.regions).await?;
545
546        self.data.related_regions = related_regions.clone();
547
548        // Discover routes for all regions involved in GC, including both the
549        // primary GC regions and their related regions.
550        let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
551
552        regions_set.extend(related_regions.keys().cloned());
553        regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
554
555        let regions_to_discover = regions_set.into_iter().collect_vec();
556
557        let (mut region_to_peer, _) = self
558            .discover_route_for_regions(&regions_to_discover)
559            .await?;
560
561        for (region_id, route) in &self.data.region_routes_override {
562            region_to_peer
563                .entry(*region_id)
564                .or_insert_with(|| route.clone());
565        }
566
567        self.data.region_routes = region_to_peer;
568
569        Ok(())
570    }
571
    /// Get file references from all datanodes that host the regions.
    ///
    /// Groups the query by peer to minimize RPC calls, queries leaders and
    /// followers for the primary GC regions, queries only leaders for related
    /// regions (manifest reads), and merges all replies into a single
    /// [`FileRefsManifest`].
    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
        self.set_routes_and_related_regions().await?;

        let query_regions = &self.data.regions;
        let related_regions = &self.data.related_regions;
        let region_routes = &self.data.region_routes;
        let timeout = self.data.timeout;
        // Regions that only have scheduler-assigned routes are skipped for the
        // direct query below.
        let dropped_regions = self
            .data
            .region_routes_override
            .keys()
            .collect::<HashSet<_>>();

        // Group regions by datanode to minimize RPC calls
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if dropped_regions.contains(region_id) {
                continue;
            }
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                // also need to send for follower regions for file refs in case query is running on follower
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                // Every non-dropped query region must have a discovered route.
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        // Per peer: src region -> dst regions the peer should inspect.
        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in related_regions {
            for dst_region in dst_regions {
                if let Some((leader, _followers)) = region_routes.get(dst_region) {
                    datanode2related_regions
                        .entry(leader.clone())
                        .or_default()
                        .entry(*src_region)
                        .or_default()
                        .insert(*dst_region);
                } // since read from manifest, no need to send to followers
            }
        }

        // Send GetFileRefs instructions to each datanode
        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();
        let mut all_cross_region_refs = HashMap::new();

        // Union of all peers that receive either kind of request.
        let mut peers = HashSet::new();
        peers.extend(datanode2query_regions.keys().cloned());
        peers.extend(datanode2related_regions.keys().cloned());

        for peer in peers {
            let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
            let related_regions_for_peer =
                datanode2related_regions.remove(&peer).unwrap_or_default();

            if regions.is_empty() && related_regions_for_peer.is_empty() {
                continue;
            }

            let instruction = GetFileRefs {
                query_regions: regions.clone(),
                related_regions: related_regions_for_peer.clone(),
            };

            let reply = send_get_file_refs(
                &self.mailbox,
                &self.data.server_addr,
                &peer,
                instruction,
                timeout,
            )
            .await?;
            debug!(
                "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
                peer, regions, related_regions_for_peer, reply
            );

            if !reply.success {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "Failed to get file references from datanode {}: {:?}",
                        peer, reply.error
                    ),
                }
                .fail();
            }

            // Merge the file references from this datanode
            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
                all_file_refs
                    .entry(region_id)
                    .or_default()
                    .extend(file_refs);
            }

            // region manifest version should be the smallest one among all peers, so outdated region can be detected
            for (region_id, version) in reply.file_refs_manifest.manifest_version {
                let entry = all_manifest_versions.entry(region_id).or_insert(version);
                *entry = (*entry).min(version);
            }

            // Cross-region refs are unioned across peers.
            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
                let entry = all_cross_region_refs
                    .entry(region_id)
                    .or_insert_with(HashSet::new);
                entry.extend(related_region_ids);
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
            cross_region_refs: all_cross_region_refs,
        })
    }
704
    /// Send GC instruction to all datanodes that host the regions,
    /// returns regions that need retry.
    ///
    /// Regions are grouped by their leader peer; each peer receives one
    /// [`GcRegions`] instruction carrying the full file-refs manifest. The
    /// per-peer reports are merged into a single [`GcReport`].
    async fn send_gc_instructions(&self) -> Result<GcReport> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // Group regions by datanode
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
        let mut all_report = GcReport::default();

        for region_id in regions {
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                // GC can only run against regions with a known leader route.
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        // Send GC instructions to each datanode
        for (peer, regions_for_peer) in datanode2regions {
            let gc_regions = GcRegions {
                regions: regions_for_peer.clone(),
                // file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation.
                file_refs_manifest: file_refs.clone(),
                full_file_listing: self.data.full_file_listing,
            };

            let report = send_gc_regions(
                &self.mailbox,
                &peer,
                gc_regions,
                self.data.server_addr.as_str(),
                timeout,
                "Batch GC",
            )
            .await?;

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions.clone());
            all_report.merge(report);
        }

        if !all_need_retry.is_empty() {
            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
        }

        Ok(all_report)
    }
777}
778
#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Drives the state machine one step per call:
    /// `Start -> Acquiring -> Gcing -> UpdateRepartition -> done`.
    /// Each transition returns `Status::executing(false)` so progress is
    /// persisted between steps.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                // Transition to Acquiring state
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                // Get file references from all datanodes
                match self.get_file_references().await {
                    Ok(file_refs) => {
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                // Send GC instructions to all datanodes
                // TODO(discord9): handle need-retry regions
                match self.send_gc_instructions().await {
                    Ok(report) => {
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
                Ok(()) => {
                    info!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    // The report was stored by the Gcing step; its absence here
                    // is an internal invariant violation.
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }

    /// Serializes the procedure's persistent state as JSON.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Read lock all regions involved in this GC procedure.
    /// So i.e. region migration won't happen during GC and cause race conditions.
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .regions
            .iter()
            .sorted() // sort to have a deterministic lock order
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}