1use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30 Result as ProcedureResult, Status,
31};
32use common_telemetry::tracing::Instrument as _;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, info, warn};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use serde::{Deserialize, Serialize};
38use snafu::ResultExt as _;
39use store_api::storage::{FileRefsManifest, GcReport, RegionId};
40use table::metadata::TableId;
41
42use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
43use crate::gc::util::table_route_to_region;
44use crate::gc::{Peer2Regions, Region2Peers};
45use crate::handler::HeartbeatMailbox;
46use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
47use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
48
49async fn send_get_file_refs_inner(
50 mailbox: &MailboxRef,
51 server_addr: &str,
52 peer: &Peer,
53 instruction: GetFileRefs,
54 timeout: Duration,
55) -> Result<MailboxReceiver> {
56 let instruction = instruction::Instruction::GetFileRefs(instruction);
57 let tracing_ctx = TracingContext::from_current_span();
58 let msg = MailboxMessage::json_message(
59 &format!("Get file references: {}", instruction),
60 &format!("Metasrv@{}", server_addr),
61 &format!("Datanode-{}@{}", peer.id, peer.addr),
62 common_time::util::current_time_millis(),
63 &instruction,
64 Some(tracing_ctx.to_w3c()),
65 )
66 .with_context(|_| SerializeToJsonSnafu {
67 input: instruction.to_string(),
68 })?;
69
70 mailbox
71 .send(&Channel::Datanode(peer.id), msg, timeout)
72 .await
73}
74
75async fn recv_get_file_refs_reply(
76 peer: &Peer,
77 mailbox_rx: MailboxReceiver,
78) -> Result<GetFileRefsReply> {
79 let reply = match mailbox_rx.await {
80 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
81 Err(e) => {
82 error!(
83 e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
84 peer,
85 );
86 return Err(e);
87 }
88 };
89
90 let InstructionReply::GetFileRefs(reply) = reply else {
91 return error::UnexpectedInstructionReplySnafu {
92 mailbox_message: format!("{:?}", reply),
93 reason: "Unexpected reply of the GetFileRefs instruction",
94 }
95 .fail();
96 };
97
98 Ok(reply)
99}
100
101async fn send_gc_regions_inner(
102 mailbox: &MailboxRef,
103 peer: &Peer,
104 gc_regions: &GcRegions,
105 server_addr: &str,
106 timeout: Duration,
107 description: &str,
108) -> Result<MailboxReceiver> {
109 let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
110 let tracing_ctx = TracingContext::from_current_span();
111 let msg = MailboxMessage::json_message(
112 &format!("{}: {}", description, instruction),
113 &format!("Metasrv@{}", server_addr),
114 &format!("Datanode-{}@{}", peer.id, peer.addr),
115 common_time::util::current_time_millis(),
116 &instruction,
117 Some(tracing_ctx.to_w3c()),
118 )
119 .with_context(|_| SerializeToJsonSnafu {
120 input: instruction.to_string(),
121 })?;
122
123 mailbox
124 .send(&Channel::Datanode(peer.id), msg, timeout)
125 .await
126}
127
128async fn recv_gc_regions_reply(
129 peer: &Peer,
130 gc_regions: &GcRegions,
131 description: &str,
132 mailbox_rx: MailboxReceiver,
133) -> Result<GcReport> {
134 let reply = match mailbox_rx.await {
135 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
136 Err(e) => {
137 error!(
138 e; "Failed to receive reply from datanode {} for {}",
139 peer, description
140 );
141 return Err(e);
142 }
143 };
144
145 let InstructionReply::GcRegions(reply) = reply else {
146 return error::UnexpectedInstructionReplySnafu {
147 mailbox_message: format!("{:?}", reply),
148 reason: "Unexpected reply of the GcRegions instruction",
149 }
150 .fail();
151 };
152
153 let res = reply.result;
154 match res {
155 Ok(report) => Ok(report),
156 Err(e) => {
157 error!(
158 e; "Datanode {} reported error during GC for regions {:?}",
159 peer, gc_regions
160 );
161 error::UnexpectedSnafu {
162 violated: format!(
163 "Datanode {} reported error during GC for regions {:?}: {}",
164 peer, gc_regions, e
165 ),
166 }
167 .fail()
168 }
169 }
170}
171
/// Procedure that garbage-collects files for a batch of regions: it gathers
/// file references from datanodes, instructs region leaders to GC, and then
/// cleans up table repartition metadata.
pub struct BatchGcProcedure {
    // Mailbox used to exchange instructions/replies with datanodes.
    mailbox: MailboxRef,
    // Access to table routes and table repartition metadata.
    table_metadata_manager: TableMetadataManagerRef,
    // Serializable procedure state; persisted via `dump` between steps.
    data: BatchGcData,
}
179
/// Serializable state of [`BatchGcProcedure`], persisted between steps.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    // Current step of the state machine.
    state: State,
    // Metasrv address; used as the "from" field of mailbox messages.
    server_addr: String,
    // Target regions to garbage-collect.
    regions: Vec<RegionId>,
    // Forwarded to datanodes in `GcRegions`; presumably requests a full file
    // listing instead of a manifest-based one — confirm with datanode side.
    full_file_listing: bool,
    // Region -> (leader, followers) routes, discovered during `Acquiring`.
    region_routes: Region2Peers,
    // Caller-supplied routes that fill gaps in discovery; their keys are also
    // treated as dropped regions and skipped when querying file refs.
    #[serde(default)]
    region_routes_override: Region2Peers,
    // Region -> sibling regions of the same table (see `find_related_regions`).
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    // Merged file references gathered from all datanodes.
    file_refs: FileRefsManifest,
    // Timeout for each mailbox call to a datanode.
    timeout: Duration,
    // Report produced by the `Gcing` step; taken as procedure output at the end.
    gc_report: Option<GcReport>,
}
201
/// Steps of the batch-GC state machine, executed in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    // Initial state: logs and transitions to `Acquiring`.
    Start,
    // Acquire file references from datanodes.
    Acquiring,
    // Send GC instructions to region leaders.
    Gcing,
    // Clean up table repartition metadata, then finish with the report.
    UpdateRepartition,
}
213
214impl BatchGcProcedure {
    /// Unique procedure type name used for registration and recovery.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
216
217 pub fn new(
218 mailbox: MailboxRef,
219 table_metadata_manager: TableMetadataManagerRef,
220 server_addr: String,
221 regions: Vec<RegionId>,
222 full_file_listing: bool,
223 timeout: Duration,
224 region_routes_override: Region2Peers,
225 ) -> Self {
226 Self {
227 mailbox,
228 table_metadata_manager,
229 data: BatchGcData {
230 state: State::Start,
231 server_addr,
232 regions,
233 full_file_listing,
234 timeout,
235 region_routes: HashMap::new(),
236 region_routes_override,
237 related_regions: HashMap::new(),
238 file_refs: FileRefsManifest::default(),
239 gc_report: None,
240 },
241 }
242 }
243
244 #[cfg(feature = "mock")]
248 pub fn new_update_repartition_for_test(
249 mailbox: MailboxRef,
250 table_metadata_manager: TableMetadataManagerRef,
251 server_addr: String,
252 regions: Vec<RegionId>,
253 file_refs: FileRefsManifest,
254 timeout: Duration,
255 ) -> Self {
256 Self {
257 mailbox,
258 table_metadata_manager,
259 data: BatchGcData {
260 state: State::UpdateRepartition,
261 server_addr,
262 regions,
263 full_file_listing: false,
264 timeout,
265 region_routes: HashMap::new(),
266 region_routes_override: HashMap::new(),
267 related_regions: HashMap::new(),
268 file_refs,
269 gc_report: Some(GcReport::default()),
270 },
271 }
272 }
273
274 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
275 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
276 error::UnexpectedSnafu {
277 violated: format!(
278 "Failed to downcast procedure result to GcReport, got {:?}",
279 std::any::type_name_of_val(&res.as_ref())
280 ),
281 }
282 .build()
283 })
284 }
285
286 async fn get_table_route(
287 &self,
288 table_id: TableId,
289 ) -> Result<(TableId, PhysicalTableRouteValue)> {
290 self.table_metadata_manager
291 .table_route_manager()
292 .get_physical_table_route(table_id)
293 .await
294 .context(TableMetadataManagerSnafu)
295 }
296
297 async fn find_related_regions(
301 &self,
302 regions: &[RegionId],
303 ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
304 let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
305 let table_ids = table_ids.into_iter().collect::<Vec<_>>();
306
307 let table_routes = self
308 .table_metadata_manager
309 .table_route_manager()
310 .batch_get_physical_table_routes(&table_ids)
311 .await
312 .context(TableMetadataManagerSnafu)?;
313
314 if table_routes.len() != table_ids.len() {
315 for table_id in &table_ids {
317 if !table_routes.contains_key(table_id) {
318 return error::InvalidArgumentsSnafu {
320 err_msg: format!(
321 "Unexpected logical table route: table {} resolved to physical table regions",
322 table_id
323 ),
324 }
325 .fail();
326 }
327 }
328 }
329
330 let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
331 for (table_id, table_route) in table_routes {
332 let all_regions: HashSet<RegionId> = table_route
333 .region_routes
334 .iter()
335 .map(|r| r.region.id)
336 .collect();
337
338 table_all_regions.insert(table_id, all_regions);
339 }
340
341 let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
342 for region_id in regions {
343 let table_id = region_id.table_id();
344 if let Some(all_regions) = table_all_regions.get(&table_id) {
345 let mut related: HashSet<RegionId> = all_regions.clone();
346 related.remove(region_id);
347 related_regions.insert(*region_id, related);
348 } else {
349 related_regions.insert(*region_id, Default::default());
350 }
351 }
352
353 Ok(related_regions)
354 }
355
    /// Rewrites each affected table's repartition metadata from the gathered
    /// file-reference manifest, under a per-table write lock.
    ///
    /// For every GC'd source region: if it still has cross-region references,
    /// record the destination set; if it only has plain file refs, record an
    /// empty set; otherwise remove its entry entirely.
    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
        // Table id -> (source region -> destination regions it references).
        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
            cross_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .entry(*src_region)
                .or_default()
                .extend(dst_regions.iter().copied());
        }

        // Table id -> regions that still have any (non-cross) file references.
        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (src_region, refs) in &self.data.file_refs.file_refs {
            if refs.is_empty() {
                continue;
            }

            tmp_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .insert(*src_region);
        }

        let repart_mgr = self.table_metadata_manager.table_repart_manager();

        // Process every table that appears in the refs or in the GC'd regions.
        let mut table_ids: HashSet<TableId> = cross_refs_grouped
            .keys()
            .copied()
            .chain(tmp_refs_grouped.keys().copied())
            .collect();
        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));

        for table_id in table_ids {
            // Hold the table write lock across the read-modify-write below.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;

            let cross_refs = cross_refs_grouped
                .get(&table_id)
                .cloned()
                .unwrap_or_default();
            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();

            // Raw bytes are kept so the upsert below can do a compare-and-set
            // against the current value — TODO confirm `upsert_value` semantics.
            let current = repart_mgr
                .get_with_raw_bytes(table_id)
                .await
                .context(KvBackendSnafu)?;

            let mut new_value = current
                .as_ref()
                .map(|v| (**v).clone())
                .unwrap_or_else(TableRepartValue::new);

            // Only the regions of this table that were part of this GC batch.
            let batch_src_regions: HashSet<RegionId> = self
                .data
                .regions
                .iter()
                .copied()
                .filter(|r| r.table_id() == table_id)
                .collect();

            let all_src_regions: HashSet<RegionId> = batch_src_regions;

            for src_region in all_src_regions {
                let cross_dst = cross_refs.get(&src_region);
                let has_tmp_ref = tmp_refs.contains(&src_region);

                if let Some(dst_regions) = cross_dst {
                    // Still referenced across regions: record the dst set.
                    let mut set = BTreeSet::new();
                    set.extend(dst_regions.iter().copied());
                    new_value.src_to_dst.insert(src_region, set);
                } else if has_tmp_ref {
                    // Only plain file refs remain: keep an empty entry.
                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
                } else {
                    // No references left: drop the entry.
                    new_value.src_to_dst.remove(&src_region);
                }
            }

            // Nothing stored and nothing to store — skip the write entirely.
            if new_value.src_to_dst.is_empty() && current.is_none() {
                continue;
            }

            repart_mgr
                .upsert_value(table_id, current, &new_value)
                .await
                .context(KvBackendSnafu)?;
        }

        Ok(())
    }
456
457 async fn discover_route_for_regions(
459 &self,
460 regions: &[RegionId],
461 ) -> Result<(Region2Peers, Peer2Regions)> {
462 let mut region_to_peer = HashMap::new();
463 let mut peer_to_regions = HashMap::new();
464
465 let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
467 for region_id in regions {
468 let table_id = region_id.table_id();
469 table_to_regions
470 .entry(table_id)
471 .or_default()
472 .push(*region_id);
473 }
474
475 for (table_id, table_regions) in table_to_regions {
477 match self.get_table_route(table_id).await {
478 Ok((_phy_table_id, table_route)) => {
479 table_route_to_region(
480 &table_route,
481 &table_regions,
482 &mut region_to_peer,
483 &mut peer_to_regions,
484 );
485 }
486 Err(e) => {
487 warn!(
490 "Failed to get table route for table {}: {}, skipping its regions",
491 table_id, e
492 );
493 continue;
494 }
495 }
496 }
497
498 Ok((region_to_peer, peer_to_regions))
499 }
500
501 async fn set_routes_and_related_regions(&mut self) -> Result<()> {
503 let related_regions = self.find_related_regions(&self.data.regions).await?;
504
505 self.data.related_regions = related_regions.clone();
506
507 let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
510
511 regions_set.extend(related_regions.keys().cloned());
512 regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
513
514 let regions_to_discover = regions_set.into_iter().collect_vec();
515
516 let (mut region_to_peer, _) = self
517 .discover_route_for_regions(®ions_to_discover)
518 .await?;
519
520 for (region_id, route) in &self.data.region_routes_override {
521 region_to_peer
522 .entry(*region_id)
523 .or_insert_with(|| route.clone());
524 }
525
526 self.data.region_routes = region_to_peer;
527
528 Ok(())
529 }
530
531 async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
533 let region_count = self.data.regions.len();
534 self.set_routes_and_related_regions()
535 .instrument(common_telemetry::tracing::info_span!(
536 "meta_gc_procedure_prepare_routes",
537 region_count = region_count
538 ))
539 .await?;
540
541 let query_regions = &self.data.regions;
542 let related_regions = &self.data.related_regions;
543 let region_routes = &self.data.region_routes;
544 let timeout = self.data.timeout;
545 let dropped_regions = self
546 .data
547 .region_routes_override
548 .keys()
549 .collect::<HashSet<_>>();
550
551 let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
553
554 for region_id in query_regions {
555 if dropped_regions.contains(region_id) {
556 continue;
557 }
558 if let Some((leader, followers)) = region_routes.get(region_id) {
559 datanode2query_regions
560 .entry(leader.clone())
561 .or_default()
562 .push(*region_id);
563 for follower in followers {
565 datanode2query_regions
566 .entry(follower.clone())
567 .or_default()
568 .push(*region_id);
569 }
570 } else {
571 return error::UnexpectedSnafu {
572 violated: format!(
573 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
574 ),
575 }
576 .fail();
577 }
578 }
579
580 let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
581 HashMap::new();
582 for (src_region, dst_regions) in related_regions {
583 for dst_region in dst_regions {
584 if let Some((leader, _followers)) = region_routes.get(dst_region) {
585 datanode2related_regions
586 .entry(leader.clone())
587 .or_default()
588 .entry(*src_region)
589 .or_default()
590 .insert(*dst_region);
591 } }
593 }
594
595 let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
597 let mut all_manifest_versions = HashMap::new();
598 let mut all_cross_region_refs = HashMap::new();
599
600 let mut peers = HashSet::new();
601 peers.extend(datanode2query_regions.keys().cloned());
602 peers.extend(datanode2related_regions.keys().cloned());
603
604 let mailbox = &self.mailbox;
605 let server_addr = &self.data.server_addr;
606 let mut tasks = Vec::new();
607
608 for peer in peers {
609 let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
610 let related_regions_for_peer =
611 datanode2related_regions.remove(&peer).unwrap_or_default();
612
613 if regions.is_empty() && related_regions_for_peer.is_empty() {
614 continue;
615 }
616
617 tasks.push(async move {
618 let instruction = GetFileRefs {
619 query_regions: regions.clone(),
620 related_regions: related_regions_for_peer.clone(),
621 };
622
623 let reply =
624 send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout)
625 .await;
626
627 (peer, regions, related_regions_for_peer, reply)
628 });
629 }
630
631 let mut recv_tasks = Vec::new();
632 let mut first_error = None;
634 let mut record_get_file_refs_error = |e| {
635 METRIC_META_GC_DATANODE_CALLS_TOTAL
636 .with_label_values(&["get_file_refs", "error"])
637 .inc();
638 if first_error.is_none() {
639 first_error = Some(e);
640 }
641 };
642 for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await {
643 match reply {
644 Ok(mailbox_rx) => {
645 recv_tasks.push(async move {
646 let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await;
647 (peer, regions, related_regions_for_peer, reply)
648 });
649 }
650 Err(e) => record_get_file_refs_error(e),
651 }
652 }
653
654 let replies = join_all(recv_tasks).await;
655
656 for (peer, regions, related_regions_for_peer, reply) in replies {
657 let reply = match reply {
658 Ok(reply) => reply,
659 Err(e) => {
660 record_get_file_refs_error(e);
661 continue;
662 }
663 };
664 debug!(
665 "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
666 peer, regions, related_regions_for_peer, reply
667 );
668
669 if !reply.success {
670 METRIC_META_GC_DATANODE_CALLS_TOTAL
671 .with_label_values(&["get_file_refs", "error"])
672 .inc();
673 let err = error::UnexpectedSnafu {
674 violated: format!(
675 "Failed to get file references from datanode {}: {:?}",
676 peer, reply.error
677 ),
678 }
679 .build();
680 record_get_file_refs_error(err);
681 continue;
682 }
683 METRIC_META_GC_DATANODE_CALLS_TOTAL
684 .with_label_values(&["get_file_refs", "success"])
685 .inc();
686
687 for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
689 all_file_refs
690 .entry(region_id)
691 .or_default()
692 .extend(file_refs);
693 }
694
695 for (region_id, version) in reply.file_refs_manifest.manifest_version {
697 let entry = all_manifest_versions.entry(region_id).or_insert(version);
698 *entry = (*entry).min(version);
699 }
700
701 for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
702 let entry = all_cross_region_refs
703 .entry(region_id)
704 .or_insert_with(HashSet::new);
705 entry.extend(related_region_ids);
706 }
707 }
708
709 if let Some(e) = first_error {
710 return Err(e);
711 }
712
713 Ok(FileRefsManifest {
714 file_refs: all_file_refs,
715 manifest_version: all_manifest_versions,
716 cross_region_refs: all_cross_region_refs,
717 })
718 }
719
    /// Sends a `GcRegions` instruction to the leader datanode of every target
    /// region and merges the returned reports into one [`GcReport`].
    ///
    /// Partial reports are merged even when some datanodes fail; if any call
    /// failed, the first recorded error is returned instead of the report.
    async fn send_gc_instructions(&self) -> Result<GcReport> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // GC is issued only to the leader replica of each region.
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
        let mut all_report = GcReport::default();

        for region_id in regions {
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        let mailbox = &self.mailbox;
        let server_addr = self.data.server_addr.as_str();
        let full_file_listing = self.data.full_file_listing;
        // Send phase: one concurrent task per datanode. Each datanode gets the
        // full merged file-refs manifest alongside its own region list.
        let tasks = datanode2regions
            .into_iter()
            .map(|(peer, regions_for_peer)| {
                let gc_regions = GcRegions {
                    regions: regions_for_peer.clone(),
                    file_refs_manifest: file_refs.clone(),
                    full_file_listing,
                };
                let region_count = gc_regions.regions.len() as u64;

                async move {
                    let report = send_gc_regions_inner(
                        mailbox,
                        &peer,
                        &gc_regions,
                        server_addr,
                        timeout,
                        "Batch GC",
                    )
                    .await;

                    (peer, gc_regions, region_count, report)
                }
            });

        let mut recv_tasks = Vec::new();
        // Only the first error is propagated; every failure updates metrics.
        let mut first_error = None;
        let mut record_gc_error = |e, region_count| {
            METRIC_META_GC_DATANODE_CALLS_TOTAL
                .with_label_values(&["gc_regions", "error"])
                .inc();
            if region_count > 0 {
                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count);
            }
            if first_error.is_none() {
                first_error = Some(e);
            }
        };
        for (peer, gc_regions, region_count, report) in join_all(tasks).await {
            match report {
                Ok(mailbox_rx) => {
                    recv_tasks.push(async move {
                        let report =
                            recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await;
                        (peer, region_count, report)
                    });
                }
                Err(e) => record_gc_error(e, region_count),
            }
        }

        // Receive phase: merge reports; regions the datanode asks to retry are
        // counted as failed in the metrics.
        for (peer, region_count, report) in join_all(recv_tasks).await {
            let report = match report {
                Ok(report) => {
                    METRIC_META_GC_DATANODE_CALLS_TOTAL
                        .with_label_values(&["gc_regions", "success"])
                        .inc();
                    let need_retry_count = report.need_retry_regions.len() as u64;
                    if need_retry_count > 0 {
                        METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
                    }
                    report
                }
                Err(e) => {
                    record_gc_error(e, region_count);
                    continue;
                }
            };

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions.clone());
            all_report.merge(report);
        }

        if let Some(e) = first_error {
            return Err(e);
        }

        if !all_need_retry.is_empty() {
            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
        }

        Ok(all_report)
    }
852}
853
854#[async_trait::async_trait]
855impl Procedure for BatchGcProcedure {
    /// Returns the procedure type name used for registry lookup and recovery.
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }
859
    /// Drives the GC state machine:
    /// `Start -> Acquiring -> Gcing -> UpdateRepartition -> done`.
    ///
    /// Each arm performs one step, stores the next state in `self.data`, and
    /// returns `Status::executing(false)` so the framework calls `execute`
    /// again; the final arm returns the stored [`GcReport`] as output.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            // Entry point: just log and advance to `Acquiring`.
            State::Start => {
                let _regions_span = common_telemetry::tracing::debug_span!(
                    "meta_gc_procedure_regions",
                    state = "start",
                    regions = ?self.data.regions
                )
                .entered();
                info!(
                    "Batch GC procedure transitioning from Start to Acquiring for {} regions",
                    self.data.regions.len()
                );
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            // Gather file references from datanodes, then advance to `Gcing`.
            State::Acquiring => {
                let region_count = self.data.regions.len();
                let full_file_listing = self.data.full_file_listing;
                let regions = self.data.regions.clone();
                info!(
                    "Batch GC procedure acquiring file references for {} regions",
                    region_count
                );
                match self
                    .get_file_references()
                    .instrument(common_telemetry::tracing::debug_span!(
                        "meta_gc_procedure_regions",
                        state = "acquiring",
                        regions = ?regions
                    ))
                    .instrument(common_telemetry::tracing::info_span!(
                        "meta_gc_procedure_get_file_references",
                        region_count = region_count,
                        full_file_listing = full_file_listing
                    ))
                    .await
                {
                    Ok(file_refs) => {
                        info!(
                            "Batch GC procedure acquired file references for {} regions",
                            file_refs.file_refs.len()
                        );
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            // Instruct region leaders to GC; keep the report for the output.
            State::Gcing => {
                info!(
                    "Batch GC procedure sending GC instructions for {} regions",
                    self.data.regions.len()
                );
                match self
                    .send_gc_instructions()
                    .instrument(common_telemetry::tracing::debug_span!(
                        "meta_gc_procedure_regions",
                        state = "gcing",
                        regions = ?self.data.regions
                    ))
                    .instrument(common_telemetry::tracing::info_span!(
                        "meta_gc_procedure_send_gc_instructions",
                        region_count = self.data.regions.len(),
                        full_file_listing = self.data.full_file_listing
                    ))
                    .await
                {
                    Ok(report) => {
                        info!(
                            "Batch GC procedure received GC report, retry region count: {}",
                            report.need_retry_regions.len()
                        );
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            // Final step: clean up repartition metadata and emit the report.
            State::UpdateRepartition => match self
                .cleanup_region_repartition(ctx)
                .instrument(common_telemetry::tracing::debug_span!(
                    "meta_gc_procedure_regions",
                    state = "update_repartition",
                    regions = ?self.data.regions
                ))
                .instrument(common_telemetry::tracing::info_span!(
                    "meta_gc_procedure_update_repartition",
                    region_count = self.data.regions.len()
                ))
                .await
            {
                Ok(()) => {
                    debug!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    // The report was stored by the `Gcing` arm; its absence
                    // here would mean the state machine was corrupted.
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }
989
    /// Serializes the procedure state so it can be recovered after a restart.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }
993
994 fn lock_key(&self) -> LockKey {
997 let lock_key: Vec<_> = self
998 .data
999 .regions
1000 .iter()
1001 .sorted() .map(|id| RegionLock::Read(*id).into())
1003 .collect();
1004
1005 LockKey::new(lock_key)
1006 }
1007}