Skip to main content

meta_srv/gc/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30    Result as ProcedureResult, Status,
31};
32use common_telemetry::tracing::Instrument as _;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, info, warn};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use serde::{Deserialize, Serialize};
38use snafu::ResultExt as _;
39use store_api::storage::{FileRefsManifest, GcReport, RegionId};
40use table::metadata::TableId;
41
42use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
43use crate::gc::util::table_route_to_region;
44use crate::gc::{Peer2Regions, Region2Peers};
45use crate::handler::HeartbeatMailbox;
46use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
47use crate::procedure::utils::{instruction_error_result, instruction_to_error};
48use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
49
50async fn send_get_file_refs_inner(
51    mailbox: &MailboxRef,
52    server_addr: &str,
53    peer: &Peer,
54    instruction: GetFileRefs,
55    timeout: Duration,
56) -> Result<MailboxReceiver> {
57    let instruction = instruction::Instruction::GetFileRefs(instruction);
58    let tracing_ctx = TracingContext::from_current_span();
59    let msg = MailboxMessage::json_message(
60        &format!("Get file references: {}", instruction),
61        &format!("Metasrv@{}", server_addr),
62        &format!("Datanode-{}@{}", peer.id, peer.addr),
63        common_time::util::current_time_millis(),
64        &instruction,
65        Some(tracing_ctx.to_w3c()),
66    )
67    .with_context(|_| SerializeToJsonSnafu {
68        input: instruction.to_string(),
69    })?;
70
71    mailbox
72        .send(&Channel::Datanode(peer.id), msg, timeout)
73        .await
74}
75
76async fn recv_get_file_refs_reply(
77    peer: &Peer,
78    mailbox_rx: MailboxReceiver,
79) -> Result<GetFileRefsReply> {
80    let reply = match mailbox_rx.await {
81        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
82        Err(e) => {
83            error!(
84                e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
85                peer,
86            );
87            return Err(e);
88        }
89    };
90
91    let InstructionReply::GetFileRefs(reply) = reply else {
92        return error::UnexpectedInstructionReplySnafu {
93            mailbox_message: format!("{:?}", reply),
94            reason: "Unexpected reply of the GetFileRefs instruction",
95        }
96        .fail();
97    };
98
99    Ok(reply)
100}
101
102async fn send_gc_regions_inner(
103    mailbox: &MailboxRef,
104    peer: &Peer,
105    gc_regions: &GcRegions,
106    server_addr: &str,
107    timeout: Duration,
108    description: &str,
109) -> Result<MailboxReceiver> {
110    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
111    let tracing_ctx = TracingContext::from_current_span();
112    let msg = MailboxMessage::json_message(
113        &format!("{}: {}", description, instruction),
114        &format!("Metasrv@{}", server_addr),
115        &format!("Datanode-{}@{}", peer.id, peer.addr),
116        common_time::util::current_time_millis(),
117        &instruction,
118        Some(tracing_ctx.to_w3c()),
119    )
120    .with_context(|_| SerializeToJsonSnafu {
121        input: instruction.to_string(),
122    })?;
123
124    mailbox
125        .send(&Channel::Datanode(peer.id), msg, timeout)
126        .await
127}
128
129async fn recv_gc_regions_reply(
130    peer: &Peer,
131    gc_regions: &GcRegions,
132    description: &str,
133    mailbox_rx: MailboxReceiver,
134) -> Result<GcReport> {
135    let reply = match mailbox_rx.await {
136        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
137        Err(e) => {
138            error!(
139                e; "Failed to receive reply from datanode {} for {}",
140                peer, description
141            );
142            return Err(e);
143        }
144    };
145
146    let InstructionReply::GcRegions(reply) = reply else {
147        return error::UnexpectedInstructionReplySnafu {
148            mailbox_message: format!("{:?}", reply),
149            reason: "Unexpected reply of the GcRegions instruction",
150        }
151        .fail();
152    };
153
154    let res = reply.result;
155    match res {
156        Ok(report) => Ok(report),
157        Err(e) => {
158            error!(
159                e; "Datanode {} reported error during GC for regions {:?}",
160                peer, gc_regions
161            );
162            instruction_error_result(
163                &e,
164                format!(
165                    "Datanode {} reported error during GC for regions {:?}: {}",
166                    peer, gc_regions, e
167                ),
168            )
169        }
170    }
171}
172
/// Procedure that first gets file references and then performs batch GC for multiple
/// regions. It holds locks for all regions during the whole procedure.
pub struct BatchGcProcedure {
    /// Mailbox used to send `GetFileRefs` / `GcRegions` instructions to datanodes and to
    /// receive their replies.
    mailbox: MailboxRef,
    /// Metadata access used for table route lookups and for the table repartition
    /// records that are updated after GC.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serializable state of the procedure (see `BatchGcData`).
    data: BatchGcData,
}
180
/// Serializable state of a `BatchGcProcedure`.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the procedure's state machine.
    state: State,
    /// Meta server address (used as the sender of mailbox messages).
    server_addr: String,
    /// The regions to be GC-ed
    regions: Vec<RegionId>,
    /// Forwarded to datanodes as `GcRegions::full_file_listing`.
    full_file_listing: bool,
    /// Region -> (leader, followers) routes for the batch regions and their related
    /// regions. Routes from `region_routes_override` only fill in regions that were not
    /// discovered from table routes.
    region_routes: Region2Peers,
    /// Routes assigned by the scheduler for regions missing from table routes.
    /// Regions listed here are treated as dropped and are not sent as query regions
    /// when collecting file references.
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// Related regions (e.g., for shared files after repartition).
    /// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value.
    /// As filled by `find_related_regions`, each batch region maps to every other region
    /// of the same table.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Acquired file references (Populated in Acquiring state)
    /// Sent to datanodes with every `GcRegions` instruction and used afterwards to
    /// reconcile repartition info.
    file_refs: FileRefsManifest,
    /// mailbox timeout duration
    timeout: Duration,
    /// GC report of this batch. The test constructor sets `Some(..)` when starting
    /// directly in `UpdateRepartition`; presumably `None` until GC finishes — TODO confirm.
    gc_report: Option<GcReport>,
}
202
/// Steps of the batch GC state machine, persisted as part of `BatchGcData`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state
    Start,
    /// Fetching file references from datanodes
    Acquiring,
    /// Sending GC instruction to the target datanode
    Gcing,
    /// Updating region repartition info in kvbackend after GC based on the GC result
    UpdateRepartition,
}
214
215impl BatchGcProcedure {
216    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
217
218    pub fn new(
219        mailbox: MailboxRef,
220        table_metadata_manager: TableMetadataManagerRef,
221        server_addr: String,
222        regions: Vec<RegionId>,
223        full_file_listing: bool,
224        timeout: Duration,
225        region_routes_override: Region2Peers,
226    ) -> Self {
227        Self {
228            mailbox,
229            table_metadata_manager,
230            data: BatchGcData {
231                state: State::Start,
232                server_addr,
233                regions,
234                full_file_listing,
235                timeout,
236                region_routes: HashMap::new(),
237                region_routes_override,
238                related_regions: HashMap::new(),
239                file_refs: FileRefsManifest::default(),
240                gc_report: None,
241            },
242        }
243    }
244
245    /// Test-only constructor to jump directly into the repartition update state.
246    /// Intended for integration tests that validate `cleanup_region_repartition` without
247    /// running the full batch GC state machine.
248    #[cfg(feature = "mock")]
249    pub fn new_update_repartition_for_test(
250        mailbox: MailboxRef,
251        table_metadata_manager: TableMetadataManagerRef,
252        server_addr: String,
253        regions: Vec<RegionId>,
254        file_refs: FileRefsManifest,
255        timeout: Duration,
256    ) -> Self {
257        Self {
258            mailbox,
259            table_metadata_manager,
260            data: BatchGcData {
261                state: State::UpdateRepartition,
262                server_addr,
263                regions,
264                full_file_listing: false,
265                timeout,
266                region_routes: HashMap::new(),
267                region_routes_override: HashMap::new(),
268                related_regions: HashMap::new(),
269                file_refs,
270                gc_report: Some(GcReport::default()),
271            },
272        }
273    }
274
275    pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
276        res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
277            error::UnexpectedSnafu {
278                violated: format!(
279                    "Failed to downcast procedure result to GcReport, got {:?}",
280                    std::any::type_name_of_val(&res.as_ref())
281                ),
282            }
283            .build()
284        })
285    }
286
287    async fn get_table_route(
288        &self,
289        table_id: TableId,
290    ) -> Result<(TableId, PhysicalTableRouteValue)> {
291        self.table_metadata_manager
292            .table_route_manager()
293            .get_physical_table_route(table_id)
294            .await
295            .context(TableMetadataManagerSnafu)
296    }
297
298    /// Return related regions for the given regions.
299    /// The returned map uses the input region as key, and all other regions
300    /// from the same table as values (excluding the input region itself).
301    async fn find_related_regions(
302        &self,
303        regions: &[RegionId],
304    ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
305        let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
306        let table_ids = table_ids.into_iter().collect::<Vec<_>>();
307
308        let table_routes = self
309            .table_metadata_manager
310            .table_route_manager()
311            .batch_get_physical_table_routes(&table_ids)
312            .await
313            .context(TableMetadataManagerSnafu)?;
314
315        if table_routes.len() != table_ids.len() {
316            // batch_get_physical_table_routes returns a subset on misses; treat that as error
317            for table_id in &table_ids {
318                if !table_routes.contains_key(table_id) {
319                    // indicate is a logical table id
320                    return error::InvalidArgumentsSnafu {
321                    err_msg: format!(
322                        "Unexpected logical table route: table {} resolved to physical table regions",
323                        table_id
324                    ),
325                }
326                    .fail();
327                }
328            }
329        }
330
331        let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
332        for (table_id, table_route) in table_routes {
333            let all_regions: HashSet<RegionId> = table_route
334                .region_routes
335                .iter()
336                .map(|r| r.region.id)
337                .collect();
338
339            table_all_regions.insert(table_id, all_regions);
340        }
341
342        let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
343        for region_id in regions {
344            let table_id = region_id.table_id();
345            if let Some(all_regions) = table_all_regions.get(&table_id) {
346                let mut related: HashSet<RegionId> = all_regions.clone();
347                related.remove(region_id);
348                related_regions.insert(*region_id, related);
349            } else {
350                related_regions.insert(*region_id, Default::default());
351            }
352        }
353
354        Ok(related_regions)
355    }
356
357    /// Clean up region repartition info in kvbackend after GC
358    /// according to cross reference in `FileRefsManifest`.
359    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
360        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
361            HashMap::new();
362        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
363            cross_refs_grouped
364                .entry(src_region.table_id())
365                .or_default()
366                .entry(*src_region)
367                .or_default()
368                .extend(dst_regions.iter().copied());
369        }
370
371        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
372        for (src_region, refs) in &self.data.file_refs.file_refs {
373            if refs.is_empty() {
374                continue;
375            }
376
377            tmp_refs_grouped
378                .entry(src_region.table_id())
379                .or_default()
380                .insert(*src_region);
381        }
382
383        let repart_mgr = self.table_metadata_manager.table_repart_manager();
384
385        let mut table_ids: HashSet<TableId> = cross_refs_grouped
386            .keys()
387            .copied()
388            .chain(tmp_refs_grouped.keys().copied())
389            .collect();
390        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));
391
392        for table_id in table_ids {
393            let table_lock = TableLock::Write(table_id).into();
394            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
395
396            let cross_refs = cross_refs_grouped
397                .get(&table_id)
398                .cloned()
399                .unwrap_or_default();
400            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();
401
402            let current = repart_mgr
403                .get_with_raw_bytes(table_id)
404                .await
405                .context(KvBackendSnafu)?;
406
407            let mut new_value = current
408                .as_ref()
409                .map(|v| (**v).clone())
410                .unwrap_or_else(TableRepartValue::new);
411
412            // We only touch regions involved in this GC batch for the current table to avoid
413            // clobbering unrelated repart entries. Start from the batch regions of this table.
414            let batch_src_regions: HashSet<RegionId> = self
415                .data
416                .regions
417                .iter()
418                .copied()
419                .filter(|r| r.table_id() == table_id)
420                .collect();
421
422            // Merge targets: only the batch regions of this table. This avoids touching unrelated
423            // repart entries; we just reconcile mappings for regions involved in the current GC
424            // cycle for this table.
425            let all_src_regions: HashSet<RegionId> = batch_src_regions;
426
427            for src_region in all_src_regions {
428                let cross_dst = cross_refs.get(&src_region);
429                let has_tmp_ref = tmp_refs.contains(&src_region);
430
431                if let Some(dst_regions) = cross_dst {
432                    let mut set = BTreeSet::new();
433                    set.extend(dst_regions.iter().copied());
434                    new_value.src_to_dst.insert(src_region, set);
435                } else if has_tmp_ref {
436                    // Keep a tombstone entry with an empty set so dropped regions that still
437                    // have tmp refs are preserved; removing it would lose the repartition trace.
438                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
439                } else {
440                    new_value.src_to_dst.remove(&src_region);
441                }
442            }
443
444            // If there is no repartition info to persist, skip creating/updating the key
445            if new_value.src_to_dst.is_empty() && current.is_none() {
446                continue;
447            }
448
449            repart_mgr
450                .upsert_value(table_id, current, &new_value)
451                .await
452                .context(KvBackendSnafu)?;
453        }
454
455        Ok(())
456    }
457
458    /// Discover region routes for the given regions.
459    async fn discover_route_for_regions(
460        &self,
461        regions: &[RegionId],
462    ) -> Result<(Region2Peers, Peer2Regions)> {
463        let mut region_to_peer = HashMap::new();
464        let mut peer_to_regions = HashMap::new();
465
466        // Group regions by table ID for batch processing
467        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
468        for region_id in regions {
469            let table_id = region_id.table_id();
470            table_to_regions
471                .entry(table_id)
472                .or_default()
473                .push(*region_id);
474        }
475
476        // Process each table's regions together for efficiency
477        for (table_id, table_regions) in table_to_regions {
478            match self.get_table_route(table_id).await {
479                Ok((_phy_table_id, table_route)) => {
480                    table_route_to_region(
481                        &table_route,
482                        &table_regions,
483                        &mut region_to_peer,
484                        &mut peer_to_regions,
485                    );
486                }
487                Err(e) => {
488                    // Continue with other tables instead of failing completely
489                    // TODO(discord9): consider failing here instead
490                    warn!(
491                        "Failed to get table route for table {}: {}, skipping its regions",
492                        table_id, e
493                    );
494                    continue;
495                }
496            }
497        }
498
499        Ok((region_to_peer, peer_to_regions))
500    }
501
502    /// Set region routes and related regions for GC procedure
503    async fn set_routes_and_related_regions(&mut self) -> Result<()> {
504        let related_regions = self.find_related_regions(&self.data.regions).await?;
505
506        self.data.related_regions = related_regions.clone();
507
508        // Discover routes for all regions involved in GC, including both the
509        // primary GC regions and their related regions.
510        let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
511
512        regions_set.extend(related_regions.keys().cloned());
513        regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
514
515        let regions_to_discover = regions_set.into_iter().collect_vec();
516
517        let (mut region_to_peer, _) = self
518            .discover_route_for_regions(&regions_to_discover)
519            .await?;
520
521        for (region_id, route) in &self.data.region_routes_override {
522            region_to_peer
523                .entry(*region_id)
524                .or_insert_with(|| route.clone());
525        }
526
527        self.data.region_routes = region_to_peer;
528
529        Ok(())
530    }
531
532    /// Get file references from all datanodes that host the regions
533    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
534        let region_count = self.data.regions.len();
535        self.set_routes_and_related_regions()
536            .instrument(common_telemetry::tracing::info_span!(
537                "meta_gc_procedure_prepare_routes",
538                region_count = region_count
539            ))
540            .await?;
541
542        let query_regions = &self.data.regions;
543        let related_regions = &self.data.related_regions;
544        let region_routes = &self.data.region_routes;
545        let timeout = self.data.timeout;
546        let dropped_regions = self
547            .data
548            .region_routes_override
549            .keys()
550            .collect::<HashSet<_>>();
551
552        // Group regions by datanode to minimize RPC calls
553        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
554
555        for region_id in query_regions {
556            if dropped_regions.contains(region_id) {
557                continue;
558            }
559            if let Some((leader, followers)) = region_routes.get(region_id) {
560                datanode2query_regions
561                    .entry(leader.clone())
562                    .or_default()
563                    .push(*region_id);
564                // also need to send for follower regions for file refs in case query is running on follower
565                for follower in followers {
566                    datanode2query_regions
567                        .entry(follower.clone())
568                        .or_default()
569                        .push(*region_id);
570                }
571            } else {
572                return error::UnexpectedSnafu {
573                    violated: format!(
574                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
575                    ),
576                }
577                .fail();
578            }
579        }
580
581        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
582            HashMap::new();
583        for (src_region, dst_regions) in related_regions {
584            for dst_region in dst_regions {
585                if let Some((leader, _followers)) = region_routes.get(dst_region) {
586                    datanode2related_regions
587                        .entry(leader.clone())
588                        .or_default()
589                        .entry(*src_region)
590                        .or_default()
591                        .insert(*dst_region);
592                } // since read from manifest, no need to send to followers
593            }
594        }
595
596        // Send GetFileRefs instructions to each datanode
597        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
598        let mut all_manifest_versions = HashMap::new();
599        let mut all_cross_region_refs = HashMap::new();
600
601        let mut peers = HashSet::new();
602        peers.extend(datanode2query_regions.keys().cloned());
603        peers.extend(datanode2related_regions.keys().cloned());
604
605        let mailbox = &self.mailbox;
606        let server_addr = &self.data.server_addr;
607        let mut tasks = Vec::new();
608
609        for peer in peers {
610            let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
611            let related_regions_for_peer =
612                datanode2related_regions.remove(&peer).unwrap_or_default();
613
614            if regions.is_empty() && related_regions_for_peer.is_empty() {
615                continue;
616            }
617
618            tasks.push(async move {
619                let instruction = GetFileRefs {
620                    query_regions: regions.clone(),
621                    related_regions: related_regions_for_peer.clone(),
622                };
623
624                let reply =
625                    send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout)
626                        .await;
627
628                (peer, regions, related_regions_for_peer, reply)
629            });
630        }
631
632        let mut recv_tasks = Vec::new();
633        // store error to make sure metrics doesn't ignore other peers
634        let mut first_error = None;
635        let mut record_get_file_refs_error = |e| {
636            METRIC_META_GC_DATANODE_CALLS_TOTAL
637                .with_label_values(&["get_file_refs", "error"])
638                .inc();
639            if first_error.is_none() {
640                first_error = Some(e);
641            }
642        };
643        for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await {
644            match reply {
645                Ok(mailbox_rx) => {
646                    recv_tasks.push(async move {
647                        let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await;
648                        (peer, regions, related_regions_for_peer, reply)
649                    });
650                }
651                Err(e) => record_get_file_refs_error(e),
652            }
653        }
654
655        let replies = join_all(recv_tasks).await;
656
657        for (peer, regions, related_regions_for_peer, reply) in replies {
658            let reply = match reply {
659                Ok(reply) => reply,
660                Err(e) => {
661                    record_get_file_refs_error(e);
662                    continue;
663                }
664            };
665            debug!(
666                "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
667                peer, regions, related_regions_for_peer, reply
668            );
669
670            if !reply.success {
671                METRIC_META_GC_DATANODE_CALLS_TOTAL
672                    .with_label_values(&["get_file_refs", "error"])
673                    .inc();
674                let err = if let Some(error) = &reply.error {
675                    instruction_to_error(
676                        error,
677                        format!(
678                            "Failed to get file references from datanode {}: {:?}",
679                            peer, error
680                        ),
681                    )
682                } else {
683                    error::UnexpectedSnafu {
684                        violated: format!(
685                            "Failed to get file references from datanode {}: {:?}",
686                            peer, reply.error
687                        ),
688                    }
689                    .build()
690                };
691                record_get_file_refs_error(err);
692                continue;
693            }
694            METRIC_META_GC_DATANODE_CALLS_TOTAL
695                .with_label_values(&["get_file_refs", "success"])
696                .inc();
697
698            // Merge the file references from this datanode
699            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
700                all_file_refs
701                    .entry(region_id)
702                    .or_default()
703                    .extend(file_refs);
704            }
705
706            // region manifest version should be the smallest one among all peers, so outdated region can be detected
707            for (region_id, version) in reply.file_refs_manifest.manifest_version {
708                let entry = all_manifest_versions.entry(region_id).or_insert(version);
709                *entry = (*entry).min(version);
710            }
711
712            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
713                let entry = all_cross_region_refs
714                    .entry(region_id)
715                    .or_insert_with(HashSet::new);
716                entry.extend(related_region_ids);
717            }
718        }
719
720        if let Some(e) = first_error {
721            return Err(e);
722        }
723
724        Ok(FileRefsManifest {
725            file_refs: all_file_refs,
726            manifest_version: all_manifest_versions,
727            cross_region_refs: all_cross_region_refs,
728        })
729    }
730
731    /// Send GC instruction to all datanodes that host the regions,
732    /// returns regions that need retry.
733    async fn send_gc_instructions(&self) -> Result<GcReport> {
734        let regions = &self.data.regions;
735        let region_routes = &self.data.region_routes;
736        let file_refs = &self.data.file_refs;
737        let timeout = self.data.timeout;
738
739        // Group regions by datanode
740        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
741        let mut all_report = GcReport::default();
742
743        for region_id in regions {
744            if let Some((leader, _followers)) = region_routes.get(region_id) {
745                datanode2regions
746                    .entry(leader.clone())
747                    .or_default()
748                    .push(*region_id);
749            } else {
750                return error::UnexpectedSnafu {
751                    violated: format!(
752                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
753                    ),
754                }
755                .fail();
756            }
757        }
758
759        let mut all_need_retry = HashSet::new();
760        let mailbox = &self.mailbox;
761        let server_addr = self.data.server_addr.as_str();
762        let full_file_listing = self.data.full_file_listing;
763        let tasks = datanode2regions
764            .into_iter()
765            .map(|(peer, regions_for_peer)| {
766                let gc_regions = GcRegions {
767                    regions: regions_for_peer.clone(),
768                    // file_refs_manifest could be somewhere large. But still intentionally clone per datanode here:
769                    // this path is admin-triggered or scheduler-triggered, peer count is expected to be bounded, and
770                    // and abnormal manifest growth should be addressed at the source
771                    file_refs_manifest: file_refs.clone(),
772                    full_file_listing,
773                };
774                let region_count = gc_regions.regions.len() as u64;
775
776                async move {
777                    let report = send_gc_regions_inner(
778                        mailbox,
779                        &peer,
780                        &gc_regions,
781                        server_addr,
782                        timeout,
783                        "Batch GC",
784                    )
785                    .await;
786
787                    (peer, gc_regions, region_count, report)
788                }
789            });
790
791        let mut recv_tasks = Vec::new();
792        let mut first_error = None;
793        let mut record_gc_error = |e, region_count| {
794            METRIC_META_GC_DATANODE_CALLS_TOTAL
795                .with_label_values(&["gc_regions", "error"])
796                .inc();
797            if region_count > 0 {
798                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count);
799            }
800            if first_error.is_none() {
801                first_error = Some(e);
802            }
803        };
804        for (peer, gc_regions, region_count, report) in join_all(tasks).await {
805            match report {
806                Ok(mailbox_rx) => {
807                    recv_tasks.push(async move {
808                        let report =
809                            recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await;
810                        (peer, region_count, report)
811                    });
812                }
813                Err(e) => record_gc_error(e, region_count),
814            }
815        }
816
817        for (peer, region_count, report) in join_all(recv_tasks).await {
818            let report = match report {
819                Ok(report) => {
820                    METRIC_META_GC_DATANODE_CALLS_TOTAL
821                        .with_label_values(&["gc_regions", "success"])
822                        .inc();
823                    let need_retry_count = report.need_retry_regions.len() as u64;
824                    if need_retry_count > 0 {
825                        METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
826                    }
827                    report
828                }
829                Err(e) => {
830                    record_gc_error(e, region_count);
831                    continue;
832                }
833            };
834
835            let success = report.deleted_files.keys().collect_vec();
836            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
837
838            if need_retry.is_empty() {
839                info!(
840                    "GC report from datanode {}: successfully deleted files for regions {:?}",
841                    peer, success
842                );
843            } else {
844                warn!(
845                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
846                    peer, success, need_retry
847                );
848            }
849            all_need_retry.extend(report.need_retry_regions.clone());
850            all_report.merge(report);
851        }
852
853        if let Some(e) = first_error {
854            return Err(e);
855        }
856
857        if !all_need_retry.is_empty() {
858            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
859        }
860
861        Ok(all_report)
862    }
863}
864
865#[async_trait::async_trait]
866impl Procedure for BatchGcProcedure {
867    fn type_name(&self) -> &str {
868        Self::TYPE_NAME
869    }
870
871    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
872        match self.data.state {
873            State::Start => {
874                let _regions_span = common_telemetry::tracing::debug_span!(
875                    "meta_gc_procedure_regions",
876                    state = "start",
877                    regions = ?self.data.regions
878                )
879                .entered();
880                info!(
881                    "Batch GC procedure transitioning from Start to Acquiring for {} regions",
882                    self.data.regions.len()
883                );
884                // Transition to Acquiring state
885                self.data.state = State::Acquiring;
886                Ok(Status::executing(false))
887            }
888            State::Acquiring => {
889                let region_count = self.data.regions.len();
890                let full_file_listing = self.data.full_file_listing;
891                let regions = self.data.regions.clone();
892                info!(
893                    "Batch GC procedure acquiring file references for {} regions",
894                    region_count
895                );
896                // Get file references from all datanodes
897                match self
898                    .get_file_references()
899                    .instrument(common_telemetry::tracing::debug_span!(
900                        "meta_gc_procedure_regions",
901                        state = "acquiring",
902                        regions = ?regions
903                    ))
904                    .instrument(common_telemetry::tracing::info_span!(
905                        "meta_gc_procedure_get_file_references",
906                        region_count = region_count,
907                        full_file_listing = full_file_listing
908                    ))
909                    .await
910                {
911                    Ok(file_refs) => {
912                        info!(
913                            "Batch GC procedure acquired file references for {} regions",
914                            file_refs.file_refs.len()
915                        );
916                        self.data.file_refs = file_refs;
917                        self.data.state = State::Gcing;
918                        Ok(Status::executing(false))
919                    }
920                    Err(e) => {
921                        error!(e; "Failed to get file references");
922                        Err(ProcedureError::external(e))
923                    }
924                }
925            }
926            State::Gcing => {
927                info!(
928                    "Batch GC procedure sending GC instructions for {} regions",
929                    self.data.regions.len()
930                );
931                // Send GC instructions to all datanodes
932                // TODO(discord9): handle need-retry regions
933                match self
934                    .send_gc_instructions()
935                    .instrument(common_telemetry::tracing::debug_span!(
936                        "meta_gc_procedure_regions",
937                        state = "gcing",
938                        regions = ?self.data.regions
939                    ))
940                    .instrument(common_telemetry::tracing::info_span!(
941                        "meta_gc_procedure_send_gc_instructions",
942                        region_count = self.data.regions.len(),
943                        full_file_listing = self.data.full_file_listing
944                    ))
945                    .await
946                {
947                    Ok(report) => {
948                        info!(
949                            "Batch GC procedure received GC report, retry region count: {}",
950                            report.need_retry_regions.len()
951                        );
952                        self.data.state = State::UpdateRepartition;
953                        self.data.gc_report = Some(report);
954                        Ok(Status::executing(false))
955                    }
956                    Err(e) => {
957                        error!(e; "Failed to send GC instructions");
958                        Err(ProcedureError::external(e))
959                    }
960                }
961            }
962            State::UpdateRepartition => match self
963                .cleanup_region_repartition(ctx)
964                .instrument(common_telemetry::tracing::debug_span!(
965                    "meta_gc_procedure_regions",
966                    state = "update_repartition",
967                    regions = ?self.data.regions
968                ))
969                .instrument(common_telemetry::tracing::info_span!(
970                    "meta_gc_procedure_update_repartition",
971                    region_count = self.data.regions.len()
972                ))
973                .await
974            {
975                Ok(()) => {
976                    debug!(
977                        "Cleanup region repartition info completed successfully for regions {:?}",
978                        self.data.regions
979                    );
980                    info!(
981                        "Batch GC completed successfully for regions {:?}",
982                        self.data.regions
983                    );
984                    let Some(report) = self.data.gc_report.take() else {
985                        return common_procedure::error::UnexpectedSnafu {
986                            err_msg: "GC report should be present after GC completion".to_string(),
987                        }
988                        .fail();
989                    };
990                    info!("GC report: {:?}", report);
991                    Ok(Status::done_with_output(report))
992                }
993                Err(e) => {
994                    error!(e; "Failed to cleanup region repartition info");
995                    Err(ProcedureError::external(e))
996                }
997            },
998        }
999    }
1000
1001    fn dump(&self) -> ProcedureResult<String> {
1002        serde_json::to_string(&self.data).context(ToJsonSnafu)
1003    }
1004
1005    /// Read lock all regions involved in this GC procedure.
1006    /// So i.e. region migration won't happen during GC and cause race conditions.
1007    fn lock_key(&self) -> LockKey {
1008        let lock_key: Vec<_> = self
1009            .data
1010            .regions
1011            .iter()
1012            .sorted() // sort to have a deterministic lock order
1013            .map(|id| RegionLock::Read(*id).into())
1014            .collect();
1015
1016        LockKey::new(lock_key)
1017    }
1018}