meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21    MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22    QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23    ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use snafu::{OptionExt, ResultExt};
29use table::table_reference::TableReference;
30use tonic::Request;
31
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37use crate::{check_leader, error};
38
39#[async_trait::async_trait]
40impl procedure_service_server::ProcedureService for Metasrv {
41    async fn query(
42        &self,
43        request: Request<QueryProcedureRequest>,
44    ) -> GrpcResult<ProcedureStateResponse> {
45        check_leader!(
46            self,
47            request,
48            ProcedureStateResponse,
49            "`query procedure state`"
50        );
51
52        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53        let _header = header.context(error::MissingRequestHeaderSnafu)?;
54        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57        let state = self
58            .procedure_manager()
59            .procedure_state(pid)
60            .await
61            .context(error::QueryProcedureSnafu)?
62            .context(error::ProcedureNotFoundSnafu {
63                pid: pid.to_string(),
64            })?;
65
66        Ok(Response::new(procedure::procedure_state_to_pb_response(
67            &state,
68        )))
69    }
70
71    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
73
74        let PbDdlTaskRequest {
75            header,
76            query_context,
77            task,
78            ..
79        } = request.into_inner();
80
81        let header = header.context(error::MissingRequestHeaderSnafu)?;
82        let query_context = query_context
83            .context(error::MissingRequiredParameterSnafu {
84                param: "query_context",
85            })?
86            .into();
87        let task: DdlTask = task
88            .context(error::MissingRequiredParameterSnafu { param: "task" })?
89            .try_into()
90            .context(error::ConvertProtoDataSnafu)?;
91
92        let resp = self
93            .ddl_manager()
94            .submit_ddl_task(
95                &ExecutorContext {
96                    tracing_context: Some(header.tracing_context),
97                },
98                SubmitDdlTaskRequest {
99                    query_context: Arc::new(query_context),
100                    task,
101                },
102            )
103            .await
104            .context(error::SubmitDdlTaskSnafu)?
105            .into();
106
107        Ok(Response::new(resp))
108    }
109
110    async fn migrate(
111        &self,
112        request: Request<MigrateRegionRequest>,
113    ) -> GrpcResult<MigrateRegionResponse> {
114        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
115
116        let MigrateRegionRequest {
117            header,
118            region_id,
119            from_peer,
120            to_peer,
121            timeout_secs,
122        } = request.into_inner();
123
124        let _header = header.context(error::MissingRequestHeaderSnafu)?;
125        let from_peer = self
126            .lookup_datanode_peer(from_peer)
127            .await?
128            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
129        let to_peer = self
130            .lookup_datanode_peer(to_peer)
131            .await?
132            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
133
134        let pid = self
135            .region_migration_manager()
136            .submit_procedure(RegionMigrationProcedureTask {
137                region_id: region_id.into(),
138                from_peer,
139                to_peer,
140                timeout: Duration::from_secs(timeout_secs.into()),
141                trigger_reason: RegionMigrationTriggerReason::Manual,
142            })
143            .await?
144            .map(procedure::pid_to_pb_pid);
145
146        let resp = MigrateRegionResponse {
147            pid,
148            ..Default::default()
149        };
150
151        Ok(Response::new(resp))
152    }
153
154    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
155        check_leader!(self, request, ReconcileResponse, "`reconcile`");
156
157        let ReconcileRequest { header, target } = request.into_inner();
158        let _header = header.context(error::MissingRequestHeaderSnafu)?;
159        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
160        let parse_resolve_strategy = |resolve_strategy: i32| {
161            ResolveStrategy::try_from(resolve_strategy)
162                .ok()
163                .context(error::UnexpectedSnafu {
164                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
165                })
166        };
167        let procedure_id = match target {
168            Target::ReconcileTable(table) => {
169                let ReconcileTable {
170                    catalog_name,
171                    schema_name,
172                    table_name,
173                    resolve_strategy,
174                } = table;
175                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
176                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
177                self.reconciliation_manager()
178                    .reconcile_table(table_ref, resolve_strategy.into())
179                    .await
180                    .context(error::SubmitReconcileProcedureSnafu)?
181            }
182            Target::ReconcileDatabase(database) => {
183                let ReconcileDatabase {
184                    catalog_name,
185                    database_name,
186                    resolve_strategy,
187                    parallelism,
188                } = database;
189                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
190                self.reconciliation_manager()
191                    .reconcile_database(
192                        catalog_name,
193                        database_name,
194                        resolve_strategy.into(),
195                        parallelism as usize,
196                    )
197                    .await
198                    .context(error::SubmitReconcileProcedureSnafu)?
199            }
200            Target::ReconcileCatalog(catalog) => {
201                let ReconcileCatalog {
202                    catalog_name,
203                    resolve_strategy,
204                    parallelism,
205                } = catalog;
206                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
207                self.reconciliation_manager()
208                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
209                    .await
210                    .context(error::SubmitReconcileProcedureSnafu)?
211            }
212        };
213        Ok(Response::new(ReconcileResponse {
214            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
215            ..Default::default()
216        }))
217    }
218
219    async fn details(
220        &self,
221        request: Request<ProcedureDetailRequest>,
222    ) -> GrpcResult<ProcedureDetailResponse> {
223        check_leader!(
224            self,
225            request,
226            ProcedureDetailResponse,
227            "`procedure details`"
228        );
229
230        let ProcedureDetailRequest { header } = request.into_inner();
231        let _header = header.context(error::MissingRequestHeaderSnafu)?;
232        let metas = self
233            .procedure_manager()
234            .list_procedures()
235            .await
236            .context(error::QueryProcedureSnafu)?;
237        Ok(Response::new(procedure::procedure_details_to_pb_response(
238            metas,
239        )))
240    }
241}