1use std::any::Any;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{self, GcRegions, InstructionReply};
21use common_meta::lock_key::RegionLock;
22use common_meta::peer::Peer;
23use common_procedure::error::ToJsonSnafu;
24use common_procedure::{
25 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
26 Result as ProcedureResult, Status,
27};
28use common_telemetry::error;
29use itertools::Itertools as _;
30use serde::{Deserialize, Serialize};
31use snafu::ResultExt as _;
32use store_api::storage::GcReport;
33
34use crate::error::{self, Result, SerializeToJsonSnafu};
35use crate::handler::HeartbeatMailbox;
36use crate::service::mailbox::{Channel, MailboxRef};
37
/// Procedure that sends a `GcRegions` instruction to a single datanode through
/// the mailbox and yields the datanode's `GcReport` as its output.
pub struct GcRegionProcedure {
    /// Mailbox used to deliver the instruction and to await the datanode's reply.
    mailbox: MailboxRef,
    /// Serializable state of the procedure; this is what `dump` writes out as JSON.
    data: GcRegionData,
}
43
/// Serializable state of a [`GcRegionProcedure`].
#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
    /// Address used to build the `Metasrv@{addr}` sender label of the mailbox message.
    server_addr: String,
    /// Datanode that receives the instruction; its id selects the mailbox channel.
    peer: Peer,
    /// Payload of the `GcRegions` instruction; its regions also form the lock key.
    gc_regions: GcRegions,
    /// Free-form text used as the prefix of the message subject and in error logs.
    description: String,
    /// Timeout handed to the mailbox when sending the instruction.
    timeout: Duration,
}
52
53impl GcRegionProcedure {
54 pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
55
56 pub fn new(
57 mailbox: MailboxRef,
58 server_addr: String,
59 peer: Peer,
60 gc_regions: GcRegions,
61 description: String,
62 timeout: Duration,
63 ) -> Self {
64 Self {
65 mailbox,
66 data: GcRegionData {
67 peer,
68 server_addr,
69 gc_regions,
70 description,
71 timeout,
72 },
73 }
74 }
75
76 async fn send_gc_instr(&self) -> Result<GcReport> {
77 let peer = &self.data.peer;
78 let instruction = instruction::Instruction::GcRegions(self.data.gc_regions.clone());
79 let msg = MailboxMessage::json_message(
80 &format!("{}: {}", self.data.description, instruction),
81 &format!("Metasrv@{}", self.data.server_addr),
82 &format!("Datanode-{}@{}", peer.id, peer.addr),
83 common_time::util::current_time_millis(),
84 &instruction,
85 )
86 .with_context(|_| SerializeToJsonSnafu {
87 input: instruction.to_string(),
88 })?;
89
90 let mailbox_rx = self
91 .mailbox
92 .send(&Channel::Datanode(peer.id), msg, self.data.timeout)
93 .await?;
94
95 let reply = match mailbox_rx.await {
96 Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
97 Err(e) => {
98 error!(
99 "Failed to receive reply from datanode {} for {}: {}",
100 peer, self.data.description, e
101 );
102 return Err(e);
103 }
104 };
105
106 let InstructionReply::GcRegions(reply) = reply else {
107 return error::UnexpectedInstructionReplySnafu {
108 mailbox_message: format!("{:?}", reply),
109 reason: "Unexpected reply of the GcRegions instruction",
110 }
111 .fail();
112 };
113
114 let res = reply.result;
115 match res {
116 Ok(report) => Ok(report),
117 Err(e) => {
118 error!(
119 "Datanode {} reported error during GC for regions {:?}: {}",
120 peer, self.data.gc_regions, e
121 );
122 Err(error::UnexpectedSnafu {
123 violated: format!(
124 "Datanode {} reported error during GC for regions {:?}: {}",
125 peer, self.data.gc_regions, e
126 ),
127 }
128 .fail()?)
129 }
130 }
131 }
132
133 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
134 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
135 error::UnexpectedSnafu {
136 violated: format!(
137 "Failed to downcast procedure result to GcReport, got {:?}",
138 std::any::type_name_of_val(&res.as_ref())
139 ),
140 }
141 .build()
142 })
143 }
144}
145
146#[async_trait::async_trait]
147impl Procedure for GcRegionProcedure {
148 fn type_name(&self) -> &str {
149 Self::TYPE_NAME
150 }
151
152 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
153 let reply = self
156 .send_gc_instr()
157 .await
158 .map_err(ProcedureError::external)?;
159
160 Ok(Status::done_with_output(reply))
161 }
162
163 fn dump(&self) -> ProcedureResult<String> {
164 serde_json::to_string(&self.data).context(ToJsonSnafu)
165 }
166
167 fn lock_key(&self) -> LockKey {
173 let lock_key: Vec<_> = self
174 .data
175 .gc_regions
176 .regions
177 .iter()
178 .sorted() .map(|id| RegionLock::Read(*id).into())
180 .collect();
181
182 LockKey::new(lock_key)
183 }
184}