meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20    procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
21    DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22    ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
23    ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable,
24    ResolveStrategy,
25};
26use common_meta::procedure_executor::ExecutorContext;
27use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
28use common_meta::rpc::procedure;
29use common_telemetry::warn;
30use snafu::{OptionExt, ResultExt};
31use table::table_reference::TableReference;
32use tonic::Request;
33
34use crate::metasrv::Metasrv;
35use crate::procedure::region_migration::manager::{
36    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
37};
38use crate::service::GrpcResult;
39use crate::{check_leader, error};
40
41#[async_trait::async_trait]
42impl procedure_service_server::ProcedureService for Metasrv {
43    async fn query(
44        &self,
45        request: Request<QueryProcedureRequest>,
46    ) -> GrpcResult<ProcedureStateResponse> {
47        check_leader!(
48            self,
49            request,
50            ProcedureStateResponse,
51            "`query procedure state`"
52        );
53
54        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
55        let _header = header.context(error::MissingRequestHeaderSnafu)?;
56        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
57        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
58
59        let state = self
60            .procedure_manager()
61            .procedure_state(pid)
62            .await
63            .context(error::QueryProcedureSnafu)?
64            .context(error::ProcedureNotFoundSnafu {
65                pid: pid.to_string(),
66            })?;
67
68        Ok(Response::new(procedure::procedure_state_to_pb_response(
69            &state,
70        )))
71    }
72
73    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
74        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
75
76        let PbDdlTaskRequest {
77            header,
78            query_context,
79            task,
80            ..
81        } = request.into_inner();
82
83        let header = header.context(error::MissingRequestHeaderSnafu)?;
84        let query_context = query_context
85            .context(error::MissingRequiredParameterSnafu {
86                param: "query_context",
87            })?
88            .into();
89        let task: DdlTask = task
90            .context(error::MissingRequiredParameterSnafu { param: "task" })?
91            .try_into()
92            .context(error::ConvertProtoDataSnafu)?;
93
94        let resp = self
95            .ddl_manager()
96            .submit_ddl_task(
97                &ExecutorContext {
98                    tracing_context: Some(header.tracing_context),
99                },
100                SubmitDdlTaskRequest {
101                    query_context: Arc::new(query_context),
102                    task,
103                },
104            )
105            .await
106            .context(error::SubmitDdlTaskSnafu)?
107            .into();
108
109        Ok(Response::new(resp))
110    }
111
112    async fn migrate(
113        &self,
114        request: Request<MigrateRegionRequest>,
115    ) -> GrpcResult<MigrateRegionResponse> {
116        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
117
118        let MigrateRegionRequest {
119            header,
120            region_id,
121            from_peer,
122            to_peer,
123            timeout_secs,
124        } = request.into_inner();
125
126        let _header = header.context(error::MissingRequestHeaderSnafu)?;
127        let from_peer = self
128            .lookup_datanode_peer(from_peer)
129            .await?
130            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
131        let to_peer = self
132            .lookup_datanode_peer(to_peer)
133            .await?
134            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
135
136        let pid = self
137            .region_migration_manager()
138            .submit_procedure(RegionMigrationProcedureTask {
139                region_id: region_id.into(),
140                from_peer,
141                to_peer,
142                timeout: Duration::from_secs(timeout_secs.into()),
143                trigger_reason: RegionMigrationTriggerReason::Manual,
144            })
145            .await?
146            .map(procedure::pid_to_pb_pid);
147
148        let resp = MigrateRegionResponse {
149            pid,
150            ..Default::default()
151        };
152
153        Ok(Response::new(resp))
154    }
155
156    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
157        check_leader!(self, request, ReconcileResponse, "`reconcile`");
158
159        let ReconcileRequest { header, target } = request.into_inner();
160        let _header = header.context(error::MissingRequestHeaderSnafu)?;
161        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
162        let parse_resolve_strategy = |resolve_strategy: i32| {
163            ResolveStrategy::try_from(resolve_strategy)
164                .ok()
165                .context(error::UnexpectedSnafu {
166                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
167                })
168        };
169        let procedure_id = match target {
170            Target::ReconcileTable(table) => {
171                let ReconcileTable {
172                    catalog_name,
173                    schema_name,
174                    table_name,
175                    resolve_strategy,
176                } = table;
177                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
178                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
179                self.reconciliation_manager()
180                    .reconcile_table(table_ref, resolve_strategy.into())
181                    .await
182                    .context(error::SubmitReconcileProcedureSnafu)?
183            }
184            Target::ReconcileDatabase(database) => {
185                let ReconcileDatabase {
186                    catalog_name,
187                    database_name,
188                    resolve_strategy,
189                    parallelism,
190                } = database;
191                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
192                self.reconciliation_manager()
193                    .reconcile_database(
194                        catalog_name,
195                        database_name,
196                        resolve_strategy.into(),
197                        parallelism as usize,
198                    )
199                    .await
200                    .context(error::SubmitReconcileProcedureSnafu)?
201            }
202            Target::ReconcileCatalog(catalog) => {
203                let ReconcileCatalog {
204                    catalog_name,
205                    resolve_strategy,
206                    parallelism,
207                } = catalog;
208                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
209                self.reconciliation_manager()
210                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
211                    .await
212                    .context(error::SubmitReconcileProcedureSnafu)?
213            }
214        };
215        Ok(Response::new(ReconcileResponse {
216            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
217            ..Default::default()
218        }))
219    }
220
221    async fn details(
222        &self,
223        request: Request<ProcedureDetailRequest>,
224    ) -> GrpcResult<ProcedureDetailResponse> {
225        check_leader!(
226            self,
227            request,
228            ProcedureDetailResponse,
229            "`procedure details`"
230        );
231
232        let ProcedureDetailRequest { header } = request.into_inner();
233        let _header = header.context(error::MissingRequestHeaderSnafu)?;
234        let metas = self
235            .procedure_manager()
236            .list_procedures()
237            .await
238            .context(error::QueryProcedureSnafu)?;
239        Ok(Response::new(procedure::procedure_details_to_pb_response(
240            metas,
241        )))
242    }
243}