meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::reconcile_request::Target;
18use api::v1::meta::{
19    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
20    MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
21    QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
22    ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
23};
24use common_meta::procedure_executor::ExecutorContext;
25use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
26use common_meta::rpc::procedure;
27use snafu::{OptionExt, ResultExt};
28use table::table_reference::TableReference;
29use tonic::Request;
30
31use crate::metasrv::Metasrv;
32use crate::procedure::region_migration::manager::{
33    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
34};
35use crate::service::GrpcResult;
36use crate::{check_leader, error};
37
38#[async_trait::async_trait]
39impl procedure_service_server::ProcedureService for Metasrv {
40    async fn query(
41        &self,
42        request: Request<QueryProcedureRequest>,
43    ) -> GrpcResult<ProcedureStateResponse> {
44        check_leader!(
45            self,
46            request,
47            ProcedureStateResponse,
48            "`query procedure state`"
49        );
50
51        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
52        let _header = header.context(error::MissingRequestHeaderSnafu)?;
53        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
54        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
55
56        let state = self
57            .procedure_manager()
58            .procedure_state(pid)
59            .await
60            .context(error::QueryProcedureSnafu)?
61            .context(error::ProcedureNotFoundSnafu {
62                pid: pid.to_string(),
63            })?;
64
65        Ok(Response::new(procedure::procedure_state_to_pb_response(
66            &state,
67        )))
68    }
69
70    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
71        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
72
73        let PbDdlTaskRequest {
74            header,
75            query_context,
76            task,
77            wait,
78            timeout_secs,
79        } = request.into_inner();
80
81        let header = header.context(error::MissingRequestHeaderSnafu)?;
82        let query_context = query_context
83            .context(error::MissingRequiredParameterSnafu {
84                param: "query_context",
85            })?
86            .into();
87        let task: DdlTask = task
88            .context(error::MissingRequiredParameterSnafu { param: "task" })?
89            .try_into()
90            .context(error::ConvertProtoDataSnafu)?;
91
92        let resp = self
93            .ddl_manager()
94            .submit_ddl_task(
95                &ExecutorContext {
96                    tracing_context: Some(header.tracing_context),
97                },
98                SubmitDdlTaskRequest {
99                    query_context,
100                    wait,
101                    timeout: Duration::from_secs(timeout_secs.into()),
102                    task,
103                },
104            )
105            .await
106            .context(error::SubmitDdlTaskSnafu)?
107            .into();
108
109        Ok(Response::new(resp))
110    }
111
112    async fn migrate(
113        &self,
114        request: Request<MigrateRegionRequest>,
115    ) -> GrpcResult<MigrateRegionResponse> {
116        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
117
118        let MigrateRegionRequest {
119            header,
120            region_id,
121            from_peer,
122            to_peer,
123            timeout_secs,
124        } = request.into_inner();
125
126        let _header = header.context(error::MissingRequestHeaderSnafu)?;
127        let from_peer = self
128            .lookup_datanode_peer(from_peer)
129            .await?
130            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
131        let to_peer = self
132            .lookup_datanode_peer(to_peer)
133            .await?
134            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
135
136        let pid = self
137            .region_migration_manager()
138            .submit_procedure(RegionMigrationProcedureTask {
139                region_id: region_id.into(),
140                from_peer,
141                to_peer,
142                timeout: Duration::from_secs(timeout_secs.into()),
143                trigger_reason: RegionMigrationTriggerReason::Manual,
144            })
145            .await?
146            .map(procedure::pid_to_pb_pid);
147
148        let resp = MigrateRegionResponse {
149            pid,
150            ..Default::default()
151        };
152
153        Ok(Response::new(resp))
154    }
155
156    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
157        check_leader!(self, request, ReconcileResponse, "`reconcile`");
158
159        let ReconcileRequest { header, target } = request.into_inner();
160        let _header = header.context(error::MissingRequestHeaderSnafu)?;
161        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
162        let parse_resolve_strategy = |resolve_strategy: i32| {
163            ResolveStrategy::try_from(resolve_strategy)
164                .ok()
165                .context(error::UnexpectedSnafu {
166                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
167                })
168        };
169        let procedure_id = match target {
170            Target::ReconcileTable(table) => {
171                let ReconcileTable {
172                    catalog_name,
173                    schema_name,
174                    table_name,
175                    resolve_strategy,
176                } = table;
177                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
178                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
179                self.reconciliation_manager()
180                    .reconcile_table(table_ref, resolve_strategy.into())
181                    .await
182                    .context(error::SubmitReconcileProcedureSnafu)?
183            }
184            Target::ReconcileDatabase(database) => {
185                let ReconcileDatabase {
186                    catalog_name,
187                    database_name,
188                    resolve_strategy,
189                    parallelism,
190                } = database;
191                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
192                self.reconciliation_manager()
193                    .reconcile_database(
194                        catalog_name,
195                        database_name,
196                        resolve_strategy.into(),
197                        parallelism as usize,
198                    )
199                    .await
200                    .context(error::SubmitReconcileProcedureSnafu)?
201            }
202            Target::ReconcileCatalog(catalog) => {
203                let ReconcileCatalog {
204                    catalog_name,
205                    resolve_strategy,
206                    parallelism,
207                } = catalog;
208                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
209                self.reconciliation_manager()
210                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
211                    .await
212                    .context(error::SubmitReconcileProcedureSnafu)?
213            }
214        };
215        Ok(Response::new(ReconcileResponse {
216            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
217            ..Default::default()
218        }))
219    }
220
221    async fn details(
222        &self,
223        request: Request<ProcedureDetailRequest>,
224    ) -> GrpcResult<ProcedureDetailResponse> {
225        check_leader!(
226            self,
227            request,
228            ProcedureDetailResponse,
229            "`procedure details`"
230        );
231
232        let ProcedureDetailRequest { header } = request.into_inner();
233        let _header = header.context(error::MissingRequestHeaderSnafu)?;
234        let metas = self
235            .procedure_manager()
236            .list_procedures()
237            .await
238            .context(error::QueryProcedureSnafu)?;
239        Ok(Response::new(procedure::procedure_details_to_pb_response(
240            metas,
241        )))
242    }
243}