meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::{
19    procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
20    DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
21    ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
22    ResponseHeader,
23};
24use common_meta::ddl::ExecutorContext;
25use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
26use common_meta::rpc::procedure;
27use common_telemetry::warn;
28use snafu::{OptionExt, ResultExt};
29use tonic::{Request, Response};
30
31use crate::error;
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::RegionMigrationProcedureTask;
34use crate::service::GrpcResult;
35
36#[async_trait::async_trait]
37impl procedure_service_server::ProcedureService for Metasrv {
38    async fn query(
39        &self,
40        request: Request<QueryProcedureRequest>,
41    ) -> GrpcResult<ProcedureStateResponse> {
42        if !self.is_leader() {
43            let resp = ProcedureStateResponse {
44                header: Some(ResponseHeader::failed(Error::is_not_leader())),
45                ..Default::default()
46            };
47
48            warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
49            return Ok(Response::new(resp));
50        }
51
52        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53        let _header = header.context(error::MissingRequestHeaderSnafu)?;
54        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57        let state = self
58            .procedure_manager()
59            .procedure_state(pid)
60            .await
61            .context(error::QueryProcedureSnafu)?
62            .context(error::ProcedureNotFoundSnafu {
63                pid: pid.to_string(),
64            })?;
65
66        Ok(Response::new(procedure::procedure_state_to_pb_response(
67            &state,
68        )))
69    }
70
71    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72        if !self.is_leader() {
73            let resp = PbDdlTaskResponse {
74                header: Some(ResponseHeader::failed(Error::is_not_leader())),
75                ..Default::default()
76            };
77
78            warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
79            return Ok(Response::new(resp));
80        }
81
82        let PbDdlTaskRequest {
83            header,
84            query_context,
85            task,
86            ..
87        } = request.into_inner();
88
89        let header = header.context(error::MissingRequestHeaderSnafu)?;
90        let query_context = query_context
91            .context(error::MissingRequiredParameterSnafu {
92                param: "query_context",
93            })?
94            .into();
95        let task: DdlTask = task
96            .context(error::MissingRequiredParameterSnafu { param: "task" })?
97            .try_into()
98            .context(error::ConvertProtoDataSnafu)?;
99
100        let resp = self
101            .procedure_executor()
102            .submit_ddl_task(
103                &ExecutorContext {
104                    tracing_context: Some(header.tracing_context),
105                },
106                SubmitDdlTaskRequest {
107                    query_context: Arc::new(query_context),
108                    task,
109                },
110            )
111            .await
112            .context(error::SubmitDdlTaskSnafu)?
113            .into();
114
115        Ok(Response::new(resp))
116    }
117
118    async fn migrate(
119        &self,
120        request: Request<MigrateRegionRequest>,
121    ) -> GrpcResult<MigrateRegionResponse> {
122        if !self.is_leader() {
123            let resp = MigrateRegionResponse {
124                header: Some(ResponseHeader::failed(Error::is_not_leader())),
125                ..Default::default()
126            };
127
128            warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
129            return Ok(Response::new(resp));
130        }
131
132        let MigrateRegionRequest {
133            header,
134            region_id,
135            from_peer,
136            to_peer,
137            timeout_secs,
138        } = request.into_inner();
139
140        let _header = header.context(error::MissingRequestHeaderSnafu)?;
141        let from_peer = self
142            .lookup_peer(from_peer)
143            .await?
144            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
145        let to_peer = self
146            .lookup_peer(to_peer)
147            .await?
148            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
149
150        let pid = self
151            .region_migration_manager()
152            .submit_procedure(RegionMigrationProcedureTask {
153                region_id: region_id.into(),
154                from_peer,
155                to_peer,
156                timeout: Duration::from_secs(timeout_secs.into()),
157            })
158            .await?
159            .map(procedure::pid_to_pb_pid);
160
161        let resp = MigrateRegionResponse {
162            pid,
163            ..Default::default()
164        };
165
166        Ok(Response::new(resp))
167    }
168
169    async fn details(
170        &self,
171        request: Request<ProcedureDetailRequest>,
172    ) -> GrpcResult<ProcedureDetailResponse> {
173        if !self.is_leader() {
174            let resp = ProcedureDetailResponse {
175                header: Some(ResponseHeader::failed(Error::is_not_leader())),
176                ..Default::default()
177            };
178
179            warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
180            return Ok(Response::new(resp));
181        }
182
183        let ProcedureDetailRequest { header } = request.into_inner();
184        let _header = header.context(error::MissingRequestHeaderSnafu)?;
185        let metas = self
186            .procedure_manager()
187            .list_procedures()
188            .await
189            .context(error::QueryProcedureSnafu)?;
190        Ok(Response::new(procedure::procedure_details_to_pb_response(
191            metas,
192        )))
193    }
194}