1use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30 Result as ProcedureResult, Status,
31};
32use common_telemetry::tracing::Instrument as _;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, info, warn};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use serde::{Deserialize, Serialize};
38use snafu::ResultExt as _;
39use store_api::storage::{FileRefsManifest, GcReport, RegionId};
40use table::metadata::TableId;
41
42use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
43use crate::gc::util::table_route_to_region;
44use crate::gc::{Peer2Regions, Region2Peers};
45use crate::handler::HeartbeatMailbox;
46use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
47use crate::procedure::utils::{instruction_error_result, instruction_to_error};
48use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
49
50async fn send_get_file_refs_inner(
51 mailbox: &MailboxRef,
52 server_addr: &str,
53 peer: &Peer,
54 instruction: GetFileRefs,
55 timeout: Duration,
56) -> Result<MailboxReceiver> {
57 let instruction = instruction::Instruction::GetFileRefs(instruction);
58 let tracing_ctx = TracingContext::from_current_span();
59 let msg = MailboxMessage::json_message(
60 &format!("Get file references: {}", instruction),
61 &format!("Metasrv@{}", server_addr),
62 &format!("Datanode-{}@{}", peer.id, peer.addr),
63 common_time::util::current_time_millis(),
64 &instruction,
65 Some(tracing_ctx.to_w3c()),
66 )
67 .with_context(|_| SerializeToJsonSnafu {
68 input: instruction.to_string(),
69 })?;
70
71 mailbox
72 .send(&Channel::Datanode(peer.id), msg, timeout)
73 .await
74}
75
76async fn recv_get_file_refs_reply(
77 peer: &Peer,
78 mailbox_rx: MailboxReceiver,
79) -> Result<GetFileRefsReply> {
80 let reply = match mailbox_rx.await {
81 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
82 Err(e) => {
83 error!(
84 e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
85 peer,
86 );
87 return Err(e);
88 }
89 };
90
91 let InstructionReply::GetFileRefs(reply) = reply else {
92 return error::UnexpectedInstructionReplySnafu {
93 mailbox_message: format!("{:?}", reply),
94 reason: "Unexpected reply of the GetFileRefs instruction",
95 }
96 .fail();
97 };
98
99 Ok(reply)
100}
101
102async fn send_gc_regions_inner(
103 mailbox: &MailboxRef,
104 peer: &Peer,
105 gc_regions: &GcRegions,
106 server_addr: &str,
107 timeout: Duration,
108 description: &str,
109) -> Result<MailboxReceiver> {
110 let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
111 let tracing_ctx = TracingContext::from_current_span();
112 let msg = MailboxMessage::json_message(
113 &format!("{}: {}", description, instruction),
114 &format!("Metasrv@{}", server_addr),
115 &format!("Datanode-{}@{}", peer.id, peer.addr),
116 common_time::util::current_time_millis(),
117 &instruction,
118 Some(tracing_ctx.to_w3c()),
119 )
120 .with_context(|_| SerializeToJsonSnafu {
121 input: instruction.to_string(),
122 })?;
123
124 mailbox
125 .send(&Channel::Datanode(peer.id), msg, timeout)
126 .await
127}
128
129async fn recv_gc_regions_reply(
130 peer: &Peer,
131 gc_regions: &GcRegions,
132 description: &str,
133 mailbox_rx: MailboxReceiver,
134) -> Result<GcReport> {
135 let reply = match mailbox_rx.await {
136 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
137 Err(e) => {
138 error!(
139 e; "Failed to receive reply from datanode {} for {}",
140 peer, description
141 );
142 return Err(e);
143 }
144 };
145
146 let InstructionReply::GcRegions(reply) = reply else {
147 return error::UnexpectedInstructionReplySnafu {
148 mailbox_message: format!("{:?}", reply),
149 reason: "Unexpected reply of the GcRegions instruction",
150 }
151 .fail();
152 };
153
154 let res = reply.result;
155 match res {
156 Ok(report) => Ok(report),
157 Err(e) => {
158 error!(
159 e; "Datanode {} reported error during GC for regions {:?}",
160 peer, gc_regions
161 );
162 instruction_error_result(
163 &e,
164 format!(
165 "Datanode {} reported error during GC for regions {:?}: {}",
166 peer, gc_regions, e
167 ),
168 )
169 }
170 }
171}
172
/// Procedure that garbage-collects a batch of regions.
///
/// It walks through [`State`] in order:
/// 1. collects file references from datanodes;
/// 2. sends GC instructions to region leaders;
/// 3. updates table repartition metadata.
pub struct BatchGcProcedure {
    /// Mailbox used to send instructions to datanodes and receive their replies.
    mailbox: MailboxRef,
    /// Access to table routes and table repartition metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Procedure state; this is what `dump` serializes.
    data: BatchGcData,
}
180
/// Serializable state of [`BatchGcProcedure`], dumped as JSON by `Procedure::dump`.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the procedure's state machine.
    state: State,
    /// Address of this metasrv; used as the sender (`Metasrv@{server_addr}`) of mailbox messages.
    server_addr: String,
    /// Regions garbage-collected by this batch.
    regions: Vec<RegionId>,
    /// Forwarded to datanodes as `GcRegions::full_file_listing`.
    full_file_listing: bool,
    /// Leader and follower peers of the batch regions and their related regions.
    ///
    /// Filled during `Acquiring` from discovered table routes. Entries from
    /// `region_routes_override` fill in regions that were not discovered.
    region_routes: Region2Peers,
    /// Caller-supplied routes, used only for regions whose route was not discovered.
    ///
    /// These regions are skipped when querying file references. The code calls them
    /// "dropped regions"; confirm with callers.
    /// A missing field deserializes to an empty map, presumably for dumps written before this
    /// field existed.
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// For each batch region, the other regions of the same table (see `find_related_regions`).
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// File references merged from all datanode replies during `Acquiring`.
    ///
    /// Sent with every `GcRegions` instruction and used to update repartition metadata.
    file_refs: FileRefsManifest,
    /// Timeout passed to each mailbox send.
    timeout: Duration,
    /// Merged GC report. Set in `Gcing`, then taken as the procedure output in
    /// `UpdateRepartition`.
    gc_report: Option<GcReport>,
}
202
/// Steps of [`BatchGcProcedure`], executed in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state; immediately transitions to `Acquiring`.
    Start,
    /// Resolves region routes and collects file references from datanodes.
    Acquiring,
    /// Sends `GcRegions` instructions to region leaders and merges their reports.
    Gcing,
    /// Updates table repartition metadata, then finishes with the GC report as output.
    UpdateRepartition,
}
214
215impl BatchGcProcedure {
216 pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
217
218 pub fn new(
219 mailbox: MailboxRef,
220 table_metadata_manager: TableMetadataManagerRef,
221 server_addr: String,
222 regions: Vec<RegionId>,
223 full_file_listing: bool,
224 timeout: Duration,
225 region_routes_override: Region2Peers,
226 ) -> Self {
227 Self {
228 mailbox,
229 table_metadata_manager,
230 data: BatchGcData {
231 state: State::Start,
232 server_addr,
233 regions,
234 full_file_listing,
235 timeout,
236 region_routes: HashMap::new(),
237 region_routes_override,
238 related_regions: HashMap::new(),
239 file_refs: FileRefsManifest::default(),
240 gc_report: None,
241 },
242 }
243 }
244
245 #[cfg(feature = "mock")]
249 pub fn new_update_repartition_for_test(
250 mailbox: MailboxRef,
251 table_metadata_manager: TableMetadataManagerRef,
252 server_addr: String,
253 regions: Vec<RegionId>,
254 file_refs: FileRefsManifest,
255 timeout: Duration,
256 ) -> Self {
257 Self {
258 mailbox,
259 table_metadata_manager,
260 data: BatchGcData {
261 state: State::UpdateRepartition,
262 server_addr,
263 regions,
264 full_file_listing: false,
265 timeout,
266 region_routes: HashMap::new(),
267 region_routes_override: HashMap::new(),
268 related_regions: HashMap::new(),
269 file_refs,
270 gc_report: Some(GcReport::default()),
271 },
272 }
273 }
274
275 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
276 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
277 error::UnexpectedSnafu {
278 violated: format!(
279 "Failed to downcast procedure result to GcReport, got {:?}",
280 std::any::type_name_of_val(&res.as_ref())
281 ),
282 }
283 .build()
284 })
285 }
286
287 async fn get_table_route(
288 &self,
289 table_id: TableId,
290 ) -> Result<(TableId, PhysicalTableRouteValue)> {
291 self.table_metadata_manager
292 .table_route_manager()
293 .get_physical_table_route(table_id)
294 .await
295 .context(TableMetadataManagerSnafu)
296 }
297
298 async fn find_related_regions(
302 &self,
303 regions: &[RegionId],
304 ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
305 let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
306 let table_ids = table_ids.into_iter().collect::<Vec<_>>();
307
308 let table_routes = self
309 .table_metadata_manager
310 .table_route_manager()
311 .batch_get_physical_table_routes(&table_ids)
312 .await
313 .context(TableMetadataManagerSnafu)?;
314
315 if table_routes.len() != table_ids.len() {
316 for table_id in &table_ids {
318 if !table_routes.contains_key(table_id) {
319 return error::InvalidArgumentsSnafu {
321 err_msg: format!(
322 "Unexpected logical table route: table {} resolved to physical table regions",
323 table_id
324 ),
325 }
326 .fail();
327 }
328 }
329 }
330
331 let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
332 for (table_id, table_route) in table_routes {
333 let all_regions: HashSet<RegionId> = table_route
334 .region_routes
335 .iter()
336 .map(|r| r.region.id)
337 .collect();
338
339 table_all_regions.insert(table_id, all_regions);
340 }
341
342 let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
343 for region_id in regions {
344 let table_id = region_id.table_id();
345 if let Some(all_regions) = table_all_regions.get(&table_id) {
346 let mut related: HashSet<RegionId> = all_regions.clone();
347 related.remove(region_id);
348 related_regions.insert(*region_id, related);
349 } else {
350 related_regions.insert(*region_id, Default::default());
351 }
352 }
353
354 Ok(related_regions)
355 }
356
357 async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
360 let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
361 HashMap::new();
362 for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
363 cross_refs_grouped
364 .entry(src_region.table_id())
365 .or_default()
366 .entry(*src_region)
367 .or_default()
368 .extend(dst_regions.iter().copied());
369 }
370
371 let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
372 for (src_region, refs) in &self.data.file_refs.file_refs {
373 if refs.is_empty() {
374 continue;
375 }
376
377 tmp_refs_grouped
378 .entry(src_region.table_id())
379 .or_default()
380 .insert(*src_region);
381 }
382
383 let repart_mgr = self.table_metadata_manager.table_repart_manager();
384
385 let mut table_ids: HashSet<TableId> = cross_refs_grouped
386 .keys()
387 .copied()
388 .chain(tmp_refs_grouped.keys().copied())
389 .collect();
390 table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));
391
392 for table_id in table_ids {
393 let table_lock = TableLock::Write(table_id).into();
394 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
395
396 let cross_refs = cross_refs_grouped
397 .get(&table_id)
398 .cloned()
399 .unwrap_or_default();
400 let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();
401
402 let current = repart_mgr
403 .get_with_raw_bytes(table_id)
404 .await
405 .context(KvBackendSnafu)?;
406
407 let mut new_value = current
408 .as_ref()
409 .map(|v| (**v).clone())
410 .unwrap_or_else(TableRepartValue::new);
411
412 let batch_src_regions: HashSet<RegionId> = self
415 .data
416 .regions
417 .iter()
418 .copied()
419 .filter(|r| r.table_id() == table_id)
420 .collect();
421
422 let all_src_regions: HashSet<RegionId> = batch_src_regions;
426
427 for src_region in all_src_regions {
428 let cross_dst = cross_refs.get(&src_region);
429 let has_tmp_ref = tmp_refs.contains(&src_region);
430
431 if let Some(dst_regions) = cross_dst {
432 let mut set = BTreeSet::new();
433 set.extend(dst_regions.iter().copied());
434 new_value.src_to_dst.insert(src_region, set);
435 } else if has_tmp_ref {
436 new_value.src_to_dst.insert(src_region, BTreeSet::new());
439 } else {
440 new_value.src_to_dst.remove(&src_region);
441 }
442 }
443
444 if new_value.src_to_dst.is_empty() && current.is_none() {
446 continue;
447 }
448
449 repart_mgr
450 .upsert_value(table_id, current, &new_value)
451 .await
452 .context(KvBackendSnafu)?;
453 }
454
455 Ok(())
456 }
457
458 async fn discover_route_for_regions(
460 &self,
461 regions: &[RegionId],
462 ) -> Result<(Region2Peers, Peer2Regions)> {
463 let mut region_to_peer = HashMap::new();
464 let mut peer_to_regions = HashMap::new();
465
466 let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
468 for region_id in regions {
469 let table_id = region_id.table_id();
470 table_to_regions
471 .entry(table_id)
472 .or_default()
473 .push(*region_id);
474 }
475
476 for (table_id, table_regions) in table_to_regions {
478 match self.get_table_route(table_id).await {
479 Ok((_phy_table_id, table_route)) => {
480 table_route_to_region(
481 &table_route,
482 &table_regions,
483 &mut region_to_peer,
484 &mut peer_to_regions,
485 );
486 }
487 Err(e) => {
488 warn!(
491 "Failed to get table route for table {}: {}, skipping its regions",
492 table_id, e
493 );
494 continue;
495 }
496 }
497 }
498
499 Ok((region_to_peer, peer_to_regions))
500 }
501
502 async fn set_routes_and_related_regions(&mut self) -> Result<()> {
504 let related_regions = self.find_related_regions(&self.data.regions).await?;
505
506 self.data.related_regions = related_regions.clone();
507
508 let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
511
512 regions_set.extend(related_regions.keys().cloned());
513 regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
514
515 let regions_to_discover = regions_set.into_iter().collect_vec();
516
517 let (mut region_to_peer, _) = self
518 .discover_route_for_regions(®ions_to_discover)
519 .await?;
520
521 for (region_id, route) in &self.data.region_routes_override {
522 region_to_peer
523 .entry(*region_id)
524 .or_insert_with(|| route.clone());
525 }
526
527 self.data.region_routes = region_to_peer;
528
529 Ok(())
530 }
531
532 async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
534 let region_count = self.data.regions.len();
535 self.set_routes_and_related_regions()
536 .instrument(common_telemetry::tracing::info_span!(
537 "meta_gc_procedure_prepare_routes",
538 region_count = region_count
539 ))
540 .await?;
541
542 let query_regions = &self.data.regions;
543 let related_regions = &self.data.related_regions;
544 let region_routes = &self.data.region_routes;
545 let timeout = self.data.timeout;
546 let dropped_regions = self
547 .data
548 .region_routes_override
549 .keys()
550 .collect::<HashSet<_>>();
551
552 let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
554
555 for region_id in query_regions {
556 if dropped_regions.contains(region_id) {
557 continue;
558 }
559 if let Some((leader, followers)) = region_routes.get(region_id) {
560 datanode2query_regions
561 .entry(leader.clone())
562 .or_default()
563 .push(*region_id);
564 for follower in followers {
566 datanode2query_regions
567 .entry(follower.clone())
568 .or_default()
569 .push(*region_id);
570 }
571 } else {
572 return error::UnexpectedSnafu {
573 violated: format!(
574 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
575 ),
576 }
577 .fail();
578 }
579 }
580
581 let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
582 HashMap::new();
583 for (src_region, dst_regions) in related_regions {
584 for dst_region in dst_regions {
585 if let Some((leader, _followers)) = region_routes.get(dst_region) {
586 datanode2related_regions
587 .entry(leader.clone())
588 .or_default()
589 .entry(*src_region)
590 .or_default()
591 .insert(*dst_region);
592 } }
594 }
595
596 let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
598 let mut all_manifest_versions = HashMap::new();
599 let mut all_cross_region_refs = HashMap::new();
600
601 let mut peers = HashSet::new();
602 peers.extend(datanode2query_regions.keys().cloned());
603 peers.extend(datanode2related_regions.keys().cloned());
604
605 let mailbox = &self.mailbox;
606 let server_addr = &self.data.server_addr;
607 let mut tasks = Vec::new();
608
609 for peer in peers {
610 let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
611 let related_regions_for_peer =
612 datanode2related_regions.remove(&peer).unwrap_or_default();
613
614 if regions.is_empty() && related_regions_for_peer.is_empty() {
615 continue;
616 }
617
618 tasks.push(async move {
619 let instruction = GetFileRefs {
620 query_regions: regions.clone(),
621 related_regions: related_regions_for_peer.clone(),
622 };
623
624 let reply =
625 send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout)
626 .await;
627
628 (peer, regions, related_regions_for_peer, reply)
629 });
630 }
631
632 let mut recv_tasks = Vec::new();
633 let mut first_error = None;
635 let mut record_get_file_refs_error = |e| {
636 METRIC_META_GC_DATANODE_CALLS_TOTAL
637 .with_label_values(&["get_file_refs", "error"])
638 .inc();
639 if first_error.is_none() {
640 first_error = Some(e);
641 }
642 };
643 for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await {
644 match reply {
645 Ok(mailbox_rx) => {
646 recv_tasks.push(async move {
647 let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await;
648 (peer, regions, related_regions_for_peer, reply)
649 });
650 }
651 Err(e) => record_get_file_refs_error(e),
652 }
653 }
654
655 let replies = join_all(recv_tasks).await;
656
657 for (peer, regions, related_regions_for_peer, reply) in replies {
658 let reply = match reply {
659 Ok(reply) => reply,
660 Err(e) => {
661 record_get_file_refs_error(e);
662 continue;
663 }
664 };
665 debug!(
666 "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
667 peer, regions, related_regions_for_peer, reply
668 );
669
670 if !reply.success {
671 METRIC_META_GC_DATANODE_CALLS_TOTAL
672 .with_label_values(&["get_file_refs", "error"])
673 .inc();
674 let err = if let Some(error) = &reply.error {
675 instruction_to_error(
676 error,
677 format!(
678 "Failed to get file references from datanode {}: {:?}",
679 peer, error
680 ),
681 )
682 } else {
683 error::UnexpectedSnafu {
684 violated: format!(
685 "Failed to get file references from datanode {}: {:?}",
686 peer, reply.error
687 ),
688 }
689 .build()
690 };
691 record_get_file_refs_error(err);
692 continue;
693 }
694 METRIC_META_GC_DATANODE_CALLS_TOTAL
695 .with_label_values(&["get_file_refs", "success"])
696 .inc();
697
698 for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
700 all_file_refs
701 .entry(region_id)
702 .or_default()
703 .extend(file_refs);
704 }
705
706 for (region_id, version) in reply.file_refs_manifest.manifest_version {
708 let entry = all_manifest_versions.entry(region_id).or_insert(version);
709 *entry = (*entry).min(version);
710 }
711
712 for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
713 let entry = all_cross_region_refs
714 .entry(region_id)
715 .or_insert_with(HashSet::new);
716 entry.extend(related_region_ids);
717 }
718 }
719
720 if let Some(e) = first_error {
721 return Err(e);
722 }
723
724 Ok(FileRefsManifest {
725 file_refs: all_file_refs,
726 manifest_version: all_manifest_versions,
727 cross_region_refs: all_cross_region_refs,
728 })
729 }
730
731 async fn send_gc_instructions(&self) -> Result<GcReport> {
734 let regions = &self.data.regions;
735 let region_routes = &self.data.region_routes;
736 let file_refs = &self.data.file_refs;
737 let timeout = self.data.timeout;
738
739 let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
741 let mut all_report = GcReport::default();
742
743 for region_id in regions {
744 if let Some((leader, _followers)) = region_routes.get(region_id) {
745 datanode2regions
746 .entry(leader.clone())
747 .or_default()
748 .push(*region_id);
749 } else {
750 return error::UnexpectedSnafu {
751 violated: format!(
752 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
753 ),
754 }
755 .fail();
756 }
757 }
758
759 let mut all_need_retry = HashSet::new();
760 let mailbox = &self.mailbox;
761 let server_addr = self.data.server_addr.as_str();
762 let full_file_listing = self.data.full_file_listing;
763 let tasks = datanode2regions
764 .into_iter()
765 .map(|(peer, regions_for_peer)| {
766 let gc_regions = GcRegions {
767 regions: regions_for_peer.clone(),
768 file_refs_manifest: file_refs.clone(),
772 full_file_listing,
773 };
774 let region_count = gc_regions.regions.len() as u64;
775
776 async move {
777 let report = send_gc_regions_inner(
778 mailbox,
779 &peer,
780 &gc_regions,
781 server_addr,
782 timeout,
783 "Batch GC",
784 )
785 .await;
786
787 (peer, gc_regions, region_count, report)
788 }
789 });
790
791 let mut recv_tasks = Vec::new();
792 let mut first_error = None;
793 let mut record_gc_error = |e, region_count| {
794 METRIC_META_GC_DATANODE_CALLS_TOTAL
795 .with_label_values(&["gc_regions", "error"])
796 .inc();
797 if region_count > 0 {
798 METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count);
799 }
800 if first_error.is_none() {
801 first_error = Some(e);
802 }
803 };
804 for (peer, gc_regions, region_count, report) in join_all(tasks).await {
805 match report {
806 Ok(mailbox_rx) => {
807 recv_tasks.push(async move {
808 let report =
809 recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await;
810 (peer, region_count, report)
811 });
812 }
813 Err(e) => record_gc_error(e, region_count),
814 }
815 }
816
817 for (peer, region_count, report) in join_all(recv_tasks).await {
818 let report = match report {
819 Ok(report) => {
820 METRIC_META_GC_DATANODE_CALLS_TOTAL
821 .with_label_values(&["gc_regions", "success"])
822 .inc();
823 let need_retry_count = report.need_retry_regions.len() as u64;
824 if need_retry_count > 0 {
825 METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
826 }
827 report
828 }
829 Err(e) => {
830 record_gc_error(e, region_count);
831 continue;
832 }
833 };
834
835 let success = report.deleted_files.keys().collect_vec();
836 let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
837
838 if need_retry.is_empty() {
839 info!(
840 "GC report from datanode {}: successfully deleted files for regions {:?}",
841 peer, success
842 );
843 } else {
844 warn!(
845 "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
846 peer, success, need_retry
847 );
848 }
849 all_need_retry.extend(report.need_retry_regions.clone());
850 all_report.merge(report);
851 }
852
853 if let Some(e) = first_error {
854 return Err(e);
855 }
856
857 if !all_need_retry.is_empty() {
858 warn!("Regions need retry after batch GC: {:?}", all_need_retry);
859 }
860
861 Ok(all_report)
862 }
863}
864
865#[async_trait::async_trait]
866impl Procedure for BatchGcProcedure {
867 fn type_name(&self) -> &str {
868 Self::TYPE_NAME
869 }
870
871 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
872 match self.data.state {
873 State::Start => {
874 let _regions_span = common_telemetry::tracing::debug_span!(
875 "meta_gc_procedure_regions",
876 state = "start",
877 regions = ?self.data.regions
878 )
879 .entered();
880 info!(
881 "Batch GC procedure transitioning from Start to Acquiring for {} regions",
882 self.data.regions.len()
883 );
884 self.data.state = State::Acquiring;
886 Ok(Status::executing(false))
887 }
888 State::Acquiring => {
889 let region_count = self.data.regions.len();
890 let full_file_listing = self.data.full_file_listing;
891 let regions = self.data.regions.clone();
892 info!(
893 "Batch GC procedure acquiring file references for {} regions",
894 region_count
895 );
896 match self
898 .get_file_references()
899 .instrument(common_telemetry::tracing::debug_span!(
900 "meta_gc_procedure_regions",
901 state = "acquiring",
902 regions = ?regions
903 ))
904 .instrument(common_telemetry::tracing::info_span!(
905 "meta_gc_procedure_get_file_references",
906 region_count = region_count,
907 full_file_listing = full_file_listing
908 ))
909 .await
910 {
911 Ok(file_refs) => {
912 info!(
913 "Batch GC procedure acquired file references for {} regions",
914 file_refs.file_refs.len()
915 );
916 self.data.file_refs = file_refs;
917 self.data.state = State::Gcing;
918 Ok(Status::executing(false))
919 }
920 Err(e) => {
921 error!(e; "Failed to get file references");
922 Err(ProcedureError::external(e))
923 }
924 }
925 }
926 State::Gcing => {
927 info!(
928 "Batch GC procedure sending GC instructions for {} regions",
929 self.data.regions.len()
930 );
931 match self
934 .send_gc_instructions()
935 .instrument(common_telemetry::tracing::debug_span!(
936 "meta_gc_procedure_regions",
937 state = "gcing",
938 regions = ?self.data.regions
939 ))
940 .instrument(common_telemetry::tracing::info_span!(
941 "meta_gc_procedure_send_gc_instructions",
942 region_count = self.data.regions.len(),
943 full_file_listing = self.data.full_file_listing
944 ))
945 .await
946 {
947 Ok(report) => {
948 info!(
949 "Batch GC procedure received GC report, retry region count: {}",
950 report.need_retry_regions.len()
951 );
952 self.data.state = State::UpdateRepartition;
953 self.data.gc_report = Some(report);
954 Ok(Status::executing(false))
955 }
956 Err(e) => {
957 error!(e; "Failed to send GC instructions");
958 Err(ProcedureError::external(e))
959 }
960 }
961 }
962 State::UpdateRepartition => match self
963 .cleanup_region_repartition(ctx)
964 .instrument(common_telemetry::tracing::debug_span!(
965 "meta_gc_procedure_regions",
966 state = "update_repartition",
967 regions = ?self.data.regions
968 ))
969 .instrument(common_telemetry::tracing::info_span!(
970 "meta_gc_procedure_update_repartition",
971 region_count = self.data.regions.len()
972 ))
973 .await
974 {
975 Ok(()) => {
976 debug!(
977 "Cleanup region repartition info completed successfully for regions {:?}",
978 self.data.regions
979 );
980 info!(
981 "Batch GC completed successfully for regions {:?}",
982 self.data.regions
983 );
984 let Some(report) = self.data.gc_report.take() else {
985 return common_procedure::error::UnexpectedSnafu {
986 err_msg: "GC report should be present after GC completion".to_string(),
987 }
988 .fail();
989 };
990 info!("GC report: {:?}", report);
991 Ok(Status::done_with_output(report))
992 }
993 Err(e) => {
994 error!(e; "Failed to cleanup region repartition info");
995 Err(ProcedureError::external(e))
996 }
997 },
998 }
999 }
1000
1001 fn dump(&self) -> ProcedureResult<String> {
1002 serde_json::to_string(&self.data).context(ToJsonSnafu)
1003 }
1004
1005 fn lock_key(&self) -> LockKey {
1008 let lock_key: Vec<_> = self
1009 .data
1010 .regions
1011 .iter()
1012 .sorted() .map(|id| RegionLock::Read(*id).into())
1014 .collect();
1015
1016 LockKey::new(lock_key)
1017 }
1018}