meta_srv/service/
procedure.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::{
19 procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
20 DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
21 ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
22 ResponseHeader,
23};
24use common_meta::ddl::ExecutorContext;
25use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
26use common_meta::rpc::procedure;
27use common_telemetry::warn;
28use snafu::{OptionExt, ResultExt};
29use tonic::{Request, Response};
30
31use crate::error;
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37
38#[async_trait::async_trait]
39impl procedure_service_server::ProcedureService for Metasrv {
40 async fn query(
41 &self,
42 request: Request<QueryProcedureRequest>,
43 ) -> GrpcResult<ProcedureStateResponse> {
44 if !self.is_leader() {
45 let resp = ProcedureStateResponse {
46 header: Some(ResponseHeader::failed(Error::is_not_leader())),
47 ..Default::default()
48 };
49
50 warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
51 return Ok(Response::new(resp));
52 }
53
54 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
55 let _header = header.context(error::MissingRequestHeaderSnafu)?;
56 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
57 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
58
59 let state = self
60 .procedure_manager()
61 .procedure_state(pid)
62 .await
63 .context(error::QueryProcedureSnafu)?
64 .context(error::ProcedureNotFoundSnafu {
65 pid: pid.to_string(),
66 })?;
67
68 Ok(Response::new(procedure::procedure_state_to_pb_response(
69 &state,
70 )))
71 }
72
73 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
74 if !self.is_leader() {
75 let resp = PbDdlTaskResponse {
76 header: Some(ResponseHeader::failed(Error::is_not_leader())),
77 ..Default::default()
78 };
79
80 warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
81 return Ok(Response::new(resp));
82 }
83
84 let PbDdlTaskRequest {
85 header,
86 query_context,
87 task,
88 ..
89 } = request.into_inner();
90
91 let header = header.context(error::MissingRequestHeaderSnafu)?;
92 let query_context = query_context
93 .context(error::MissingRequiredParameterSnafu {
94 param: "query_context",
95 })?
96 .into();
97 let task: DdlTask = task
98 .context(error::MissingRequiredParameterSnafu { param: "task" })?
99 .try_into()
100 .context(error::ConvertProtoDataSnafu)?;
101
102 let resp = self
103 .procedure_executor()
104 .submit_ddl_task(
105 &ExecutorContext {
106 tracing_context: Some(header.tracing_context),
107 },
108 SubmitDdlTaskRequest {
109 query_context: Arc::new(query_context),
110 task,
111 },
112 )
113 .await
114 .context(error::SubmitDdlTaskSnafu)?
115 .into();
116
117 Ok(Response::new(resp))
118 }
119
120 async fn migrate(
121 &self,
122 request: Request<MigrateRegionRequest>,
123 ) -> GrpcResult<MigrateRegionResponse> {
124 if !self.is_leader() {
125 let resp = MigrateRegionResponse {
126 header: Some(ResponseHeader::failed(Error::is_not_leader())),
127 ..Default::default()
128 };
129
130 warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
131 return Ok(Response::new(resp));
132 }
133
134 let MigrateRegionRequest {
135 header,
136 region_id,
137 from_peer,
138 to_peer,
139 timeout_secs,
140 } = request.into_inner();
141
142 let _header = header.context(error::MissingRequestHeaderSnafu)?;
143 let from_peer = self
144 .lookup_datanode_peer(from_peer)
145 .await?
146 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
147 let to_peer = self
148 .lookup_datanode_peer(to_peer)
149 .await?
150 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
151
152 let pid = self
153 .region_migration_manager()
154 .submit_procedure(RegionMigrationProcedureTask {
155 region_id: region_id.into(),
156 from_peer,
157 to_peer,
158 timeout: Duration::from_secs(timeout_secs.into()),
159 trigger_reason: RegionMigrationTriggerReason::Manual,
160 })
161 .await?
162 .map(procedure::pid_to_pb_pid);
163
164 let resp = MigrateRegionResponse {
165 pid,
166 ..Default::default()
167 };
168
169 Ok(Response::new(resp))
170 }
171
172 async fn details(
173 &self,
174 request: Request<ProcedureDetailRequest>,
175 ) -> GrpcResult<ProcedureDetailResponse> {
176 if !self.is_leader() {
177 let resp = ProcedureDetailResponse {
178 header: Some(ResponseHeader::failed(Error::is_not_leader())),
179 ..Default::default()
180 };
181
182 warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
183 return Ok(Response::new(resp));
184 }
185
186 let ProcedureDetailRequest { header } = request.into_inner();
187 let _header = header.context(error::MissingRequestHeaderSnafu)?;
188 let metas = self
189 .procedure_manager()
190 .list_procedures()
191 .await
192 .context(error::QueryProcedureSnafu)?;
193 Ok(Response::new(procedure::procedure_details_to_pb_response(
194 metas,
195 )))
196 }
197}