meta_srv/service/
procedure.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::{
19 procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
20 DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
21 ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
22 ResponseHeader,
23};
24use common_meta::ddl::ExecutorContext;
25use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
26use common_meta::rpc::procedure;
27use common_telemetry::warn;
28use snafu::{OptionExt, ResultExt};
29use tonic::{Request, Response};
30
31use crate::error;
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::RegionMigrationProcedureTask;
34use crate::service::GrpcResult;
35
36#[async_trait::async_trait]
37impl procedure_service_server::ProcedureService for Metasrv {
38 async fn query(
39 &self,
40 request: Request<QueryProcedureRequest>,
41 ) -> GrpcResult<ProcedureStateResponse> {
42 if !self.is_leader() {
43 let resp = ProcedureStateResponse {
44 header: Some(ResponseHeader::failed(Error::is_not_leader())),
45 ..Default::default()
46 };
47
48 warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
49 return Ok(Response::new(resp));
50 }
51
52 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53 let _header = header.context(error::MissingRequestHeaderSnafu)?;
54 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57 let state = self
58 .procedure_manager()
59 .procedure_state(pid)
60 .await
61 .context(error::QueryProcedureSnafu)?
62 .context(error::ProcedureNotFoundSnafu {
63 pid: pid.to_string(),
64 })?;
65
66 Ok(Response::new(procedure::procedure_state_to_pb_response(
67 &state,
68 )))
69 }
70
71 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72 if !self.is_leader() {
73 let resp = PbDdlTaskResponse {
74 header: Some(ResponseHeader::failed(Error::is_not_leader())),
75 ..Default::default()
76 };
77
78 warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
79 return Ok(Response::new(resp));
80 }
81
82 let PbDdlTaskRequest {
83 header,
84 query_context,
85 task,
86 ..
87 } = request.into_inner();
88
89 let header = header.context(error::MissingRequestHeaderSnafu)?;
90 let query_context = query_context
91 .context(error::MissingRequiredParameterSnafu {
92 param: "query_context",
93 })?
94 .into();
95 let task: DdlTask = task
96 .context(error::MissingRequiredParameterSnafu { param: "task" })?
97 .try_into()
98 .context(error::ConvertProtoDataSnafu)?;
99
100 let resp = self
101 .procedure_executor()
102 .submit_ddl_task(
103 &ExecutorContext {
104 tracing_context: Some(header.tracing_context),
105 },
106 SubmitDdlTaskRequest {
107 query_context: Arc::new(query_context),
108 task,
109 },
110 )
111 .await
112 .context(error::SubmitDdlTaskSnafu)?
113 .into();
114
115 Ok(Response::new(resp))
116 }
117
118 async fn migrate(
119 &self,
120 request: Request<MigrateRegionRequest>,
121 ) -> GrpcResult<MigrateRegionResponse> {
122 if !self.is_leader() {
123 let resp = MigrateRegionResponse {
124 header: Some(ResponseHeader::failed(Error::is_not_leader())),
125 ..Default::default()
126 };
127
128 warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
129 return Ok(Response::new(resp));
130 }
131
132 let MigrateRegionRequest {
133 header,
134 region_id,
135 from_peer,
136 to_peer,
137 timeout_secs,
138 } = request.into_inner();
139
140 let _header = header.context(error::MissingRequestHeaderSnafu)?;
141 let from_peer = self
142 .lookup_peer(from_peer)
143 .await?
144 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
145 let to_peer = self
146 .lookup_peer(to_peer)
147 .await?
148 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
149
150 let pid = self
151 .region_migration_manager()
152 .submit_procedure(RegionMigrationProcedureTask {
153 region_id: region_id.into(),
154 from_peer,
155 to_peer,
156 timeout: Duration::from_secs(timeout_secs.into()),
157 })
158 .await?
159 .map(procedure::pid_to_pb_pid);
160
161 let resp = MigrateRegionResponse {
162 pid,
163 ..Default::default()
164 };
165
166 Ok(Response::new(resp))
167 }
168
169 async fn details(
170 &self,
171 request: Request<ProcedureDetailRequest>,
172 ) -> GrpcResult<ProcedureDetailResponse> {
173 if !self.is_leader() {
174 let resp = ProcedureDetailResponse {
175 header: Some(ResponseHeader::failed(Error::is_not_leader())),
176 ..Default::default()
177 };
178
179 warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
180 return Ok(Response::new(resp));
181 }
182
183 let ProcedureDetailRequest { header } = request.into_inner();
184 let _header = header.context(error::MissingRequestHeaderSnafu)?;
185 let metas = self
186 .procedure_manager()
187 .list_procedures()
188 .await
189 .context(error::QueryProcedureSnafu)?;
190 Ok(Response::new(procedure::procedure_details_to_pb_response(
191 metas,
192 )))
193 }
194}