meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21    MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22    QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23    ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use common_telemetry::warn;
29use snafu::{OptionExt, ResultExt};
30use table::table_reference::TableReference;
31use tonic::Request;
32
33use crate::metasrv::Metasrv;
34use crate::procedure::region_migration::manager::{
35    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
36};
37use crate::service::GrpcResult;
38use crate::{check_leader, error};
39
40#[async_trait::async_trait]
41impl procedure_service_server::ProcedureService for Metasrv {
42    async fn query(
43        &self,
44        request: Request<QueryProcedureRequest>,
45    ) -> GrpcResult<ProcedureStateResponse> {
46        check_leader!(
47            self,
48            request,
49            ProcedureStateResponse,
50            "`query procedure state`"
51        );
52
53        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
54        let _header = header.context(error::MissingRequestHeaderSnafu)?;
55        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
56        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
57
58        let state = self
59            .procedure_manager()
60            .procedure_state(pid)
61            .await
62            .context(error::QueryProcedureSnafu)?
63            .context(error::ProcedureNotFoundSnafu {
64                pid: pid.to_string(),
65            })?;
66
67        Ok(Response::new(procedure::procedure_state_to_pb_response(
68            &state,
69        )))
70    }
71
72    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
73        check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
74
75        let PbDdlTaskRequest {
76            header,
77            query_context,
78            task,
79            ..
80        } = request.into_inner();
81
82        let header = header.context(error::MissingRequestHeaderSnafu)?;
83        let query_context = query_context
84            .context(error::MissingRequiredParameterSnafu {
85                param: "query_context",
86            })?
87            .into();
88        let task: DdlTask = task
89            .context(error::MissingRequiredParameterSnafu { param: "task" })?
90            .try_into()
91            .context(error::ConvertProtoDataSnafu)?;
92
93        let resp = self
94            .ddl_manager()
95            .submit_ddl_task(
96                &ExecutorContext {
97                    tracing_context: Some(header.tracing_context),
98                },
99                SubmitDdlTaskRequest {
100                    query_context: Arc::new(query_context),
101                    task,
102                },
103            )
104            .await
105            .context(error::SubmitDdlTaskSnafu)?
106            .into();
107
108        Ok(Response::new(resp))
109    }
110
111    async fn migrate(
112        &self,
113        request: Request<MigrateRegionRequest>,
114    ) -> GrpcResult<MigrateRegionResponse> {
115        check_leader!(self, request, MigrateRegionResponse, "`migrate`");
116
117        let MigrateRegionRequest {
118            header,
119            region_id,
120            from_peer,
121            to_peer,
122            timeout_secs,
123        } = request.into_inner();
124
125        let _header = header.context(error::MissingRequestHeaderSnafu)?;
126        let from_peer = self
127            .lookup_datanode_peer(from_peer)
128            .await?
129            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
130        let to_peer = self
131            .lookup_datanode_peer(to_peer)
132            .await?
133            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
134
135        let pid = self
136            .region_migration_manager()
137            .submit_procedure(RegionMigrationProcedureTask {
138                region_id: region_id.into(),
139                from_peer,
140                to_peer,
141                timeout: Duration::from_secs(timeout_secs.into()),
142                trigger_reason: RegionMigrationTriggerReason::Manual,
143            })
144            .await?
145            .map(procedure::pid_to_pb_pid);
146
147        let resp = MigrateRegionResponse {
148            pid,
149            ..Default::default()
150        };
151
152        Ok(Response::new(resp))
153    }
154
155    async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
156        check_leader!(self, request, ReconcileResponse, "`reconcile`");
157
158        let ReconcileRequest { header, target } = request.into_inner();
159        let _header = header.context(error::MissingRequestHeaderSnafu)?;
160        let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
161        let parse_resolve_strategy = |resolve_strategy: i32| {
162            ResolveStrategy::try_from(resolve_strategy)
163                .ok()
164                .context(error::UnexpectedSnafu {
165                    violated: format!("Invalid resolve strategy: {}", resolve_strategy),
166                })
167        };
168        let procedure_id = match target {
169            Target::ReconcileTable(table) => {
170                let ReconcileTable {
171                    catalog_name,
172                    schema_name,
173                    table_name,
174                    resolve_strategy,
175                } = table;
176                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
177                let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
178                self.reconciliation_manager()
179                    .reconcile_table(table_ref, resolve_strategy.into())
180                    .await
181                    .context(error::SubmitReconcileProcedureSnafu)?
182            }
183            Target::ReconcileDatabase(database) => {
184                let ReconcileDatabase {
185                    catalog_name,
186                    database_name,
187                    resolve_strategy,
188                    parallelism,
189                } = database;
190                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
191                self.reconciliation_manager()
192                    .reconcile_database(
193                        catalog_name,
194                        database_name,
195                        resolve_strategy.into(),
196                        parallelism as usize,
197                    )
198                    .await
199                    .context(error::SubmitReconcileProcedureSnafu)?
200            }
201            Target::ReconcileCatalog(catalog) => {
202                let ReconcileCatalog {
203                    catalog_name,
204                    resolve_strategy,
205                    parallelism,
206                } = catalog;
207                let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
208                self.reconciliation_manager()
209                    .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
210                    .await
211                    .context(error::SubmitReconcileProcedureSnafu)?
212            }
213        };
214        Ok(Response::new(ReconcileResponse {
215            pid: Some(procedure::pid_to_pb_pid(procedure_id)),
216            ..Default::default()
217        }))
218    }
219
220    async fn details(
221        &self,
222        request: Request<ProcedureDetailRequest>,
223    ) -> GrpcResult<ProcedureDetailResponse> {
224        check_leader!(
225            self,
226            request,
227            ProcedureDetailResponse,
228            "`procedure details`"
229        );
230
231        let ProcedureDetailRequest { header } = request.into_inner();
232        let _header = header.context(error::MissingRequestHeaderSnafu)?;
233        let metas = self
234            .procedure_manager()
235            .list_procedures()
236            .await
237            .context(error::QueryProcedureSnafu)?;
238        Ok(Response::new(procedure::procedure_details_to_pb_response(
239            metas,
240        )))
241    }
242}