1use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30 Result as ProcedureResult, Status,
31};
32use common_telemetry::{debug, error, info, warn};
33use itertools::Itertools as _;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt as _;
36use store_api::storage::{FileRefsManifest, GcReport, RegionId};
37use table::metadata::TableId;
38
39use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
40use crate::gc::util::table_route_to_region;
41use crate::gc::{Peer2Regions, Region2Peers};
42use crate::handler::HeartbeatMailbox;
43use crate::service::mailbox::{Channel, MailboxRef};
44
45async fn send_get_file_refs(
47 mailbox: &MailboxRef,
48 server_addr: &str,
49 peer: &Peer,
50 instruction: GetFileRefs,
51 timeout: Duration,
52) -> Result<GetFileRefsReply> {
53 let instruction = instruction::Instruction::GetFileRefs(instruction);
54 let msg = MailboxMessage::json_message(
55 &format!("Get file references: {}", instruction),
56 &format!("Metasrv@{}", server_addr),
57 &format!("Datanode-{}@{}", peer.id, peer.addr),
58 common_time::util::current_time_millis(),
59 &instruction,
60 )
61 .with_context(|_| SerializeToJsonSnafu {
62 input: instruction.to_string(),
63 })?;
64
65 let mailbox_rx = mailbox
66 .send(&Channel::Datanode(peer.id), msg, timeout)
67 .await?;
68
69 let reply = match mailbox_rx.await {
70 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
71 Err(e) => {
72 error!(
73 e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
74 peer,
75 );
76 return Err(e);
77 }
78 };
79
80 let InstructionReply::GetFileRefs(reply) = reply else {
81 return error::UnexpectedInstructionReplySnafu {
82 mailbox_message: format!("{:?}", reply),
83 reason: "Unexpected reply of the GetFileRefs instruction",
84 }
85 .fail();
86 };
87
88 Ok(reply)
89}
90
91async fn send_gc_regions(
93 mailbox: &MailboxRef,
94 peer: &Peer,
95 gc_regions: GcRegions,
96 server_addr: &str,
97 timeout: Duration,
98 description: &str,
99) -> Result<GcReport> {
100 let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
101 let msg = MailboxMessage::json_message(
102 &format!("{}: {}", description, instruction),
103 &format!("Metasrv@{}", server_addr),
104 &format!("Datanode-{}@{}", peer.id, peer.addr),
105 common_time::util::current_time_millis(),
106 &instruction,
107 )
108 .with_context(|_| SerializeToJsonSnafu {
109 input: instruction.to_string(),
110 })?;
111
112 let mailbox_rx = mailbox
113 .send(&Channel::Datanode(peer.id), msg, timeout)
114 .await?;
115
116 let reply = match mailbox_rx.await {
117 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
118 Err(e) => {
119 error!(
120 e; "Failed to receive reply from datanode {} for {}",
121 peer, description
122 );
123 return Err(e);
124 }
125 };
126
127 let InstructionReply::GcRegions(reply) = reply else {
128 return error::UnexpectedInstructionReplySnafu {
129 mailbox_message: format!("{:?}", reply),
130 reason: "Unexpected reply of the GcRegions instruction",
131 }
132 .fail();
133 };
134
135 let res = reply.result;
136 match res {
137 Ok(report) => Ok(report),
138 Err(e) => {
139 error!(
140 e; "Datanode {} reported error during GC for regions {:?}",
141 peer, gc_regions
142 );
143 error::UnexpectedSnafu {
144 violated: format!(
145 "Datanode {} reported error during GC for regions {:?}: {}",
146 peer, gc_regions, e
147 ),
148 }
149 .fail()
150 }
151 }
152}
153
/// Procedure that garbage-collects files for a batch of regions: it gathers
/// file references from datanodes, instructs region leaders to GC, and then
/// cleans up stale repartition metadata.
pub struct BatchGcProcedure {
    /// Mailbox used to send instructions to datanodes.
    mailbox: MailboxRef,
    /// Access to table routes and repartition metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serializable state of the procedure (see [`BatchGcData`]).
    data: BatchGcData,
}
161
/// Serializable state of [`BatchGcProcedure`], persisted between steps.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the procedure state machine.
    state: State,
    /// Address of this metasrv; used as the mailbox sender identity.
    server_addr: String,
    /// Target regions to garbage-collect.
    regions: Vec<RegionId>,
    /// Whether datanodes should perform a full file listing during GC.
    full_file_listing: bool,
    /// Region -> (leader, followers) routes discovered from table metadata.
    region_routes: Region2Peers,
    /// Extra routes supplied by the caller. They only fill in regions missing
    /// from the discovered routes, and regions listed here are skipped when
    /// querying file references (presumably already-dropped regions — TODO
    /// confirm with callers).
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// For each target region, the other regions of the same table (from the
    /// table route) whose manifests may reference the target's files.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Merged file-reference manifest collected from all datanodes.
    file_refs: FileRefsManifest,
    /// Timeout for each mailbox round trip.
    timeout: Duration,
    /// Accumulated GC report; set once the GC step completes.
    gc_report: Option<GcReport>,
}
183
/// Steps of the batch-GC state machine.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state; immediately transitions to [`State::Acquiring`].
    Start,
    /// Collecting file references from datanodes.
    Acquiring,
    /// Sending GC instructions to region leaders.
    Gcing,
    /// Cleaning up repartition metadata after GC and emitting the report.
    UpdateRepartition,
}
195
196impl BatchGcProcedure {
197 pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
198
199 pub fn new(
200 mailbox: MailboxRef,
201 table_metadata_manager: TableMetadataManagerRef,
202 server_addr: String,
203 regions: Vec<RegionId>,
204 full_file_listing: bool,
205 timeout: Duration,
206 region_routes_override: Region2Peers,
207 ) -> Self {
208 Self {
209 mailbox,
210 table_metadata_manager,
211 data: BatchGcData {
212 state: State::Start,
213 server_addr,
214 regions,
215 full_file_listing,
216 timeout,
217 region_routes: HashMap::new(),
218 region_routes_override,
219 related_regions: HashMap::new(),
220 file_refs: FileRefsManifest::default(),
221 gc_report: None,
222 },
223 }
224 }
225
226 #[cfg(feature = "mock")]
230 pub fn new_update_repartition_for_test(
231 mailbox: MailboxRef,
232 table_metadata_manager: TableMetadataManagerRef,
233 server_addr: String,
234 regions: Vec<RegionId>,
235 file_refs: FileRefsManifest,
236 timeout: Duration,
237 ) -> Self {
238 Self {
239 mailbox,
240 table_metadata_manager,
241 data: BatchGcData {
242 state: State::UpdateRepartition,
243 server_addr,
244 regions,
245 full_file_listing: false,
246 timeout,
247 region_routes: HashMap::new(),
248 region_routes_override: HashMap::new(),
249 related_regions: HashMap::new(),
250 file_refs,
251 gc_report: Some(GcReport::default()),
252 },
253 }
254 }
255
256 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
257 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
258 error::UnexpectedSnafu {
259 violated: format!(
260 "Failed to downcast procedure result to GcReport, got {:?}",
261 std::any::type_name_of_val(&res.as_ref())
262 ),
263 }
264 .build()
265 })
266 }
267
268 async fn get_table_route(
269 &self,
270 table_id: TableId,
271 ) -> Result<(TableId, PhysicalTableRouteValue)> {
272 self.table_metadata_manager
273 .table_route_manager()
274 .get_physical_table_route(table_id)
275 .await
276 .context(TableMetadataManagerSnafu)
277 }
278
279 async fn find_related_regions(
283 &self,
284 regions: &[RegionId],
285 ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
286 let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
287 let table_ids = table_ids.into_iter().collect::<Vec<_>>();
288
289 let table_routes = self
290 .table_metadata_manager
291 .table_route_manager()
292 .batch_get_physical_table_routes(&table_ids)
293 .await
294 .context(TableMetadataManagerSnafu)?;
295
296 if table_routes.len() != table_ids.len() {
297 for table_id in &table_ids {
299 if !table_routes.contains_key(table_id) {
300 return error::InvalidArgumentsSnafu {
302 err_msg: format!(
303 "Unexpected logical table route: table {} resolved to physical table regions",
304 table_id
305 ),
306 }
307 .fail();
308 }
309 }
310 }
311
312 let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
313 for (table_id, table_route) in table_routes {
314 let all_regions: HashSet<RegionId> = table_route
315 .region_routes
316 .iter()
317 .map(|r| r.region.id)
318 .collect();
319
320 table_all_regions.insert(table_id, all_regions);
321 }
322
323 let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
324 for region_id in regions {
325 let table_id = region_id.table_id();
326 if let Some(all_regions) = table_all_regions.get(&table_id) {
327 let mut related: HashSet<RegionId> = all_regions.clone();
328 related.remove(region_id);
329 related_regions.insert(*region_id, related);
330 } else {
331 related_regions.insert(*region_id, Default::default());
332 }
333 }
334
335 Ok(related_regions)
336 }
337
338 async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
341 let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
342 HashMap::new();
343 for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
344 cross_refs_grouped
345 .entry(src_region.table_id())
346 .or_default()
347 .entry(*src_region)
348 .or_default()
349 .extend(dst_regions.iter().copied());
350 }
351
352 let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
353 for (src_region, refs) in &self.data.file_refs.file_refs {
354 if refs.is_empty() {
355 continue;
356 }
357
358 tmp_refs_grouped
359 .entry(src_region.table_id())
360 .or_default()
361 .insert(*src_region);
362 }
363
364 let repart_mgr = self.table_metadata_manager.table_repart_manager();
365
366 let mut table_ids: HashSet<TableId> = cross_refs_grouped
367 .keys()
368 .copied()
369 .chain(tmp_refs_grouped.keys().copied())
370 .collect();
371 table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));
372
373 for table_id in table_ids {
374 let table_lock = TableLock::Write(table_id).into();
375 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
376
377 let cross_refs = cross_refs_grouped
378 .get(&table_id)
379 .cloned()
380 .unwrap_or_default();
381 let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();
382
383 let current = repart_mgr
384 .get_with_raw_bytes(table_id)
385 .await
386 .context(KvBackendSnafu)?;
387
388 let mut new_value = current
389 .as_ref()
390 .map(|v| (**v).clone())
391 .unwrap_or_else(TableRepartValue::new);
392
393 let batch_src_regions: HashSet<RegionId> = self
396 .data
397 .regions
398 .iter()
399 .copied()
400 .filter(|r| r.table_id() == table_id)
401 .collect();
402
403 let all_src_regions: HashSet<RegionId> = batch_src_regions;
407
408 for src_region in all_src_regions {
409 let cross_dst = cross_refs.get(&src_region);
410 let has_tmp_ref = tmp_refs.contains(&src_region);
411
412 if let Some(dst_regions) = cross_dst {
413 let mut set = BTreeSet::new();
414 set.extend(dst_regions.iter().copied());
415 new_value.src_to_dst.insert(src_region, set);
416 } else if has_tmp_ref {
417 new_value.src_to_dst.insert(src_region, BTreeSet::new());
420 } else {
421 new_value.src_to_dst.remove(&src_region);
422 }
423 }
424
425 if new_value.src_to_dst.is_empty() && current.is_none() {
427 continue;
428 }
429
430 repart_mgr
431 .upsert_value(table_id, current, &new_value)
432 .await
433 .context(KvBackendSnafu)?;
434 }
435
436 Ok(())
437 }
438
439 async fn discover_route_for_regions(
441 &self,
442 regions: &[RegionId],
443 ) -> Result<(Region2Peers, Peer2Regions)> {
444 let mut region_to_peer = HashMap::new();
445 let mut peer_to_regions = HashMap::new();
446
447 let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
449 for region_id in regions {
450 let table_id = region_id.table_id();
451 table_to_regions
452 .entry(table_id)
453 .or_default()
454 .push(*region_id);
455 }
456
457 for (table_id, table_regions) in table_to_regions {
459 match self.get_table_route(table_id).await {
460 Ok((_phy_table_id, table_route)) => {
461 table_route_to_region(
462 &table_route,
463 &table_regions,
464 &mut region_to_peer,
465 &mut peer_to_regions,
466 );
467 }
468 Err(e) => {
469 warn!(
472 "Failed to get table route for table {}: {}, skipping its regions",
473 table_id, e
474 );
475 continue;
476 }
477 }
478 }
479
480 Ok((region_to_peer, peer_to_regions))
481 }
482
483 async fn set_routes_and_related_regions(&mut self) -> Result<()> {
485 let related_regions = self.find_related_regions(&self.data.regions).await?;
486
487 self.data.related_regions = related_regions.clone();
488
489 let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
492
493 regions_set.extend(related_regions.keys().cloned());
494 regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
495
496 let regions_to_discover = regions_set.into_iter().collect_vec();
497
498 let (mut region_to_peer, _) = self
499 .discover_route_for_regions(®ions_to_discover)
500 .await?;
501
502 for (region_id, route) in &self.data.region_routes_override {
503 region_to_peer
504 .entry(*region_id)
505 .or_insert_with(|| route.clone());
506 }
507
508 self.data.region_routes = region_to_peer;
509
510 Ok(())
511 }
512
513 async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
515 self.set_routes_and_related_regions().await?;
516
517 let query_regions = &self.data.regions;
518 let related_regions = &self.data.related_regions;
519 let region_routes = &self.data.region_routes;
520 let timeout = self.data.timeout;
521 let dropped_regions = self
522 .data
523 .region_routes_override
524 .keys()
525 .collect::<HashSet<_>>();
526
527 let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
529
530 for region_id in query_regions {
531 if dropped_regions.contains(region_id) {
532 continue;
533 }
534 if let Some((leader, followers)) = region_routes.get(region_id) {
535 datanode2query_regions
536 .entry(leader.clone())
537 .or_default()
538 .push(*region_id);
539 for follower in followers {
541 datanode2query_regions
542 .entry(follower.clone())
543 .or_default()
544 .push(*region_id);
545 }
546 } else {
547 return error::UnexpectedSnafu {
548 violated: format!(
549 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
550 ),
551 }
552 .fail();
553 }
554 }
555
556 let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
557 HashMap::new();
558 for (src_region, dst_regions) in related_regions {
559 for dst_region in dst_regions {
560 if let Some((leader, _followers)) = region_routes.get(dst_region) {
561 datanode2related_regions
562 .entry(leader.clone())
563 .or_default()
564 .entry(*src_region)
565 .or_default()
566 .insert(*dst_region);
567 } }
569 }
570
571 let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
573 let mut all_manifest_versions = HashMap::new();
574 let mut all_cross_region_refs = HashMap::new();
575
576 let mut peers = HashSet::new();
577 peers.extend(datanode2query_regions.keys().cloned());
578 peers.extend(datanode2related_regions.keys().cloned());
579
580 for peer in peers {
581 let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
582 let related_regions_for_peer =
583 datanode2related_regions.remove(&peer).unwrap_or_default();
584
585 if regions.is_empty() && related_regions_for_peer.is_empty() {
586 continue;
587 }
588
589 let instruction = GetFileRefs {
590 query_regions: regions.clone(),
591 related_regions: related_regions_for_peer.clone(),
592 };
593
594 let reply = send_get_file_refs(
595 &self.mailbox,
596 &self.data.server_addr,
597 &peer,
598 instruction,
599 timeout,
600 )
601 .await?;
602 debug!(
603 "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
604 peer, regions, related_regions_for_peer, reply
605 );
606
607 if !reply.success {
608 return error::UnexpectedSnafu {
609 violated: format!(
610 "Failed to get file references from datanode {}: {:?}",
611 peer, reply.error
612 ),
613 }
614 .fail();
615 }
616
617 for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
619 all_file_refs
620 .entry(region_id)
621 .or_default()
622 .extend(file_refs);
623 }
624
625 for (region_id, version) in reply.file_refs_manifest.manifest_version {
627 let entry = all_manifest_versions.entry(region_id).or_insert(version);
628 *entry = (*entry).min(version);
629 }
630
631 for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
632 let entry = all_cross_region_refs
633 .entry(region_id)
634 .or_insert_with(HashSet::new);
635 entry.extend(related_region_ids);
636 }
637 }
638
639 Ok(FileRefsManifest {
640 file_refs: all_file_refs,
641 manifest_version: all_manifest_versions,
642 cross_region_refs: all_cross_region_refs,
643 })
644 }
645
646 async fn send_gc_instructions(&self) -> Result<GcReport> {
649 let regions = &self.data.regions;
650 let region_routes = &self.data.region_routes;
651 let file_refs = &self.data.file_refs;
652 let timeout = self.data.timeout;
653
654 let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
656 let mut all_report = GcReport::default();
657
658 for region_id in regions {
659 if let Some((leader, _followers)) = region_routes.get(region_id) {
660 datanode2regions
661 .entry(leader.clone())
662 .or_default()
663 .push(*region_id);
664 } else {
665 return error::UnexpectedSnafu {
666 violated: format!(
667 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
668 ),
669 }
670 .fail();
671 }
672 }
673
674 let mut all_need_retry = HashSet::new();
675 for (peer, regions_for_peer) in datanode2regions {
677 let gc_regions = GcRegions {
678 regions: regions_for_peer.clone(),
679 file_refs_manifest: file_refs.clone(),
681 full_file_listing: self.data.full_file_listing,
682 };
683
684 let report = send_gc_regions(
685 &self.mailbox,
686 &peer,
687 gc_regions,
688 self.data.server_addr.as_str(),
689 timeout,
690 "Batch GC",
691 )
692 .await?;
693
694 let success = report.deleted_files.keys().collect_vec();
695 let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
696
697 if need_retry.is_empty() {
698 info!(
699 "GC report from datanode {}: successfully deleted files for regions {:?}",
700 peer, success
701 );
702 } else {
703 warn!(
704 "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
705 peer, success, need_retry
706 );
707 }
708 all_need_retry.extend(report.need_retry_regions.clone());
709 all_report.merge(report);
710 }
711
712 if !all_need_retry.is_empty() {
713 warn!("Regions need retry after batch GC: {:?}", all_need_retry);
714 }
715
716 Ok(all_report)
717 }
718}
719
#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Drives the state machine:
    /// `Start -> Acquiring -> Gcing -> UpdateRepartition -> done`.
    /// Each arm performs one step, advances `self.data.state`, and returns an
    /// executing status so the next step runs on a subsequent tick.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                // No work on the first tick; just advance the state machine.
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                // Collect file references from all involved datanodes.
                match self.get_file_references().await {
                    Ok(file_refs) => {
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                // Instruct region leaders to GC using the collected refs.
                match self.send_gc_instructions().await {
                    Ok(report) => {
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            // Final step: prune repartition metadata, then emit the report
            // as the procedure's output.
            State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
                Ok(()) => {
                    info!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    // The GC step always stores a report before reaching this
                    // state; a missing one is an internal invariant violation.
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }

    /// Serializes the procedure state for persistence.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Takes a read lock on every target region, sorted for a deterministic
    /// acquisition order.
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .regions
            .iter()
            .sorted()
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}
806}