meta_srv/gc/
procedure.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status,
};
use common_telemetry::{debug, error, info, warn};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;

use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
use crate::gc::util::table_route_to_region;
use crate::gc::{Peer2Regions, Region2Peers};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

/// Helper function to send a `GetFileRefs` instruction and wait for the reply.
async fn send_get_file_refs(
    mailbox: &MailboxRef,
    server_addr: &str,
    peer: &Peer,
    instruction: GetFileRefs,
    timeout: Duration,
) -> Result<GetFileRefsReply> {
    let instruction = instruction::Instruction::GetFileRefs(instruction);
    let msg = MailboxMessage::json_message(
        &format!("Get file references: {}", instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
                peer,
            );
            return Err(e);
        }
    };

    let InstructionReply::GetFileRefs(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GetFileRefs instruction",
        }
        .fail();
    };

    Ok(reply)
}

/// Helper function to send a `GcRegions` instruction and wait for the reply.
async fn send_gc_regions(
    mailbox: &MailboxRef,
    peer: &Peer,
    gc_regions: GcRegions,
    server_addr: &str,
    timeout: Duration,
    description: &str,
) -> Result<GcReport> {
    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
    let msg = MailboxMessage::json_message(
        &format!("{}: {}", description, instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for {}",
                peer, description
            );
            return Err(e);
        }
    };

    let InstructionReply::GcRegions(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GcRegions instruction",
        }
        .fail();
    };

    let res = reply.result;
    match res {
        Ok(report) => Ok(report),
        Err(e) => {
            error!(
                e; "Datanode {} reported error during GC for regions {:?}",
                peer, gc_regions
            );
            error::UnexpectedSnafu {
                violated: format!(
                    "Datanode {} reported error during GC for regions {:?}: {}",
                    peer, gc_regions, e
                ),
            }
            .fail()
        }
    }
}

/// Procedure that fetches file references and then performs a batch GC for
/// multiple regions; it holds read locks on all involved regions for the
/// whole procedure.
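///
/// # Example (sketch)
///
/// Hypothetical wiring; the real construction lives elsewhere in metasrv, and
/// `mailbox`, `table_metadata_manager`, `regions` and `procedure_output` are
/// assumed from the surrounding context:
///
/// ```ignore
/// let procedure = BatchGcProcedure::new(
///     mailbox,
///     table_metadata_manager,
///     "127.0.0.1:3002".to_string(), // metasrv server address
///     regions,                      // Vec<RegionId> to GC
///     false,                        // full_file_listing
///     Duration::from_secs(10),      // mailbox timeout
///     HashMap::new(),               // no route overrides
/// );
/// // After the procedure framework drives `execute` to completion:
/// let report = BatchGcProcedure::cast_result(procedure_output)?;
/// ```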
pub struct BatchGcProcedure {
    mailbox: MailboxRef,
    table_metadata_manager: TableMetadataManagerRef,
    data: BatchGcData,
}

#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    state: State,
    /// Meta server address
    server_addr: String,
    /// The regions to be GC-ed
    regions: Vec<RegionId>,
    full_file_listing: bool,
    region_routes: Region2Peers,
    /// Routes assigned by the scheduler for regions missing from table routes.
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// Related regions (e.g., for shared files after repartition).
    /// Keyed by the source region (where the files originally came from), with
    /// the destination regions (where the files are currently stored) as the value.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Acquired file references (populated in the `Acquiring` state).
    file_refs: FileRefsManifest,
    /// Mailbox timeout duration.
    timeout: Duration,
    gc_report: Option<GcReport>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state
    Start,
    /// Fetching file references from datanodes
    Acquiring,
    /// Sending GC instructions to the target datanodes
    Gcing,
    /// Updating region repartition info in the kvbackend based on the GC result
    UpdateRepartition,
}

impl BatchGcProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";

    pub fn new(
        mailbox: MailboxRef,
        table_metadata_manager: TableMetadataManagerRef,
        server_addr: String,
        regions: Vec<RegionId>,
        full_file_listing: bool,
        timeout: Duration,
        region_routes_override: Region2Peers,
    ) -> Self {
        Self {
            mailbox,
            table_metadata_manager,
            data: BatchGcData {
                state: State::Start,
                server_addr,
                regions,
                full_file_listing,
                timeout,
                region_routes: HashMap::new(),
                region_routes_override,
                related_regions: HashMap::new(),
                file_refs: FileRefsManifest::default(),
                gc_report: None,
            },
        }
    }

    /// Test-only constructor to jump directly into the repartition update state.
    /// Intended for integration tests that validate `cleanup_region_repartition` without
    /// running the full batch GC state machine.
    #[cfg(feature = "mock")]
    pub fn new_update_repartition_for_test(
        mailbox: MailboxRef,
        table_metadata_manager: TableMetadataManagerRef,
        server_addr: String,
        regions: Vec<RegionId>,
        file_refs: FileRefsManifest,
        timeout: Duration,
    ) -> Self {
        Self {
            mailbox,
            table_metadata_manager,
            data: BatchGcData {
                state: State::UpdateRepartition,
                server_addr,
                regions,
                full_file_listing: false,
                timeout,
                region_routes: HashMap::new(),
                region_routes_override: HashMap::new(),
                related_regions: HashMap::new(),
                file_refs,
                gc_report: Some(GcReport::default()),
            },
        }
    }

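    /// Downcast the output of a completed procedure (set via
    /// `Status::done_with_output`) back to a [`GcReport`].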
    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
            error::UnexpectedSnafu {
                violated: format!(
                    "Failed to downcast procedure result to GcReport, got {:?}",
                    std::any::type_name_of_val(&res.as_ref())
                ),
            }
            .build()
        })
    }

    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        self.table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)
    }

    /// Return related regions for the given regions.
    /// The returned map uses the input region as key, and all other regions
    /// from the same table as values (excluding the input region itself).
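    /// For example, if the input contains region `r1` and its table also has
    /// regions `r2` and `r3`, the result maps `r1 -> {r2, r3}`.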
    async fn find_related_regions(
        &self,
        regions: &[RegionId],
    ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
        let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
        let table_ids = table_ids.into_iter().collect::<Vec<_>>();

        let table_routes = self
            .table_metadata_manager
            .table_route_manager()
            .batch_get_physical_table_routes(&table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        if table_routes.len() != table_ids.len() {
            // batch_get_physical_table_routes returns a subset on misses; treat that as an error.
            for table_id in &table_ids {
                if !table_routes.contains_key(table_id) {
                    // A missing route indicates the id belongs to a logical table.
                    return error::InvalidArgumentsSnafu {
                        err_msg: format!(
                            "Unexpected logical table: table {} has no physical table route",
                            table_id
                        ),
                    }
                    .fail();
                }
            }
        }

        let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (table_id, table_route) in table_routes {
            let all_regions: HashSet<RegionId> = table_route
                .region_routes
                .iter()
                .map(|r| r.region.id)
                .collect();

            table_all_regions.insert(table_id, all_regions);
        }

        let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
        for region_id in regions {
            let table_id = region_id.table_id();
            if let Some(all_regions) = table_all_regions.get(&table_id) {
                let mut related: HashSet<RegionId> = all_regions.clone();
                related.remove(region_id);
                related_regions.insert(*region_id, related);
            } else {
                related_regions.insert(*region_id, Default::default());
            }
        }

        Ok(related_regions)
    }

    /// Clean up region repartition info in the kvbackend after GC,
    /// according to the cross-region references in the [`FileRefsManifest`].
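    /// For each batch region of a table this reconciles `src_to_dst`:
    /// - cross-region refs exist => store the destination set;
    /// - only tmp file refs exist => keep a tombstone entry (empty set);
    /// - neither => remove the mapping.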
    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
            cross_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .entry(*src_region)
                .or_default()
                .extend(dst_regions.iter().copied());
        }

        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (src_region, refs) in &self.data.file_refs.file_refs {
            if refs.is_empty() {
                continue;
            }

            tmp_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .insert(*src_region);
        }

        let repart_mgr = self.table_metadata_manager.table_repart_manager();

        let mut table_ids: HashSet<TableId> = cross_refs_grouped
            .keys()
            .copied()
            .chain(tmp_refs_grouped.keys().copied())
            .collect();
        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));

        for table_id in table_ids {
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;

            let cross_refs = cross_refs_grouped
                .get(&table_id)
                .cloned()
                .unwrap_or_default();
            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();

            let current = repart_mgr
                .get_with_raw_bytes(table_id)
                .await
                .context(KvBackendSnafu)?;

            let mut new_value = current
                .as_ref()
                .map(|v| (**v).clone())
                .unwrap_or_else(TableRepartValue::new);

            // We only touch regions involved in this GC batch for the current table to avoid
            // clobbering unrelated repart entries. Start from the batch regions of this table.
            let batch_src_regions: HashSet<RegionId> = self
                .data
                .regions
                .iter()
                .copied()
                .filter(|r| r.table_id() == table_id)
                .collect();

            // Merge targets: only the batch regions of this table. This avoids touching unrelated
            // repart entries; we just reconcile mappings for regions involved in the current GC
            // cycle for this table.
            let all_src_regions: HashSet<RegionId> = batch_src_regions;

            for src_region in all_src_regions {
                let cross_dst = cross_refs.get(&src_region);
                let has_tmp_ref = tmp_refs.contains(&src_region);

                if let Some(dst_regions) = cross_dst {
                    let mut set = BTreeSet::new();
                    set.extend(dst_regions.iter().copied());
                    new_value.src_to_dst.insert(src_region, set);
                } else if has_tmp_ref {
                    // Keep a tombstone entry with an empty set so dropped regions that still
                    // have tmp refs are preserved; removing it would lose the repartition trace.
                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
                } else {
                    new_value.src_to_dst.remove(&src_region);
                }
            }

            // If there is no repartition info to persist, skip creating/updating the key.
            if new_value.src_to_dst.is_empty() && current.is_none() {
                continue;
            }

            repart_mgr
                .upsert_value(table_id, current, &new_value)
                .await
                .context(KvBackendSnafu)?;
        }

        Ok(())
    }

    /// Discover region routes for the given regions.
    async fn discover_route_for_regions(
        &self,
        regions: &[RegionId],
    ) -> Result<(Region2Peers, Peer2Regions)> {
        let mut region_to_peer = HashMap::new();
        let mut peer_to_regions = HashMap::new();

        // Group regions by table ID for batch processing
        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
        for region_id in regions {
            let table_id = region_id.table_id();
            table_to_regions
                .entry(table_id)
                .or_default()
                .push(*region_id);
        }

        // Process each table's regions together for efficiency
        for (table_id, table_regions) in table_to_regions {
            match self.get_table_route(table_id).await {
                Ok((_phy_table_id, table_route)) => {
                    table_route_to_region(
                        &table_route,
                        &table_regions,
                        &mut region_to_peer,
                        &mut peer_to_regions,
                    );
                }
                Err(e) => {
                    // Continue with other tables instead of failing completely
                    // TODO(discord9): consider failing here instead
                    warn!(
                        "Failed to get table route for table {}: {}, skipping its regions",
                        table_id, e
                    );
                    continue;
                }
            }
        }

        Ok((region_to_peer, peer_to_regions))
    }

    /// Set region routes and related regions for the GC procedure.
    async fn set_routes_and_related_regions(&mut self) -> Result<()> {
        let related_regions = self.find_related_regions(&self.data.regions).await?;

        self.data.related_regions = related_regions.clone();

        // Discover routes for all regions involved in GC, including both the
        // primary GC regions and their related regions.
        let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();

        regions_set.extend(related_regions.keys().cloned());
        regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());

        let regions_to_discover = regions_set.into_iter().collect_vec();

        let (mut region_to_peer, _) = self
            .discover_route_for_regions(&regions_to_discover)
            .await?;

        for (region_id, route) in &self.data.region_routes_override {
            region_to_peer
                .entry(*region_id)
                .or_insert_with(|| route.clone());
        }

        self.data.region_routes = region_to_peer;

        Ok(())
    }

    /// Get file references from all datanodes that host the regions.
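    /// Routes are discovered first; query regions are then grouped by datanode
    /// (leader and followers), related regions by leader only, and the replies
    /// are merged into a single [`FileRefsManifest`].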
    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
        self.set_routes_and_related_regions().await?;

        let query_regions = &self.data.regions;
        let related_regions = &self.data.related_regions;
        let region_routes = &self.data.region_routes;
        let timeout = self.data.timeout;
        let dropped_regions = self
            .data
            .region_routes_override
            .keys()
            .collect::<HashSet<_>>();
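        // Regions whose routes come from the scheduler override are missing from
        // the table routes (e.g. already dropped), so they are excluded from the
        // file-ref queries below.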

        // Group regions by datanode to minimize RPC calls
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if dropped_regions.contains(region_id) {
                continue;
            }
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                // Also query followers for file refs, in case a query is running on a follower.
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in related_regions {
            for dst_region in dst_regions {
                if let Some((leader, _followers)) = region_routes.get(dst_region) {
                    datanode2related_regions
                        .entry(leader.clone())
                        .or_default()
                        .entry(*src_region)
                        .or_default()
                        .insert(*dst_region);
                }
                // Related regions are read from the manifest, so there is no need
                // to query followers.
            }
        }

        // Send GetFileRefs instructions to each datanode
        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();
        let mut all_cross_region_refs = HashMap::new();

        let mut peers = HashSet::new();
        peers.extend(datanode2query_regions.keys().cloned());
        peers.extend(datanode2related_regions.keys().cloned());

        for peer in peers {
            let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
            let related_regions_for_peer =
                datanode2related_regions.remove(&peer).unwrap_or_default();

            if regions.is_empty() && related_regions_for_peer.is_empty() {
                continue;
            }

            let instruction = GetFileRefs {
                query_regions: regions.clone(),
                related_regions: related_regions_for_peer.clone(),
            };

            let reply = send_get_file_refs(
                &self.mailbox,
                &self.data.server_addr,
                &peer,
                instruction,
                timeout,
            )
            .await?;
            debug!(
                "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
                peer, regions, related_regions_for_peer, reply
            );

            if !reply.success {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "Failed to get file references from datanode {}: {:?}",
                        peer, reply.error
                    ),
                }
                .fail();
            }

            // Merge the file references from this datanode
            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
                all_file_refs
                    .entry(region_id)
                    .or_default()
                    .extend(file_refs);
            }

            // Keep the smallest manifest version reported among all peers, so
            // outdated regions can be detected.
            for (region_id, version) in reply.file_refs_manifest.manifest_version {
                let entry = all_manifest_versions.entry(region_id).or_insert(version);
                *entry = (*entry).min(version);
            }

            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
                let entry = all_cross_region_refs
                    .entry(region_id)
                    .or_insert_with(HashSet::new);
                entry.extend(related_region_ids);
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
            cross_region_refs: all_cross_region_refs,
        })
    }

    /// Send GC instructions to all datanodes that host the regions; the merged
    /// [`GcReport`] also records the regions that need retry.
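    /// One `GcRegions` instruction is sent per leader datanode, each carrying
    /// the full file-reference manifest collected in the `Acquiring` state.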
    async fn send_gc_instructions(&self) -> Result<GcReport> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // Group regions by datanode
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
        let mut all_report = GcReport::default();

        for region_id in regions {
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        // Send GC instructions to each datanode
        for (peer, regions_for_peer) in datanode2regions {
            let gc_regions = GcRegions {
                regions: regions_for_peer.clone(),
                // file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation.
                file_refs_manifest: file_refs.clone(),
                full_file_listing: self.data.full_file_listing,
            };

            let report = send_gc_regions(
                &self.mailbox,
                &peer,
                gc_regions,
                self.data.server_addr.as_str(),
                timeout,
                "Batch GC",
            )
            .await?;

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions.clone());
            all_report.merge(report);
        }

        if !all_need_retry.is_empty() {
            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
        }

        Ok(all_report)
    }
}

#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                // Transition to Acquiring state
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                // Get file references from all datanodes
                match self.get_file_references().await {
                    Ok(file_refs) => {
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                // Send GC instructions to all datanodes
                // TODO(discord9): handle need-retry regions
                match self.send_gc_instructions().await {
                    Ok(report) => {
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
                Ok(()) => {
                    info!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Read-locks all regions involved in this GC procedure, so that operations
    /// such as region migration can't run during GC and cause race conditions.
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .regions
            .iter()
            .sorted() // sort to have a deterministic lock order
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}