1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21 MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22 QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23 ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use snafu::{OptionExt, ResultExt};
29use table::table_reference::TableReference;
30use tonic::Request;
31
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37use crate::{check_leader, error};
38
39#[async_trait::async_trait]
40impl procedure_service_server::ProcedureService for Metasrv {
41 async fn query(
42 &self,
43 request: Request<QueryProcedureRequest>,
44 ) -> GrpcResult<ProcedureStateResponse> {
45 check_leader!(
46 self,
47 request,
48 ProcedureStateResponse,
49 "`query procedure state`"
50 );
51
52 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53 let _header = header.context(error::MissingRequestHeaderSnafu)?;
54 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57 let state = self
58 .procedure_manager()
59 .procedure_state(pid)
60 .await
61 .context(error::QueryProcedureSnafu)?
62 .context(error::ProcedureNotFoundSnafu {
63 pid: pid.to_string(),
64 })?;
65
66 Ok(Response::new(procedure::procedure_state_to_pb_response(
67 &state,
68 )))
69 }
70
71 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72 check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
73
74 let PbDdlTaskRequest {
75 header,
76 query_context,
77 task,
78 wait,
79 timeout_secs,
80 } = request.into_inner();
81
82 let header = header.context(error::MissingRequestHeaderSnafu)?;
83 let query_context = query_context
84 .context(error::MissingRequiredParameterSnafu {
85 param: "query_context",
86 })?
87 .into();
88 let task: DdlTask = task
89 .context(error::MissingRequiredParameterSnafu { param: "task" })?
90 .try_into()
91 .context(error::ConvertProtoDataSnafu)?;
92
93 let resp = self
94 .ddl_manager()
95 .submit_ddl_task(
96 &ExecutorContext {
97 tracing_context: Some(header.tracing_context),
98 },
99 SubmitDdlTaskRequest {
100 query_context: Arc::new(query_context),
101 wait,
102 timeout: Duration::from_secs(timeout_secs.into()),
103 task,
104 },
105 )
106 .await
107 .context(error::SubmitDdlTaskSnafu)?
108 .into();
109
110 Ok(Response::new(resp))
111 }
112
113 async fn migrate(
114 &self,
115 request: Request<MigrateRegionRequest>,
116 ) -> GrpcResult<MigrateRegionResponse> {
117 check_leader!(self, request, MigrateRegionResponse, "`migrate`");
118
119 let MigrateRegionRequest {
120 header,
121 region_id,
122 from_peer,
123 to_peer,
124 timeout_secs,
125 } = request.into_inner();
126
127 let _header = header.context(error::MissingRequestHeaderSnafu)?;
128 let from_peer = self
129 .lookup_datanode_peer(from_peer)
130 .await?
131 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
132 let to_peer = self
133 .lookup_datanode_peer(to_peer)
134 .await?
135 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
136
137 let pid = self
138 .region_migration_manager()
139 .submit_procedure(RegionMigrationProcedureTask {
140 region_id: region_id.into(),
141 from_peer,
142 to_peer,
143 timeout: Duration::from_secs(timeout_secs.into()),
144 trigger_reason: RegionMigrationTriggerReason::Manual,
145 })
146 .await?
147 .map(procedure::pid_to_pb_pid);
148
149 let resp = MigrateRegionResponse {
150 pid,
151 ..Default::default()
152 };
153
154 Ok(Response::new(resp))
155 }
156
157 async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
158 check_leader!(self, request, ReconcileResponse, "`reconcile`");
159
160 let ReconcileRequest { header, target } = request.into_inner();
161 let _header = header.context(error::MissingRequestHeaderSnafu)?;
162 let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
163 let parse_resolve_strategy = |resolve_strategy: i32| {
164 ResolveStrategy::try_from(resolve_strategy)
165 .ok()
166 .context(error::UnexpectedSnafu {
167 violated: format!("Invalid resolve strategy: {}", resolve_strategy),
168 })
169 };
170 let procedure_id = match target {
171 Target::ReconcileTable(table) => {
172 let ReconcileTable {
173 catalog_name,
174 schema_name,
175 table_name,
176 resolve_strategy,
177 } = table;
178 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
179 let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
180 self.reconciliation_manager()
181 .reconcile_table(table_ref, resolve_strategy.into())
182 .await
183 .context(error::SubmitReconcileProcedureSnafu)?
184 }
185 Target::ReconcileDatabase(database) => {
186 let ReconcileDatabase {
187 catalog_name,
188 database_name,
189 resolve_strategy,
190 parallelism,
191 } = database;
192 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
193 self.reconciliation_manager()
194 .reconcile_database(
195 catalog_name,
196 database_name,
197 resolve_strategy.into(),
198 parallelism as usize,
199 )
200 .await
201 .context(error::SubmitReconcileProcedureSnafu)?
202 }
203 Target::ReconcileCatalog(catalog) => {
204 let ReconcileCatalog {
205 catalog_name,
206 resolve_strategy,
207 parallelism,
208 } = catalog;
209 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
210 self.reconciliation_manager()
211 .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
212 .await
213 .context(error::SubmitReconcileProcedureSnafu)?
214 }
215 };
216 Ok(Response::new(ReconcileResponse {
217 pid: Some(procedure::pid_to_pb_pid(procedure_id)),
218 ..Default::default()
219 }))
220 }
221
222 async fn details(
223 &self,
224 request: Request<ProcedureDetailRequest>,
225 ) -> GrpcResult<ProcedureDetailResponse> {
226 check_leader!(
227 self,
228 request,
229 ProcedureDetailResponse,
230 "`procedure details`"
231 );
232
233 let ProcedureDetailRequest { header } = request.into_inner();
234 let _header = header.context(error::MissingRequestHeaderSnafu)?;
235 let metas = self
236 .procedure_manager()
237 .list_procedures()
238 .await
239 .context(error::QueryProcedureSnafu)?;
240 Ok(Response::new(procedure::procedure_details_to_pb_response(
241 metas,
242 )))
243 }
244}