meta_srv/service/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::{
19    procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
20    DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
21    ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
22    ResponseHeader,
23};
24use common_meta::ddl::ExecutorContext;
25use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
26use common_meta::rpc::procedure;
27use common_telemetry::warn;
28use snafu::{OptionExt, ResultExt};
29use tonic::{Request, Response};
30
31use crate::error;
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34    RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37
38#[async_trait::async_trait]
39impl procedure_service_server::ProcedureService for Metasrv {
40    async fn query(
41        &self,
42        request: Request<QueryProcedureRequest>,
43    ) -> GrpcResult<ProcedureStateResponse> {
44        if !self.is_leader() {
45            let resp = ProcedureStateResponse {
46                header: Some(ResponseHeader::failed(Error::is_not_leader())),
47                ..Default::default()
48            };
49
50            warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
51            return Ok(Response::new(resp));
52        }
53
54        let QueryProcedureRequest { header, pid, .. } = request.into_inner();
55        let _header = header.context(error::MissingRequestHeaderSnafu)?;
56        let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
57        let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
58
59        let state = self
60            .procedure_manager()
61            .procedure_state(pid)
62            .await
63            .context(error::QueryProcedureSnafu)?
64            .context(error::ProcedureNotFoundSnafu {
65                pid: pid.to_string(),
66            })?;
67
68        Ok(Response::new(procedure::procedure_state_to_pb_response(
69            &state,
70        )))
71    }
72
73    async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
74        if !self.is_leader() {
75            let resp = PbDdlTaskResponse {
76                header: Some(ResponseHeader::failed(Error::is_not_leader())),
77                ..Default::default()
78            };
79
80            warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
81            return Ok(Response::new(resp));
82        }
83
84        let PbDdlTaskRequest {
85            header,
86            query_context,
87            task,
88            ..
89        } = request.into_inner();
90
91        let header = header.context(error::MissingRequestHeaderSnafu)?;
92        let query_context = query_context
93            .context(error::MissingRequiredParameterSnafu {
94                param: "query_context",
95            })?
96            .into();
97        let task: DdlTask = task
98            .context(error::MissingRequiredParameterSnafu { param: "task" })?
99            .try_into()
100            .context(error::ConvertProtoDataSnafu)?;
101
102        let resp = self
103            .procedure_executor()
104            .submit_ddl_task(
105                &ExecutorContext {
106                    tracing_context: Some(header.tracing_context),
107                },
108                SubmitDdlTaskRequest {
109                    query_context: Arc::new(query_context),
110                    task,
111                },
112            )
113            .await
114            .context(error::SubmitDdlTaskSnafu)?
115            .into();
116
117        Ok(Response::new(resp))
118    }
119
120    async fn migrate(
121        &self,
122        request: Request<MigrateRegionRequest>,
123    ) -> GrpcResult<MigrateRegionResponse> {
124        if !self.is_leader() {
125            let resp = MigrateRegionResponse {
126                header: Some(ResponseHeader::failed(Error::is_not_leader())),
127                ..Default::default()
128            };
129
130            warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
131            return Ok(Response::new(resp));
132        }
133
134        let MigrateRegionRequest {
135            header,
136            region_id,
137            from_peer,
138            to_peer,
139            timeout_secs,
140        } = request.into_inner();
141
142        let _header = header.context(error::MissingRequestHeaderSnafu)?;
143        let from_peer = self
144            .lookup_datanode_peer(from_peer)
145            .await?
146            .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
147        let to_peer = self
148            .lookup_datanode_peer(to_peer)
149            .await?
150            .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
151
152        let pid = self
153            .region_migration_manager()
154            .submit_procedure(RegionMigrationProcedureTask {
155                region_id: region_id.into(),
156                from_peer,
157                to_peer,
158                timeout: Duration::from_secs(timeout_secs.into()),
159                trigger_reason: RegionMigrationTriggerReason::Manual,
160            })
161            .await?
162            .map(procedure::pid_to_pb_pid);
163
164        let resp = MigrateRegionResponse {
165            pid,
166            ..Default::default()
167        };
168
169        Ok(Response::new(resp))
170    }
171
172    async fn details(
173        &self,
174        request: Request<ProcedureDetailRequest>,
175    ) -> GrpcResult<ProcedureDetailResponse> {
176        if !self.is_leader() {
177            let resp = ProcedureDetailResponse {
178                header: Some(ResponseHeader::failed(Error::is_not_leader())),
179                ..Default::default()
180            };
181
182            warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
183            return Ok(Response::new(resp));
184        }
185
186        let ProcedureDetailRequest { header } = request.into_inner();
187        let _header = header.context(error::MissingRequestHeaderSnafu)?;
188        let metas = self
189            .procedure_manager()
190            .list_procedures()
191            .await
192            .context(error::QueryProcedureSnafu)?;
193        Ok(Response::new(procedure::procedure_details_to_pb_response(
194            metas,
195        )))
196    }
197}