use std::collections::{HashMap, HashSet};
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
    GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;

use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

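/// Operations the GC scheduler needs from the rest of metasrv: datanode region
/// statistics, physical table routes, file-reference collection, and running GC
/// on datanodes.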
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
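    /// Returns the latest region statistics reported by each datanode, grouped by table id.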
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;

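    /// Resolves the physical table id and table route for `table_id`.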
    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)>;

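    /// Collects file references for `query_regions` (plus any `related_regions`) from the
    /// datanodes listed in `region_routes`, merged into a single [`FileRefsManifest`].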
    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest>;

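    /// Runs GC for `region_ids` on the datanode `peer`, using `file_refs_manifest` to
    /// determine which files are still referenced; `full_file_listing` controls whether
    /// the datanode performs a full file listing.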
    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport>;
}

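/// Default [`SchedulerCtx`] implementation backed by the table metadata manager,
/// the procedure manager, and the datanode mailbox.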
pub(crate) struct DefaultGcSchedulerCtx {
    /// Manager for table metadata, used to resolve table routes.
    pub(crate) table_metadata_manager: TableMetadataManagerRef,
    /// Used to submit GC procedures and wait for their completion.
    pub(crate) procedure_manager: ProcedureManagerRef,
    /// Client for reading datanode stat key-values.
    pub(crate) meta_peer_client: MetaPeerClientRef,
    /// Mailbox for sending instructions to datanodes.
    pub(crate) mailbox: MailboxRef,
    /// Address of this metasrv, used as the sender in mailbox messages.
    pub(crate) server_addr: String,
}

impl DefaultGcSchedulerCtx {
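    /// Creates a new [`DefaultGcSchedulerCtx`] from its components.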
    pub fn try_new(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
    ) -> Result<Self> {
        Ok(Self {
            table_metadata_manager,
            procedure_manager,
            meta_peer_client,
            mailbox,
            server_addr,
        })
    }
}

#[async_trait::async_trait]
impl SchedulerCtx for DefaultGcSchedulerCtx {
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
        let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
        let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
        for (_dn_id, stats) in dn_stats {
            let mut stats = stats.stats;

            let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
                continue;
            };

            for region_stat in latest_stat.region_stats {
                table_to_region_stats
                    .entry(region_stat.id.table_id())
                    .or_default()
                    .push(region_stat);
            }
        }
        Ok(table_to_region_stats)
    }

    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        self.table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        self.gc_regions_inner(
            peer,
            region_ids,
            file_refs_manifest,
            full_file_listing,
            timeout,
        )
        .await
    }

    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Getting file references for {} regions",
            query_regions.len()
        );

        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

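        // Group the regions to query by the datanodes (leader and followers) hosting them.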
        for region_id in query_regions {
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
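                // Query followers as well; they can also hold file references.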
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }
        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
            HashMap::new();
        for (related_region, queries) in related_regions {
            if let Some((leader, followers)) = region_routes.get(&related_region) {
                datanode2related_regions
                    .entry(leader.clone())
                    .or_default()
                    .insert(related_region, queries.clone());
            }
        }

        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
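        // Manifest versions reported per region, merged across datanodes.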
        let mut all_manifest_versions = HashMap::new();

        for (peer, regions) in datanode2query_regions {
            let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
            match self
                .send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
                .await
            {
                Ok(manifest) => {
                    for (region_id, file_refs) in manifest.file_refs {
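                        // A region can be reported by several datanodes (leader and
                        // followers); collect the union of all reported file references.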
                        all_file_refs
                            .entry(region_id)
                            .or_default()
                            .extend(file_refs);
                    }
                    for (region_id, version) in manifest.manifest_version {
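                        // Keep the smallest manifest version reported for each region.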
                        let entry = all_manifest_versions.entry(region_id).or_insert(version);
                        *entry = (*entry).min(version);
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
                        peer, e
                    );
                    // Don't fail the whole collection; just skip this datanode's regions.
                    continue;
                }
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
        })
    }
}

impl DefaultGcSchedulerCtx {
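    /// Submits a [`GcRegionProcedure`] for `region_ids` on `peer` and waits for its
    /// [`GcReport`].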
    async fn gc_regions_inner(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        debug!(
            "Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
            peer,
            region_ids.len(),
            full_file_listing
        );

        let gc_regions = GcRegions {
            regions: region_ids.to_vec(),
            file_refs_manifest: file_refs_manifest.clone(),
            full_file_listing,
        };
        let procedure = GcRegionProcedure::new(
            self.mailbox.clone(),
            self.server_addr.clone(),
            peer,
            gc_regions,
            format!("GC for {} regions", region_ids.len()),
            timeout,
        );
        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

        let id = procedure_with_id.id;

        let mut watcher = self
            .procedure_manager
            .submit(procedure_with_id)
            .await
            .context(error::SubmitProcedureSnafu)?;
        let res = watcher::wait(&mut watcher)
            .await
            .context(error::WaitProcedureSnafu)?
            .with_context(|| error::UnexpectedSnafu {
                violated: format!(
                    "GC procedure {id} successfully completed but no result returned"
                ),
            })?;

        let gc_report = GcRegionProcedure::cast_result(res)?;

        Ok(gc_report)
    }

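    /// Sends a [`GetFileRefs`] instruction to `peer` and waits for the reply, returning
    /// the reported [`FileRefsManifest`].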
    async fn send_get_file_refs_instruction(
        &self,
        peer: &Peer,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Sending GetFileRefs instruction to datanode {} for {} regions",
            peer,
            query_regions.len()
        );

        let instruction = Instruction::GetFileRefs(GetFileRefs {
            query_regions: query_regions.to_vec(),
            related_regions,
        });

        let reply = self
            .send_instruction(peer, instruction, "Get file references", timeout)
            .await?;

        let InstructionReply::GetFileRefs(GetFileRefsReply {
            file_refs_manifest,
            success,
            error,
        }) = reply
        else {
            return error::UnexpectedInstructionReplySnafu {
                mailbox_message: format!("{:?}", reply),
                reason: "Unexpected reply of the GetFileRefs instruction",
            }
            .fail();
        };

        if !success {
            return error::UnexpectedSnafu {
                violated: format!(
                    "Failed to get file references from datanode {}: {:?}",
                    peer, error
                ),
            }
            .fail();
        }

        Ok(file_refs_manifest)
    }

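    /// Sends `instruction` to the datanode `peer` via the mailbox and waits for its reply.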
    async fn send_instruction(
        &self,
        peer: &Peer,
        instruction: Instruction,
        description: &str,
        timeout: Duration,
    ) -> Result<InstructionReply> {
        let msg = MailboxMessage::json_message(
            &format!("{}: {}", description, instruction),
            &format!("Metasrv@{}", self.server_addr),
            &format!("Datanode-{}@{}", peer.id, peer.addr),
            common_time::util::current_time_millis(),
            &instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: instruction.to_string(),
        })?;

        let mailbox_rx = self
            .mailbox
            .send(&Channel::Datanode(peer.id), msg, timeout)
            .await?;

        match mailbox_rx.await {
            Ok(reply_msg) => {
                let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
                Ok(reply)
            }
            Err(e) => {
                error!(
                    "Failed to receive reply from datanode {} for {}: {}",
                    peer, description, e
                );
                Err(e)
            }
        }
    }
}