use std::collections::{HashMap, HashSet};
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
    GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;

use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::GcRegionProcedure;
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

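/// Everything the GC scheduler needs from the rest of metasrv: per-table
/// region stats from datanode heartbeats, table routes, file references held
/// by datanodes, and the ability to run GC on a set of regions. Keeping this
/// behind a trait lets the scheduler be driven by alternative implementations
/// (e.g. mocks in tests).
///
/// A rough usage sketch (ignored doctest; `ctx`, `regions`, `routes` and the
/// surrounding glue are illustrative only):
///
/// ```ignore
/// let stats = ctx.get_table_to_region_stats().await?;
/// for (table_id, _region_stats) in stats {
///     let (_physical_table_id, _route) = ctx.get_table_route(table_id).await?;
///     // Build `regions`/`routes` from the route, then:
///     // let manifest = ctx.get_file_references(&regions, related, &routes, timeout).await?;
///     // let report = ctx.gc_regions(peer, &regions, &manifest, false, timeout).await?;
/// }
/// ```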
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;

    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)>;

    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest>;

    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport>;
}

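/// The default `SchedulerCtx` implementation, backed by metasrv's table
/// metadata manager, procedure manager, meta peer client, and mailbox.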
pub(crate) struct DefaultGcSchedulerCtx {
    /// Manager for table metadata, used to resolve table routes.
    pub(crate) table_metadata_manager: TableMetadataManagerRef,
    /// Procedure manager that runs `GcRegionProcedure`s.
    pub(crate) procedure_manager: ProcedureManagerRef,
    /// Client for reading datanode stats from the meta peers.
    pub(crate) meta_peer_client: MetaPeerClientRef,
    /// Mailbox for sending instructions to datanodes.
    pub(crate) mailbox: MailboxRef,
    /// Address of this metasrv, used as the sender in mailbox messages.
    pub(crate) server_addr: String,
}

impl DefaultGcSchedulerCtx {
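    /// Creates a new context, registering a placeholder loader for
    /// `GcRegionProcedure` that refuses to reload the procedure from storage:
    /// GC is re-triggered by the scheduler rather than resumed after a
    /// metasrv restart.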
    pub fn try_new(
        table_metadata_manager: TableMetadataManagerRef,
        procedure_manager: ProcedureManagerRef,
        meta_peer_client: MetaPeerClientRef,
        mailbox: MailboxRef,
        server_addr: String,
    ) -> Result<Self> {
        procedure_manager
            .register_loader(
                GcRegionProcedure::TYPE_NAME,
                Box::new(|_json| {
                    common_procedure::error::ProcedureLoaderNotImplementedSnafu {
                        type_name: GcRegionProcedure::TYPE_NAME.to_string(),
                        reason:
                            "GC procedure should be retried by scheduler, not reloaded from storage"
                                .to_string(),
                    }
                    .fail()
                }),
            )
            .context(error::RegisterProcedureLoaderSnafu {
                type_name: GcRegionProcedure::TYPE_NAME,
            })?;

        Ok(Self {
            table_metadata_manager,
            procedure_manager,
            meta_peer_client,
            mailbox,
            server_addr,
        })
    }
}

#[async_trait::async_trait]
impl SchedulerCtx for DefaultGcSchedulerCtx {
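    /// Groups the latest region stats by table: for each datanode only the
    /// most recent heartbeat stat (by `timestamp_millis`) is considered, and
    /// its region stats are bucketed by table id.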
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
        let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
        let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
        for (_dn_id, stats) in dn_stats {
            let stats = stats.stats;

            let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
                continue;
            };

            for region_stat in latest_stat.region_stats {
                table_to_region_stats
                    .entry(region_stat.id.table_id())
                    .or_default()
                    .push(region_stat);
            }
        }
        Ok(table_to_region_stats)
    }

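    /// Resolves the physical table route for `table_id` through the table
    /// metadata manager.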
    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        self.table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)
    }

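    /// Runs GC for `region_ids` on `peer`; this simply delegates to
    /// `gc_regions_inner`.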
    async fn gc_regions(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        self.gc_regions_inner(
            peer,
            region_ids,
            file_refs_manifest,
            full_file_listing,
            timeout,
        )
        .await
    }

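    /// Collects file references for `query_regions` from every datanode that
    /// hosts them (leaders and followers), merging the per-datanode replies
    /// into a single `FileRefsManifest`. Datanodes that fail to reply are
    /// skipped with a warning, so the returned manifest may be partial.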
    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_routes: &Region2Peers,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Getting file references for {} regions",
            query_regions.len()
        );

        // Group the regions to query by the datanodes that host them.
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                // Followers may also hold references to the region's files.
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }
        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
            HashMap::new();
        for (related_region, queries) in related_regions {
            if let Some((leader, _followers)) = region_routes.get(&related_region) {
                datanode2related_regions
                    .entry(leader.clone())
                    .or_default()
                    .insert(related_region, queries);
            }
        }

        let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();

        for (peer, regions) in datanode2query_regions {
            let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
            match self
                .send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
                .await
            {
                Ok(manifest) => {
                    // Union the file references reported for each region.
                    for (region_id, file_refs) in manifest.file_refs {
                        all_file_refs
                            .entry(region_id)
                            .or_default()
                            .extend(file_refs);
                    }
                    // Keep the smallest manifest version reported for each region.
                    for (region_id, version) in manifest.manifest_version {
                        let entry = all_manifest_versions.entry(region_id).or_insert(version);
                        *entry = (*entry).min(version);
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
                        peer, e
                    );
                    continue;
                }
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
        })
    }
}

impl DefaultGcSchedulerCtx {
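    /// Submits a `GcRegionProcedure` for `region_ids` on `peer` and waits for
    /// it to complete, returning the resulting `GcReport`.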
    async fn gc_regions_inner(
        &self,
        peer: Peer,
        region_ids: &[RegionId],
        file_refs_manifest: &FileRefsManifest,
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport> {
        debug!(
            "Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
            peer,
            region_ids.len(),
            full_file_listing
        );

        let gc_regions = GcRegions {
            regions: region_ids.to_vec(),
            file_refs_manifest: file_refs_manifest.clone(),
            full_file_listing,
        };
        let procedure = GcRegionProcedure::new(
            self.mailbox.clone(),
            self.server_addr.clone(),
            peer,
            gc_regions,
            format!("GC for {} regions", region_ids.len()),
            timeout,
        );
        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

        let id = procedure_with_id.id;

        let mut watcher = self
            .procedure_manager
            .submit(procedure_with_id)
            .await
            .context(error::SubmitProcedureSnafu)?;
        let res = watcher::wait(&mut watcher)
            .await
            .context(error::WaitProcedureSnafu)?
            .with_context(|| error::UnexpectedSnafu {
                violated: format!(
                    "GC procedure {id} successfully completed but no result returned"
                ),
            })?;

        let gc_report = GcRegionProcedure::cast_result(res)?;

        Ok(gc_report)
    }

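    /// Sends a `GetFileRefs` instruction to `peer` and decodes the reply into
    /// a `FileRefsManifest`, failing on an unexpected reply type or if the
    /// datanode reports an error.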
    async fn send_get_file_refs_instruction(
        &self,
        peer: &Peer,
        query_regions: &[RegionId],
        related_regions: HashMap<RegionId, Vec<RegionId>>,
        timeout: Duration,
    ) -> Result<FileRefsManifest> {
        debug!(
            "Sending GetFileRefs instruction to datanode {} for {} regions",
            peer,
            query_regions.len()
        );

        let instruction = Instruction::GetFileRefs(GetFileRefs {
            query_regions: query_regions.to_vec(),
            related_regions,
        });

        let reply = self
            .send_instruction(peer, instruction, "Get file references", timeout)
            .await?;

        let InstructionReply::GetFileRefs(GetFileRefsReply {
            file_refs_manifest,
            success,
            error,
        }) = reply
        else {
            return error::UnexpectedInstructionReplySnafu {
                mailbox_message: format!("{:?}", reply),
                reason: "Unexpected reply of the GetFileRefs instruction",
            }
            .fail();
        };

        if !success {
            return error::UnexpectedSnafu {
                violated: format!(
                    "Failed to get file references from datanode {}: {:?}",
                    peer, error
                ),
            }
            .fail();
        }

        Ok(file_refs_manifest)
    }

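    /// Serializes `instruction` into a mailbox message addressed to the
    /// datanode, sends it over `Channel::Datanode`, and waits up to `timeout`
    /// for the reply.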
    async fn send_instruction(
        &self,
        peer: &Peer,
        instruction: Instruction,
        description: &str,
        timeout: Duration,
    ) -> Result<InstructionReply> {
        let msg = MailboxMessage::json_message(
            &format!("{}: {}", description, instruction),
            &format!("Metasrv@{}", self.server_addr),
            &format!("Datanode-{}@{}", peer.id, peer.addr),
            common_time::util::current_time_millis(),
            &instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: instruction.to_string(),
        })?;

        let mailbox_rx = self
            .mailbox
            .send(&Channel::Datanode(peer.id), msg, timeout)
            .await?;

        match mailbox_rx.await {
            Ok(reply_msg) => {
                let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
                Ok(reply)
            }
            Err(e) => {
                error!(
                    "Failed to receive reply from datanode {} for {}: {}",
                    peer, description, e
                );
                Err(e)
            }
        }
    }
}