1use std::time::Duration;
16
17use api::v1::meta::reconcile_request::Target;
18use api::v1::meta::{
19 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest,
20 GcRegionsResponse, GcStats, GcTableRequest, GcTableResponse, MigrateRegionRequest,
21 MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22 QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23 ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::table_name::TableNameKey;
27use common_meta::procedure_executor::ExecutorContext;
28use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
29use common_meta::rpc::procedure::{
30 self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse,
31 GcTableRequest as MetaGcTableRequest,
32};
33use snafu::{OptionExt, ResultExt};
34use store_api::storage::RegionId;
35use table::table_reference::TableReference;
36use tonic::Request;
37
38use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu};
39use crate::metasrv::Metasrv;
40use crate::procedure::region_migration::manager::{
41 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
42};
43use crate::service::GrpcResult;
44use crate::{check_leader, error, gc};
45
46#[async_trait::async_trait]
47impl procedure_service_server::ProcedureService for Metasrv {
48 async fn query(
49 &self,
50 request: Request<QueryProcedureRequest>,
51 ) -> GrpcResult<ProcedureStateResponse> {
52 check_leader!(
53 self,
54 request,
55 ProcedureStateResponse,
56 "`query procedure state`"
57 );
58
59 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
60 let _header = header.context(error::MissingRequestHeaderSnafu)?;
61 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
62 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
63
64 let state = self
65 .procedure_manager()
66 .procedure_state(pid)
67 .await
68 .context(error::QueryProcedureSnafu)?
69 .context(error::ProcedureNotFoundSnafu {
70 pid: pid.to_string(),
71 })?;
72
73 Ok(Response::new(procedure::procedure_state_to_pb_response(
74 &state,
75 )))
76 }
77
78 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
79 check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
80
81 let PbDdlTaskRequest {
82 header,
83 query_context,
84 task,
85 wait,
86 timeout_secs,
87 } = request.into_inner();
88
89 let header = header.context(error::MissingRequestHeaderSnafu)?;
90 let query_context = query_context
91 .context(error::MissingRequiredParameterSnafu {
92 param: "query_context",
93 })?
94 .into();
95 let task: DdlTask = task
96 .context(error::MissingRequiredParameterSnafu { param: "task" })?
97 .try_into()
98 .context(error::ConvertProtoDataSnafu)?;
99
100 let resp = self
101 .ddl_manager()
102 .submit_ddl_task(
103 &ExecutorContext {
104 tracing_context: Some(header.tracing_context),
105 },
106 SubmitDdlTaskRequest {
107 query_context,
108 wait,
109 timeout: Duration::from_secs(timeout_secs.into()),
110 task,
111 },
112 )
113 .await
114 .context(error::SubmitDdlTaskSnafu)?
115 .into();
116
117 Ok(Response::new(resp))
118 }
119
120 async fn migrate(
121 &self,
122 request: Request<MigrateRegionRequest>,
123 ) -> GrpcResult<MigrateRegionResponse> {
124 check_leader!(self, request, MigrateRegionResponse, "`migrate`");
125
126 let MigrateRegionRequest {
127 header,
128 region_id,
129 from_peer,
130 to_peer,
131 timeout_secs,
132 } = request.into_inner();
133
134 let _header = header.context(error::MissingRequestHeaderSnafu)?;
135 let from_peer = self
136 .lookup_datanode_peer(from_peer)
137 .await?
138 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
139 let to_peer = self
140 .lookup_datanode_peer(to_peer)
141 .await?
142 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
143
144 let pid = self
145 .region_migration_manager()
146 .submit_procedure(RegionMigrationProcedureTask {
147 region_id: region_id.into(),
148 from_peer,
149 to_peer,
150 timeout: Duration::from_secs(timeout_secs.into()),
151 trigger_reason: RegionMigrationTriggerReason::Manual,
152 })
153 .await?
154 .map(procedure::pid_to_pb_pid);
155
156 let resp = MigrateRegionResponse {
157 pid,
158 ..Default::default()
159 };
160
161 Ok(Response::new(resp))
162 }
163
164 async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
165 check_leader!(self, request, ReconcileResponse, "`reconcile`");
166
167 let ReconcileRequest { header, target } = request.into_inner();
168 let _header = header.context(error::MissingRequestHeaderSnafu)?;
169 let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
170 let parse_resolve_strategy = |resolve_strategy: i32| {
171 ResolveStrategy::try_from(resolve_strategy)
172 .ok()
173 .context(error::UnexpectedSnafu {
174 violated: format!("Invalid resolve strategy: {}", resolve_strategy),
175 })
176 };
177 let procedure_id = match target {
178 Target::ReconcileTable(table) => {
179 let ReconcileTable {
180 catalog_name,
181 schema_name,
182 table_name,
183 resolve_strategy,
184 } = table;
185 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
186 let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
187 self.reconciliation_manager()
188 .reconcile_table(table_ref, resolve_strategy.into())
189 .await
190 .context(error::SubmitReconcileProcedureSnafu)?
191 }
192 Target::ReconcileDatabase(database) => {
193 let ReconcileDatabase {
194 catalog_name,
195 database_name,
196 resolve_strategy,
197 parallelism,
198 } = database;
199 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
200 self.reconciliation_manager()
201 .reconcile_database(
202 catalog_name,
203 database_name,
204 resolve_strategy.into(),
205 parallelism as usize,
206 )
207 .await
208 .context(error::SubmitReconcileProcedureSnafu)?
209 }
210 Target::ReconcileCatalog(catalog) => {
211 let ReconcileCatalog {
212 catalog_name,
213 resolve_strategy,
214 parallelism,
215 } = catalog;
216 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
217 self.reconciliation_manager()
218 .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
219 .await
220 .context(error::SubmitReconcileProcedureSnafu)?
221 }
222 };
223 Ok(Response::new(ReconcileResponse {
224 pid: Some(procedure::pid_to_pb_pid(procedure_id)),
225 ..Default::default()
226 }))
227 }
228
229 async fn details(
230 &self,
231 request: Request<ProcedureDetailRequest>,
232 ) -> GrpcResult<ProcedureDetailResponse> {
233 check_leader!(
234 self,
235 request,
236 ProcedureDetailResponse,
237 "`procedure details`"
238 );
239
240 let ProcedureDetailRequest { header } = request.into_inner();
241 let _header = header.context(error::MissingRequestHeaderSnafu)?;
242 let metas = self
243 .procedure_manager()
244 .list_procedures()
245 .await
246 .context(error::QueryProcedureSnafu)?;
247 Ok(Response::new(procedure::procedure_details_to_pb_response(
248 metas,
249 )))
250 }
251
252 async fn gc_regions(
253 &self,
254 request: Request<GcRegionsRequest>,
255 ) -> GrpcResult<GcRegionsResponse> {
256 check_leader!(self, request, GcRegionsResponse, "`gc_regions`");
257
258 let GcRegionsRequest {
259 header,
260 region_ids,
261 full_file_listing,
262 timeout_secs,
263 } = request.into_inner();
264
265 let _header = header.context(error::MissingRequestHeaderSnafu)?;
266
267 let response = self
268 .handle_gc_regions(MetaGcRegionsRequest {
269 region_ids,
270 full_file_listing,
271 timeout: Duration::from_secs(timeout_secs as u64),
272 })
273 .await?;
274
275 Ok(Response::new(gc_response_to_regions_pb(response)))
276 }
277
278 async fn gc_table(&self, request: Request<GcTableRequest>) -> GrpcResult<GcTableResponse> {
279 check_leader!(self, request, GcTableResponse, "`gc_table`");
280
281 let GcTableRequest {
282 header,
283 catalog_name,
284 schema_name,
285 table_name,
286 full_file_listing,
287 timeout_secs,
288 } = request.into_inner();
289
290 let _header = header.context(error::MissingRequestHeaderSnafu)?;
291
292 let response = self
293 .handle_gc_table(MetaGcTableRequest {
294 catalog_name,
295 schema_name,
296 table_name,
297 full_file_listing,
298 timeout: Duration::from_secs(timeout_secs as u64),
299 })
300 .await?;
301
302 Ok(Response::new(gc_response_to_table_pb(response)))
303 }
304}
305
306impl Metasrv {
307 fn normalize_gc_timeout(timeout: Duration) -> Option<Duration> {
308 if timeout.is_zero() {
309 None
310 } else {
311 Some(timeout)
312 }
313 }
314
315 async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result<GcResponse> {
316 let region_ids: Vec<RegionId> = request
317 .region_ids
318 .into_iter()
319 .map(RegionId::from_u64)
320 .collect();
321 self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
322 .await
323 }
324
325 async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result<GcResponse> {
326 let table_name_key = TableNameKey::new(
327 &request.catalog_name,
328 &request.schema_name,
329 &request.table_name,
330 );
331
332 let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager();
333 let table_id = table_metadata_manager
334 .table_name_manager()
335 .get(table_name_key)
336 .await
337 .context(TableMetadataManagerSnafu)?
338 .context(TableNotFoundSnafu {
339 name: request.table_name.clone(),
340 })?
341 .table_id();
342
343 let (_phy_table_id, route) = table_metadata_manager
344 .table_route_manager()
345 .get_physical_table_route(table_id)
346 .await
347 .context(TableMetadataManagerSnafu)?;
348
349 let region_ids: Vec<RegionId> = route.region_routes.iter().map(|r| r.region.id).collect();
350 self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
351 .await
352 }
353
354 async fn trigger_gc_for_regions(
356 &self,
357 region_ids: Vec<RegionId>,
358 full_file_listing: bool,
359 timeout: Duration,
360 ) -> error::Result<GcResponse> {
361 let timeout = Self::normalize_gc_timeout(timeout);
362 let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
363 violated: "GC ticker not available".to_string(),
364 })?;
365
366 let (tx, rx) = tokio::sync::oneshot::channel();
367 gc_ticker
368 .sender
369 .send(gc::Event::Manually {
370 sender: tx,
371 region_ids: Some(region_ids),
372 full_file_listing: Some(full_file_listing),
373 timeout,
374 })
375 .await
376 .map_err(|_| {
377 error::UnexpectedSnafu {
378 violated: "Failed to send GC event".to_string(),
379 }
380 .build()
381 })?;
382
383 let job_report = rx.await.map_err(|_| {
384 error::UnexpectedSnafu {
385 violated: "GC job channel closed unexpectedly".to_string(),
386 }
387 .build()
388 })??;
389
390 let report = gc_job_report_to_gc_report(job_report);
391
392 Ok(gc_report_to_response(&report))
393 }
394}
395
396fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport {
397 job_report.merge_to_report()
398}
399
400fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {
401 let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum();
402 let deleted_indexes = report
403 .deleted_indexes
404 .values()
405 .map(|v| v.len() as u64)
406 .sum();
407 GcResponse {
408 processed_regions: report.processed_regions.len() as u64,
409 need_retry_regions: report
410 .need_retry_regions
411 .iter()
412 .map(|id| id.as_u64())
413 .collect(),
414 deleted_files,
415 deleted_indexes,
416 }
417}
418
419fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse {
420 GcRegionsResponse {
421 stats: Some(GcStats {
422 processed_regions: resp.processed_regions,
423 need_retry_regions: resp.need_retry_regions,
424 deleted_files: resp.deleted_files,
425 deleted_indexes: resp.deleted_indexes,
426 }),
427 ..Default::default()
428 }
429}
430
431fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse {
432 GcTableResponse {
433 stats: Some(GcStats {
434 processed_regions: resp.processed_regions,
435 need_retry_regions: resp.need_retry_regions,
436 deleted_files: resp.deleted_files,
437 deleted_indexes: resp.deleted_indexes,
438 }),
439 ..Default::default()
440 }
441}
442
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Metasrv;

    /// A zero timeout normalizes to `None`; a non-zero timeout is passed through.
    #[test]
    fn test_normalize_gc_timeout() {
        let zero = Duration::ZERO;
        let ten_secs = Duration::from_secs(10);

        assert_eq!(None, Metasrv::normalize_gc_timeout(zero));
        assert_eq!(Some(ten_secs), Metasrv::normalize_gc_timeout(ten_secs));
    }
}