meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21    MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22    QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23    ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use snafu::{OptionExt, ResultExt};
29use table::table_reference::TableReference;
30use tonic::Request;
31
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37use crate::{check_leader, error};
38
39#[async_trait::async_trait]
40impl procedure_service_server::ProcedureService for Metasrv {
41    async fn query(
42        &self,
43        request: Request<QueryProcedureRequest>,
44    ) -> GrpcResult<ProcedureStateResponse> {
45        check_leader!(
46            self,
47            request,
48            ProcedureStateResponse,
49            "`query procedure state`"
50        );
51
52        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53        let _header = header.context(error::MissingRequestHeaderSnafu)?;
54        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57        let state = self
58            .procedure_manager()
59            .procedure_state(pid)
60            .await
61            .context(error::QueryProcedureSnafu)?
62            .context(error::ProcedureNotFoundSnafu {
63                pid: pid.to_string(),
64            })?;
65
66        Ok(Response::new(procedure::procedure_state_to_pb_response(
67            &state,
68        )))
69    }
70
71    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
73
74        let PbDdlTaskRequest {
75            header,
76            query_context,
77            task,
78            wait,
79            timeout_secs,
80        } = request.into_inner();
81
82        let header = header.context(error::MissingRequestHeaderSnafu)?;
83        let query_context = query_context
84            .context(error::MissingRequiredParameterSnafu {
85                param: "query_context",
86            })?
87            .into();
88        let task: DdlTask = task
89            .context(error::MissingRequiredParameterSnafu { param: "task" })?
90            .try_into()
91            .context(error::ConvertProtoDataSnafu)?;
92
93        let resp = self
94            .ddl_manager()
95            .submit_ddl_task(
96                &ExecutorContext {
97                    tracing_context: Some(header.tracing_context),
98                },
99                SubmitDdlTaskRequest {
100                    query_context: Arc::new(query_context),
101                    wait,
102                    timeout: Duration::from_secs(timeout_secs.into()),
103                    task,
104                },
105            )
106            .await
107            .context(error::SubmitDdlTaskSnafu)?
108            .into();
109
110        Ok(Response::new(resp))
111    }
112
113    async fn migrate(
114        &self,
115        request: Request<MigrateRegionRequest>,
116    ) -> GrpcResult<MigrateRegionResponse> {
117        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
118
119        let MigrateRegionRequest {
120            header,
121            region_id,
122            from_peer,
123            to_peer,
124            timeout_secs,
125        } = request.into_inner();
126
127        let _header = header.context(error::MissingRequestHeaderSnafu)?;
128        let from_peer = self
129            .lookup_datanode_peer(from_peer)
130            .await?
131            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
132        let to_peer = self
133            .lookup_datanode_peer(to_peer)
134            .await?
135            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
136
137        let pid = self
138            .region_migration_manager()
139            .submit_procedure(RegionMigrationProcedureTask {
140                region_id: region_id.into(),
141                from_peer,
142                to_peer,
143                timeout: Duration::from_secs(timeout_secs.into()),
144                trigger_reason: RegionMigrationTriggerReason::Manual,
145            })
146            .await?
147            .map(procedure::pid_to_pb_pid);
148
149        let resp = MigrateRegionResponse {
150            pid,
151            ..Default::default()
152        };
153
154        Ok(Response::new(resp))
155    }
156
157    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
158        check_leader!(self, request, ReconcileResponse, "`reconcile`");
159
160        let ReconcileRequest { header, target } = request.into_inner();
161        let _header = header.context(error::MissingRequestHeaderSnafu)?;
162        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
163        let parse_resolve_strategy = |resolve_strategy: i32| {
164            ResolveStrategy::try_from(resolve_strategy)
165                .ok()
166                .context(error::UnexpectedSnafu {
167                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
168                })
169        };
170        let procedure_id = match target {
171            Target::ReconcileTable(table) => {
172                let ReconcileTable {
173                    catalog_name,
174                    schema_name,
175                    table_name,
176                    resolve_strategy,
177                } = table;
178                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
179                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
180                self.reconciliation_manager()
181                    .reconcile_table(table_ref, resolve_strategy.into())
182                    .await
183                    .context(error::SubmitReconcileProcedureSnafu)?
184            }
185            Target::ReconcileDatabase(database) => {
186                let ReconcileDatabase {
187                    catalog_name,
188                    database_name,
189                    resolve_strategy,
190                    parallelism,
191                } = database;
192                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
193                self.reconciliation_manager()
194                    .reconcile_database(
195                        catalog_name,
196                        database_name,
197                        resolve_strategy.into(),
198                        parallelism as usize,
199                    )
200                    .await
201                    .context(error::SubmitReconcileProcedureSnafu)?
202            }
203            Target::ReconcileCatalog(catalog) => {
204                let ReconcileCatalog {
205                    catalog_name,
206                    resolve_strategy,
207                    parallelism,
208                } = catalog;
209                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
210                self.reconciliation_manager()
211                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
212                    .await
213                    .context(error::SubmitReconcileProcedureSnafu)?
214            }
215        };
216        Ok(Response::new(ReconcileResponse {
217            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
218            ..Default::default()
219        }))
220    }
221
222    async fn details(
223        &self,
224        request: Request<ProcedureDetailRequest>,
225    ) -> GrpcResult<ProcedureDetailResponse> {
226        check_leader!(
227            self,
228            request,
229            ProcedureDetailResponse,
230            "`procedure details`"
231        );
232
233        let ProcedureDetailRequest { header } = request.into_inner();
234        let _header = header.context(error::MissingRequestHeaderSnafu)?;
235        let metas = self
236            .procedure_manager()
237            .list_procedures()
238            .await
239            .context(error::QueryProcedureSnafu)?;
240        Ok(Response::new(procedure::procedure_details_to_pb_response(
241            metas,
242        )))
243    }
244}