1use std::collections::HashMap;
16use std::time::Duration;
17
18use common_meta::datanode::RegionStat;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::key::table_repart::TableRepartValue;
21use common_meta::key::table_route::PhysicalTableRouteValue;
22use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
23use common_telemetry::debug;
24use snafu::{OptionExt as _, ResultExt as _};
25use store_api::storage::{GcReport, RegionId};
26use table::metadata::TableId;
27
28use crate::cluster::MetaPeerClientRef;
29use crate::error::{self, Result, TableMetadataManagerSnafu};
30use crate::gc::Region2Peers;
31use crate::gc::procedure::BatchGcProcedure;
32use crate::service::mailbox::MailboxRef;
33
/// Abstraction over the cluster services used by the GC scheduler.
///
/// Implementations must be `Send + Sync`. [`DefaultGcSchedulerCtx`] is the
/// implementation provided in this file.
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
    /// Returns region stats grouped by the id of the table each region belongs to.
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;

    /// Returns table repartition metadata as `(TableId, TableRepartValue)` pairs.
    async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>>;

    /// Looks up the physical table route for `table_id`.
    ///
    /// NOTE(review): the returned `TableId` is presumably the id of the physical
    /// table that owns the route — confirm against the table route manager.
    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)>;

    /// Looks up the physical table routes for several tables in one call.
    ///
    /// The result is keyed by table id.
    async fn batch_get_table_route(
        &self,
        table_ids: &[TableId],
    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>>;

    /// Runs garbage collection for `region_ids` and returns the resulting report.
    ///
    /// `full_file_listing`, `timeout` and `region_routes_override` tune how the
    /// collection is carried out; their exact semantics are defined by the
    /// implementation (the default one forwards them to `BatchGcProcedure`).
    async fn gc_regions(
        &self,
        region_ids: &[RegionId],
        full_file_listing: bool,
        timeout: Duration,
        region_routes_override: Region2Peers,
    ) -> Result<GcReport>;
}
58
/// Default [`SchedulerCtx`] implementation backed by the metasrv services.
pub(crate) struct DefaultGcSchedulerCtx {
    /// Source of table repartition and table route metadata; also handed to the GC procedure.
    pub(crate) table_metadata_manager: TableMetadataManagerRef,
    /// Used to submit the batch GC procedure and obtain a watcher for it.
    pub(crate) procedure_manager: ProcedureManagerRef,
    /// Used to fetch the stats reported by datanodes.
    pub(crate) meta_peer_client: MetaPeerClientRef,
    /// Mailbox handed to the GC procedure.
    pub(crate) mailbox: MailboxRef,
    /// Server address handed to the GC procedure.
    pub(crate) server_addr: String,
}
71
72impl DefaultGcSchedulerCtx {
73 pub fn try_new(
74 table_metadata_manager: TableMetadataManagerRef,
75 procedure_manager: ProcedureManagerRef,
76 meta_peer_client: MetaPeerClientRef,
77 mailbox: MailboxRef,
78 server_addr: String,
79 ) -> Result<Self> {
80 Ok(Self {
81 table_metadata_manager,
82 procedure_manager,
83 meta_peer_client,
84 mailbox,
85 server_addr,
86 })
87 }
88}
89
90#[async_trait::async_trait]
91impl SchedulerCtx for DefaultGcSchedulerCtx {
92 async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
93 let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
94 let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
95 for (_dn_id, stats) in dn_stats {
96 let stats = stats.stats;
97
98 let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
99 continue;
100 };
101
102 for region_stat in latest_stat.region_stats {
103 table_to_region_stats
104 .entry(region_stat.id.table_id())
105 .or_default()
106 .push(region_stat);
107 }
108 }
109 Ok(table_to_region_stats)
110 }
111
112 async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
113 self.table_metadata_manager
114 .table_repart_manager()
115 .table_reparts()
116 .await
117 .context(TableMetadataManagerSnafu)
118 }
119
120 async fn get_table_route(
121 &self,
122 table_id: TableId,
123 ) -> Result<(TableId, PhysicalTableRouteValue)> {
124 self.table_metadata_manager
125 .table_route_manager()
126 .get_physical_table_route(table_id)
127 .await
128 .context(TableMetadataManagerSnafu)
129 }
130
131 async fn batch_get_table_route(
132 &self,
133 table_ids: &[TableId],
134 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
135 self.table_metadata_manager
136 .table_route_manager()
137 .batch_get_physical_table_routes(table_ids)
138 .await
139 .context(TableMetadataManagerSnafu)
140 }
141
142 async fn gc_regions(
143 &self,
144 region_ids: &[RegionId],
145 full_file_listing: bool,
146 timeout: Duration,
147 region_routes_override: Region2Peers,
148 ) -> Result<GcReport> {
149 self.gc_regions_inner(
150 region_ids,
151 full_file_listing,
152 timeout,
153 region_routes_override,
154 )
155 .await
156 }
157}
158
159impl DefaultGcSchedulerCtx {
160 async fn gc_regions_inner(
161 &self,
162 region_ids: &[RegionId],
163 full_file_listing: bool,
164 timeout: Duration,
165 region_routes_override: Region2Peers,
166 ) -> Result<GcReport> {
167 debug!(
168 "Sending GC instruction for {} regions (full_file_listing: {})",
169 region_ids.len(),
170 full_file_listing
171 );
172
173 let procedure = BatchGcProcedure::new(
174 self.mailbox.clone(),
175 self.table_metadata_manager.clone(),
176 self.server_addr.clone(),
177 region_ids.to_vec(),
178 full_file_listing,
179 timeout,
180 region_routes_override,
181 );
182 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
183
184 let id = procedure_with_id.id;
185
186 let mut watcher = self
187 .procedure_manager
188 .submit(procedure_with_id)
189 .await
190 .context(error::SubmitProcedureSnafu)?;
191 let res = watcher::wait(&mut watcher)
192 .await
193 .context(error::WaitProcedureSnafu)?
194 .with_context(|| error::UnexpectedSnafu {
195 violated: format!(
196 "GC procedure {id} successfully completed but no result returned"
197 ),
198 })?;
199
200 let gc_report = BatchGcProcedure::cast_result(res)?;
201
202 Ok(gc_report)
203 }
204}