1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::lock_key::RegionLock;
23use common_meta::peer::Peer;
24use common_procedure::error::ToJsonSnafu;
25use common_procedure::{
26 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
27 Result as ProcedureResult, Status,
28};
29use common_telemetry::{debug, error, info, warn};
30use itertools::Itertools as _;
31use serde::{Deserialize, Serialize};
32use snafu::ResultExt as _;
33use store_api::storage::{FileRefsManifest, GcReport, RegionId};
34
35use crate::error::{self, Result, SerializeToJsonSnafu};
36use crate::gc::Region2Peers;
37use crate::handler::HeartbeatMailbox;
38use crate::service::mailbox::{Channel, MailboxRef};
39
40async fn send_get_file_refs(
42 mailbox: &MailboxRef,
43 server_addr: &str,
44 peer: &Peer,
45 instruction: GetFileRefs,
46 timeout: Duration,
47) -> Result<GetFileRefsReply> {
48 let instruction = instruction::Instruction::GetFileRefs(instruction);
49 let msg = MailboxMessage::json_message(
50 &format!("Get file references: {}", instruction),
51 &format!("Metasrv@{}", server_addr),
52 &format!("Datanode-{}@{}", peer.id, peer.addr),
53 common_time::util::current_time_millis(),
54 &instruction,
55 )
56 .with_context(|_| SerializeToJsonSnafu {
57 input: instruction.to_string(),
58 })?;
59
60 let mailbox_rx = mailbox
61 .send(&Channel::Datanode(peer.id), msg, timeout)
62 .await?;
63
64 let reply = match mailbox_rx.await {
65 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
66 Err(e) => {
67 error!(
68 "Failed to receive reply from datanode {} for GetFileRefs: {}",
69 peer, e
70 );
71 return Err(e);
72 }
73 };
74
75 let InstructionReply::GetFileRefs(reply) = reply else {
76 return error::UnexpectedInstructionReplySnafu {
77 mailbox_message: format!("{:?}", reply),
78 reason: "Unexpected reply of the GetFileRefs instruction",
79 }
80 .fail();
81 };
82
83 Ok(reply)
84}
85
86async fn send_gc_regions(
88 mailbox: &MailboxRef,
89 peer: &Peer,
90 gc_regions: GcRegions,
91 server_addr: &str,
92 timeout: Duration,
93 description: &str,
94) -> Result<GcReport> {
95 let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
96 let msg = MailboxMessage::json_message(
97 &format!("{}: {}", description, instruction),
98 &format!("Metasrv@{}", server_addr),
99 &format!("Datanode-{}@{}", peer.id, peer.addr),
100 common_time::util::current_time_millis(),
101 &instruction,
102 )
103 .with_context(|_| SerializeToJsonSnafu {
104 input: instruction.to_string(),
105 })?;
106
107 let mailbox_rx = mailbox
108 .send(&Channel::Datanode(peer.id), msg, timeout)
109 .await?;
110
111 let reply = match mailbox_rx.await {
112 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
113 Err(e) => {
114 error!(
115 "Failed to receive reply from datanode {} for {}: {}",
116 peer, description, e
117 );
118 return Err(e);
119 }
120 };
121
122 let InstructionReply::GcRegions(reply) = reply else {
123 return error::UnexpectedInstructionReplySnafu {
124 mailbox_message: format!("{:?}", reply),
125 reason: "Unexpected reply of the GcRegions instruction",
126 }
127 .fail();
128 };
129
130 let res = reply.result;
131 match res {
132 Ok(report) => Ok(report),
133 Err(e) => {
134 error!(
135 "Datanode {} reported error during GC for regions {:?}: {}",
136 peer, gc_regions, e
137 );
138 error::UnexpectedSnafu {
139 violated: format!(
140 "Datanode {} reported error during GC for regions {:?}: {}",
141 peer, gc_regions, e
142 ),
143 }
144 .fail()
145 }
146 }
147}
148
149pub struct GcRegionProcedure {
151 mailbox: MailboxRef,
152 data: GcRegionData,
153}
154
/// Persistable state of a [`GcRegionProcedure`]; serialized as JSON by `dump`.
///
/// NOTE(review): field names and order define the dumped JSON, so keep them stable.
#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
    /// Address of this metasrv; the mailbox sender is `Metasrv@{server_addr}`.
    server_addr: String,
    /// Datanode the GC instruction is sent to.
    peer: Peer,
    /// Instruction payload: regions to GC, the file-refs manifest and `full_file_listing`.
    gc_regions: GcRegions,
    /// Label prefixed to the mailbox message subject and used in error logs.
    description: String,
    /// Timeout passed to the mailbox `send` call.
    timeout: Duration,
}
163
164impl GcRegionProcedure {
165 pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
166
167 pub fn new(
168 mailbox: MailboxRef,
169 server_addr: String,
170 peer: Peer,
171 gc_regions: GcRegions,
172 description: String,
173 timeout: Duration,
174 ) -> Self {
175 Self {
176 mailbox,
177 data: GcRegionData {
178 peer,
179 server_addr,
180 gc_regions,
181 description,
182 timeout,
183 },
184 }
185 }
186
187 async fn send_gc_instr(&self) -> Result<GcReport> {
188 send_gc_regions(
189 &self.mailbox,
190 &self.data.peer,
191 self.data.gc_regions.clone(),
192 &self.data.server_addr,
193 self.data.timeout,
194 &self.data.description,
195 )
196 .await
197 }
198
199 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
200 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
201 error::UnexpectedSnafu {
202 violated: format!(
203 "Failed to downcast procedure result to GcReport, got {:?}",
204 std::any::type_name_of_val(&res.as_ref())
205 ),
206 }
207 .build()
208 })
209 }
210}
211
212#[async_trait::async_trait]
213impl Procedure for GcRegionProcedure {
214 fn type_name(&self) -> &str {
215 Self::TYPE_NAME
216 }
217
218 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
219 let reply = self
222 .send_gc_instr()
223 .await
224 .map_err(ProcedureError::external)?;
225
226 Ok(Status::done_with_output(reply))
227 }
228
229 fn dump(&self) -> ProcedureResult<String> {
230 serde_json::to_string(&self.data).context(ToJsonSnafu)
231 }
232
233 fn lock_key(&self) -> LockKey {
239 let lock_key: Vec<_> = self
240 .data
241 .gc_regions
242 .regions
243 .iter()
244 .sorted() .map(|id| RegionLock::Read(*id).into())
246 .collect();
247
248 LockKey::new(lock_key)
249 }
250}
251
252pub struct BatchGcProcedure {
255 mailbox: MailboxRef,
256 data: BatchGcData,
257}
258
/// Persistable state of a [`BatchGcProcedure`]; serialized as JSON by `dump`.
///
/// NOTE(review): field names and order define the dumped JSON, so keep them stable.
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the procedure's state machine.
    state: State,
    /// Address of this metasrv; the mailbox sender is `Metasrv@{server_addr}`.
    server_addr: String,
    /// Regions to garbage-collect (the "query" regions when collecting file refs).
    regions: Vec<RegionId>,
    /// Forwarded as `GcRegions::full_file_listing` in every GC instruction.
    full_file_listing: bool,
    /// Route of each region as `(leader, followers)`; must cover every entry of `regions`.
    region_routes: Region2Peers,
    /// Maps a related region to the query regions it should report file refs for;
    /// sent to the related region's leader in `GetFileRefs`.
    related_regions: HashMap<RegionId, Vec<RegionId>>,
    /// File references collected in `State::Acquiring` and attached to every GC
    /// instruction in `State::Gcing`.
    file_refs: FileRefsManifest,
    /// Timeout passed to each mailbox `send` call.
    timeout: Duration,
}
274
/// Steps of the [`BatchGcProcedure`] state machine, in execution order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state; executing it only advances to `Acquiring`.
    Start,
    /// Collecting file references from the datanodes (`get_file_references`).
    Acquiring,
    /// Sending GC instructions to region leaders (`send_gc_instructions`).
    Gcing,
}
284
285impl BatchGcProcedure {
286 pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
287
288 pub fn new(
289 mailbox: MailboxRef,
290 server_addr: String,
291 regions: Vec<RegionId>,
292 full_file_listing: bool,
293 region_routes: Region2Peers,
294 related_regions: HashMap<RegionId, Vec<RegionId>>,
295 timeout: Duration,
296 ) -> Self {
297 Self {
298 mailbox,
299 data: BatchGcData {
300 state: State::Start,
301 server_addr,
302 regions,
303 full_file_listing,
304 region_routes,
305 related_regions,
306 file_refs: FileRefsManifest::default(),
307 timeout,
308 },
309 }
310 }
311
312 async fn get_file_references(&self) -> Result<FileRefsManifest> {
314 use std::collections::{HashMap, HashSet};
315
316 let query_regions = &self.data.regions;
317 let related_regions = &self.data.related_regions;
318 let region_routes = &self.data.region_routes;
319 let timeout = self.data.timeout;
320
321 let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
323
324 for region_id in query_regions {
325 if let Some((leader, followers)) = region_routes.get(region_id) {
326 datanode2query_regions
327 .entry(leader.clone())
328 .or_default()
329 .push(*region_id);
330 for follower in followers {
332 datanode2query_regions
333 .entry(follower.clone())
334 .or_default()
335 .push(*region_id);
336 }
337 } else {
338 return error::UnexpectedSnafu {
339 violated: format!(
340 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
341 ),
342 }
343 .fail();
344 }
345 }
346
347 let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
348 HashMap::new();
349 for (related_region, queries) in related_regions {
350 if let Some((leader, _followers)) = region_routes.get(related_region) {
351 datanode2related_regions
352 .entry(leader.clone())
353 .or_default()
354 .insert(*related_region, queries.clone());
355 } }
357
358 let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
360 let mut all_manifest_versions = HashMap::new();
361
362 for (peer, regions) in datanode2query_regions {
363 let related_regions_for_peer =
364 datanode2related_regions.remove(&peer).unwrap_or_default();
365
366 let instruction = GetFileRefs {
367 query_regions: regions.clone(),
368 related_regions: related_regions_for_peer,
369 };
370
371 let reply = send_get_file_refs(
372 &self.mailbox,
373 &self.data.server_addr,
374 &peer,
375 instruction,
376 timeout,
377 )
378 .await?;
379
380 if !reply.success {
381 return error::UnexpectedSnafu {
382 violated: format!(
383 "Failed to get file references from datanode {}: {:?}",
384 peer, reply.error
385 ),
386 }
387 .fail();
388 }
389
390 for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
392 all_file_refs
393 .entry(region_id)
394 .or_default()
395 .extend(file_refs);
396 }
397
398 for (region_id, version) in reply.file_refs_manifest.manifest_version {
400 let entry = all_manifest_versions.entry(region_id).or_insert(version);
401 *entry = (*entry).min(version);
402 }
403 }
404
405 Ok(FileRefsManifest {
406 file_refs: all_file_refs,
407 manifest_version: all_manifest_versions,
408 })
409 }
410
411 async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
414 let regions = &self.data.regions;
415 let region_routes = &self.data.region_routes;
416 let file_refs = &self.data.file_refs;
417 let timeout = self.data.timeout;
418
419 let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
421
422 for region_id in regions {
423 if let Some((leader, _followers)) = region_routes.get(region_id) {
424 datanode2regions
425 .entry(leader.clone())
426 .or_default()
427 .push(*region_id);
428 } else {
429 return error::UnexpectedSnafu {
430 violated: format!(
431 "region_routes: {region_routes:?} does not contain region_id: {region_id}",
432 ),
433 }
434 .fail();
435 }
436 }
437
438 let mut all_need_retry = HashSet::new();
439 for (peer, regions_for_peer) in datanode2regions {
441 let gc_regions = GcRegions {
442 regions: regions_for_peer.clone(),
443 file_refs_manifest: file_refs.clone(),
445 full_file_listing: self.data.full_file_listing,
446 };
447
448 let report = send_gc_regions(
449 &self.mailbox,
450 &peer,
451 gc_regions,
452 self.data.server_addr.as_str(),
453 timeout,
454 "Batch GC",
455 )
456 .await?;
457
458 let success = report.deleted_files.keys().collect_vec();
459 let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
460
461 if need_retry.is_empty() {
462 info!(
463 "GC report from datanode {}: successfully deleted files for regions {:?}",
464 peer, success
465 );
466 } else {
467 warn!(
468 "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
469 peer, success, need_retry
470 );
471 }
472 all_need_retry.extend(report.need_retry_regions);
473 }
474
475 Ok(all_need_retry.into_iter().collect())
476 }
477}
478
479#[async_trait::async_trait]
480impl Procedure for BatchGcProcedure {
481 fn type_name(&self) -> &str {
482 Self::TYPE_NAME
483 }
484
485 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
486 match self.data.state {
487 State::Start => {
488 self.data.state = State::Acquiring;
490 Ok(Status::executing(false))
491 }
492 State::Acquiring => {
493 match self.get_file_references().await {
495 Ok(file_refs) => {
496 self.data.file_refs = file_refs;
497 self.data.state = State::Gcing;
498 Ok(Status::executing(false))
499 }
500 Err(e) => {
501 error!("Failed to get file references: {}", e);
502 Err(ProcedureError::external(e))
503 }
504 }
505 }
506 State::Gcing => {
507 match self.send_gc_instructions().await {
510 Ok(_) => {
511 info!(
512 "Batch GC completed successfully for regions {:?}",
513 self.data.regions
514 );
515 Ok(Status::done())
516 }
517 Err(e) => {
518 error!("Failed to send GC instructions: {}", e);
519 Err(ProcedureError::external(e))
520 }
521 }
522 }
523 }
524 }
525
526 fn dump(&self) -> ProcedureResult<String> {
527 serde_json::to_string(&self.data).context(ToJsonSnafu)
528 }
529
530 fn lock_key(&self) -> LockKey {
533 let lock_key: Vec<_> = self
534 .data
535 .regions
536 .iter()
537 .sorted() .map(|id| RegionLock::Read(*id).into())
539 .collect();
540
541 LockKey::new(lock_key)
542 }
543}