meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::reconcile_request::Target;
18use api::v1::meta::{
19    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest,
20    GcRegionsResponse, GcStats, GcTableRequest, GcTableResponse, MigrateRegionRequest,
21    MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22    QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23    ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::table_name::TableNameKey;
27use common_meta::procedure_executor::ExecutorContext;
28use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
29use common_meta::rpc::procedure::{
30    self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse,
31    GcTableRequest as MetaGcTableRequest,
32};
33use snafu::{OptionExt, ResultExt};
34use store_api::storage::RegionId;
35use table::table_reference::TableReference;
36use tonic::Request;
37
38use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu};
39use crate::metasrv::Metasrv;
40use crate::procedure::region_migration::manager::{
41    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
42};
43use crate::service::GrpcResult;
44use crate::{check_leader, error, gc};
45
46#[async_trait::async_trait]
47impl procedure_service_server::ProcedureService for Metasrv {
48    async fn query(
49        &self,
50        request: Request<QueryProcedureRequest>,
51    ) -> GrpcResult<ProcedureStateResponse> {
52        check_leader!(
53            self,
54            request,
55            ProcedureStateResponse,
56            "`query procedure state`"
57        );
58
59        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
60        let _header = header.context(error::MissingRequestHeaderSnafu)?;
61        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
62        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
63
64        let state = self
65            .procedure_manager()
66            .procedure_state(pid)
67            .await
68            .context(error::QueryProcedureSnafu)?
69            .context(error::ProcedureNotFoundSnafu {
70                pid: pid.to_string(),
71            })?;
72
73        Ok(Response::new(procedure::procedure_state_to_pb_response(
74            &state,
75        )))
76    }
77
78    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
79        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
80
81        let PbDdlTaskRequest {
82            header,
83            query_context,
84            task,
85            wait,
86            timeout_secs,
87        } = request.into_inner();
88
89        let header = header.context(error::MissingRequestHeaderSnafu)?;
90        let query_context = query_context
91            .context(error::MissingRequiredParameterSnafu {
92                param: "query_context",
93            })?
94            .into();
95        let task: DdlTask = task
96            .context(error::MissingRequiredParameterSnafu { param: "task" })?
97            .try_into()
98            .context(error::ConvertProtoDataSnafu)?;
99
100        let resp = self
101            .ddl_manager()
102            .submit_ddl_task(
103                &ExecutorContext {
104                    tracing_context: Some(header.tracing_context),
105                },
106                SubmitDdlTaskRequest {
107                    query_context,
108                    wait,
109                    timeout: Duration::from_secs(timeout_secs.into()),
110                    task,
111                },
112            )
113            .await
114            .context(error::SubmitDdlTaskSnafu)?
115            .into();
116
117        Ok(Response::new(resp))
118    }
119
120    async fn migrate(
121        &self,
122        request: Request<MigrateRegionRequest>,
123    ) -> GrpcResult<MigrateRegionResponse> {
124        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
125
126        let MigrateRegionRequest {
127            header,
128            region_id,
129            from_peer,
130            to_peer,
131            timeout_secs,
132        } = request.into_inner();
133
134        let _header = header.context(error::MissingRequestHeaderSnafu)?;
135        let from_peer = self
136            .lookup_datanode_peer(from_peer)
137            .await?
138            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
139        let to_peer = self
140            .lookup_datanode_peer(to_peer)
141            .await?
142            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
143
144        let pid = self
145            .region_migration_manager()
146            .submit_procedure(RegionMigrationProcedureTask {
147                region_id: region_id.into(),
148                from_peer,
149                to_peer,
150                timeout: Duration::from_secs(timeout_secs.into()),
151                trigger_reason: RegionMigrationTriggerReason::Manual,
152            })
153            .await?
154            .map(procedure::pid_to_pb_pid);
155
156        let resp = MigrateRegionResponse {
157            pid,
158            ..Default::default()
159        };
160
161        Ok(Response::new(resp))
162    }
163
164    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
165        check_leader!(self, request, ReconcileResponse, "`reconcile`");
166
167        let ReconcileRequest { header, target } = request.into_inner();
168        let _header = header.context(error::MissingRequestHeaderSnafu)?;
169        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
170        let parse_resolve_strategy = |resolve_strategy: i32| {
171            ResolveStrategy::try_from(resolve_strategy)
172                .ok()
173                .context(error::UnexpectedSnafu {
174                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
175                })
176        };
177        let procedure_id = match target {
178            Target::ReconcileTable(table) => {
179                let ReconcileTable {
180                    catalog_name,
181                    schema_name,
182                    table_name,
183                    resolve_strategy,
184                } = table;
185                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
186                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
187                self.reconciliation_manager()
188                    .reconcile_table(table_ref, resolve_strategy.into())
189                    .await
190                    .context(error::SubmitReconcileProcedureSnafu)?
191            }
192            Target::ReconcileDatabase(database) => {
193                let ReconcileDatabase {
194                    catalog_name,
195                    database_name,
196                    resolve_strategy,
197                    parallelism,
198                } = database;
199                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
200                self.reconciliation_manager()
201                    .reconcile_database(
202                        catalog_name,
203                        database_name,
204                        resolve_strategy.into(),
205                        parallelism as usize,
206                    )
207                    .await
208                    .context(error::SubmitReconcileProcedureSnafu)?
209            }
210            Target::ReconcileCatalog(catalog) => {
211                let ReconcileCatalog {
212                    catalog_name,
213                    resolve_strategy,
214                    parallelism,
215                } = catalog;
216                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
217                self.reconciliation_manager()
218                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
219                    .await
220                    .context(error::SubmitReconcileProcedureSnafu)?
221            }
222        };
223        Ok(Response::new(ReconcileResponse {
224            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
225            ..Default::default()
226        }))
227    }
228
229    async fn details(
230        &self,
231        request: Request<ProcedureDetailRequest>,
232    ) -> GrpcResult<ProcedureDetailResponse> {
233        check_leader!(
234            self,
235            request,
236            ProcedureDetailResponse,
237            "`procedure details`"
238        );
239
240        let ProcedureDetailRequest { header } = request.into_inner();
241        let _header = header.context(error::MissingRequestHeaderSnafu)?;
242        let metas = self
243            .procedure_manager()
244            .list_procedures()
245            .await
246            .context(error::QueryProcedureSnafu)?;
247        Ok(Response::new(procedure::procedure_details_to_pb_response(
248            metas,
249        )))
250    }
251
252    async fn gc_regions(
253        &self,
254        request: Request<GcRegionsRequest>,
255    ) -> GrpcResult<GcRegionsResponse> {
256        check_leader!(self, request, GcRegionsResponse, "`gc_regions`");
257
258        let GcRegionsRequest {
259            header,
260            region_ids,
261            full_file_listing,
262            timeout_secs,
263        } = request.into_inner();
264
265        let _header = header.context(error::MissingRequestHeaderSnafu)?;
266
267        let response = self
268            .handle_gc_regions(MetaGcRegionsRequest {
269                region_ids,
270                full_file_listing,
271                timeout: Duration::from_secs(timeout_secs as u64),
272            })
273            .await?;
274
275        Ok(Response::new(gc_response_to_regions_pb(response)))
276    }
277
278    async fn gc_table(&self, request: Request<GcTableRequest>) -> GrpcResult<GcTableResponse> {
279        check_leader!(self, request, GcTableResponse, "`gc_table`");
280
281        let GcTableRequest {
282            header,
283            catalog_name,
284            schema_name,
285            table_name,
286            full_file_listing,
287            timeout_secs,
288        } = request.into_inner();
289
290        let _header = header.context(error::MissingRequestHeaderSnafu)?;
291
292        let response = self
293            .handle_gc_table(MetaGcTableRequest {
294                catalog_name,
295                schema_name,
296                table_name,
297                full_file_listing,
298                timeout: Duration::from_secs(timeout_secs as u64),
299            })
300            .await?;
301
302        Ok(Response::new(gc_response_to_table_pb(response)))
303    }
304}
305
306impl Metasrv {
307    fn normalize_gc_timeout(timeout: Duration) -> Option<Duration> {
308        if timeout.is_zero() {
309            None
310        } else {
311            Some(timeout)
312        }
313    }
314
315    async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result<GcResponse> {
316        let region_ids: Vec<RegionId> = request
317            .region_ids
318            .into_iter()
319            .map(RegionId::from_u64)
320            .collect();
321        self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
322            .await
323    }
324
325    async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result<GcResponse> {
326        let table_name_key = TableNameKey::new(
327            &request.catalog_name,
328            &request.schema_name,
329            &request.table_name,
330        );
331
332        let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager();
333        let table_id = table_metadata_manager
334            .table_name_manager()
335            .get(table_name_key)
336            .await
337            .context(TableMetadataManagerSnafu)?
338            .context(TableNotFoundSnafu {
339                name: request.table_name.clone(),
340            })?
341            .table_id();
342
343        let (_phy_table_id, route) = table_metadata_manager
344            .table_route_manager()
345            .get_physical_table_route(table_id)
346            .await
347            .context(TableMetadataManagerSnafu)?;
348
349        let region_ids: Vec<RegionId> = route.region_routes.iter().map(|r| r.region.id).collect();
350        self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
351            .await
352    }
353
354    /// Triggers manual GC for specified regions and returns the GC response.
355    async fn trigger_gc_for_regions(
356        &self,
357        region_ids: Vec<RegionId>,
358        full_file_listing: bool,
359        timeout: Duration,
360    ) -> error::Result<GcResponse> {
361        let timeout = Self::normalize_gc_timeout(timeout);
362        let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
363            violated: "GC ticker not available".to_string(),
364        })?;
365
366        let (tx, rx) = tokio::sync::oneshot::channel();
367        gc_ticker
368            .sender
369            .send(gc::Event::Manually {
370                sender: tx,
371                region_ids: Some(region_ids),
372                full_file_listing: Some(full_file_listing),
373                timeout,
374            })
375            .await
376            .map_err(|_| {
377                error::UnexpectedSnafu {
378                    violated: "Failed to send GC event".to_string(),
379                }
380                .build()
381            })?;
382
383        let job_report = rx.await.map_err(|_| {
384            error::UnexpectedSnafu {
385                violated: "GC job channel closed unexpectedly".to_string(),
386            }
387            .build()
388        })??;
389
390        let report = gc_job_report_to_gc_report(job_report);
391
392        Ok(gc_report_to_response(&report))
393    }
394}
395
396fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport {
397    job_report.merge_to_report()
398}
399
400fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {
401    let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum();
402    let deleted_indexes = report
403        .deleted_indexes
404        .values()
405        .map(|v| v.len() as u64)
406        .sum();
407    GcResponse {
408        processed_regions: report.processed_regions.len() as u64,
409        need_retry_regions: report
410            .need_retry_regions
411            .iter()
412            .map(|id| id.as_u64())
413            .collect(),
414        deleted_files,
415        deleted_indexes,
416    }
417}
418
419fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse {
420    GcRegionsResponse {
421        stats: Some(GcStats {
422            processed_regions: resp.processed_regions,
423            need_retry_regions: resp.need_retry_regions,
424            deleted_files: resp.deleted_files,
425            deleted_indexes: resp.deleted_indexes,
426        }),
427        ..Default::default()
428    }
429}
430
431fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse {
432    GcTableResponse {
433        stats: Some(GcStats {
434            processed_regions: resp.processed_regions,
435            need_retry_regions: resp.need_retry_regions,
436            deleted_files: resp.deleted_files,
437            deleted_indexes: resp.deleted_indexes,
438        }),
439        ..Default::default()
440    }
441}
442
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Metasrv;

    /// Only an exactly-zero timeout is normalized to `None`; every non-zero duration,
    /// including sub-second and maximal ones, must be passed through unchanged.
    #[test]
    fn test_normalize_gc_timeout() {
        assert_eq!(Metasrv::normalize_gc_timeout(Duration::ZERO), None);
        assert_eq!(Metasrv::normalize_gc_timeout(Duration::from_secs(0)), None);
        assert_eq!(
            Metasrv::normalize_gc_timeout(Duration::from_secs(10)),
            Some(Duration::from_secs(10))
        );
        assert_eq!(
            Metasrv::normalize_gc_timeout(Duration::from_nanos(1)),
            Some(Duration::from_nanos(1))
        );
        assert_eq!(
            Metasrv::normalize_gc_timeout(Duration::MAX),
            Some(Duration::MAX)
        );
    }
}