1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21 MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22 QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23 ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use snafu::{OptionExt, ResultExt};
29use table::table_reference::TableReference;
30use tonic::Request;
31
32use crate::metasrv::Metasrv;
33use crate::procedure::region_migration::manager::{
34 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
35};
36use crate::service::GrpcResult;
37use crate::{check_leader, error};
38
39#[async_trait::async_trait]
40impl procedure_service_server::ProcedureService for Metasrv {
41 async fn query(
42 &self,
43 request: Request<QueryProcedureRequest>,
44 ) -> GrpcResult<ProcedureStateResponse> {
45 check_leader!(
46 self,
47 request,
48 ProcedureStateResponse,
49 "`query procedure state`"
50 );
51
52 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
53 let _header = header.context(error::MissingRequestHeaderSnafu)?;
54 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
55 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
56
57 let state = self
58 .procedure_manager()
59 .procedure_state(pid)
60 .await
61 .context(error::QueryProcedureSnafu)?
62 .context(error::ProcedureNotFoundSnafu {
63 pid: pid.to_string(),
64 })?;
65
66 Ok(Response::new(procedure::procedure_state_to_pb_response(
67 &state,
68 )))
69 }
70
71 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
72 check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
73
74 let PbDdlTaskRequest {
75 header,
76 query_context,
77 task,
78 ..
79 } = request.into_inner();
80
81 let header = header.context(error::MissingRequestHeaderSnafu)?;
82 let query_context = query_context
83 .context(error::MissingRequiredParameterSnafu {
84 param: "query_context",
85 })?
86 .into();
87 let task: DdlTask = task
88 .context(error::MissingRequiredParameterSnafu { param: "task" })?
89 .try_into()
90 .context(error::ConvertProtoDataSnafu)?;
91
92 let resp = self
93 .ddl_manager()
94 .submit_ddl_task(
95 &ExecutorContext {
96 tracing_context: Some(header.tracing_context),
97 },
98 SubmitDdlTaskRequest {
99 query_context: Arc::new(query_context),
100 task,
101 },
102 )
103 .await
104 .context(error::SubmitDdlTaskSnafu)?
105 .into();
106
107 Ok(Response::new(resp))
108 }
109
110 async fn migrate(
111 &self,
112 request: Request<MigrateRegionRequest>,
113 ) -> GrpcResult<MigrateRegionResponse> {
114 check_leader!(self, request, MigrateRegionResponse, "`migrate`");
115
116 let MigrateRegionRequest {
117 header,
118 region_id,
119 from_peer,
120 to_peer,
121 timeout_secs,
122 } = request.into_inner();
123
124 let _header = header.context(error::MissingRequestHeaderSnafu)?;
125 let from_peer = self
126 .lookup_datanode_peer(from_peer)
127 .await?
128 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
129 let to_peer = self
130 .lookup_datanode_peer(to_peer)
131 .await?
132 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
133
134 let pid = self
135 .region_migration_manager()
136 .submit_procedure(RegionMigrationProcedureTask {
137 region_id: region_id.into(),
138 from_peer,
139 to_peer,
140 timeout: Duration::from_secs(timeout_secs.into()),
141 trigger_reason: RegionMigrationTriggerReason::Manual,
142 })
143 .await?
144 .map(procedure::pid_to_pb_pid);
145
146 let resp = MigrateRegionResponse {
147 pid,
148 ..Default::default()
149 };
150
151 Ok(Response::new(resp))
152 }
153
154 async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
155 check_leader!(self, request, ReconcileResponse, "`reconcile`");
156
157 let ReconcileRequest { header, target } = request.into_inner();
158 let _header = header.context(error::MissingRequestHeaderSnafu)?;
159 let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
160 let parse_resolve_strategy = |resolve_strategy: i32| {
161 ResolveStrategy::try_from(resolve_strategy)
162 .ok()
163 .context(error::UnexpectedSnafu {
164 violated: format!("Invalid resolve strategy: {}", resolve_strategy),
165 })
166 };
167 let procedure_id = match target {
168 Target::ReconcileTable(table) => {
169 let ReconcileTable {
170 catalog_name,
171 schema_name,
172 table_name,
173 resolve_strategy,
174 } = table;
175 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
176 let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
177 self.reconciliation_manager()
178 .reconcile_table(table_ref, resolve_strategy.into())
179 .await
180 .context(error::SubmitReconcileProcedureSnafu)?
181 }
182 Target::ReconcileDatabase(database) => {
183 let ReconcileDatabase {
184 catalog_name,
185 database_name,
186 resolve_strategy,
187 parallelism,
188 } = database;
189 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
190 self.reconciliation_manager()
191 .reconcile_database(
192 catalog_name,
193 database_name,
194 resolve_strategy.into(),
195 parallelism as usize,
196 )
197 .await
198 .context(error::SubmitReconcileProcedureSnafu)?
199 }
200 Target::ReconcileCatalog(catalog) => {
201 let ReconcileCatalog {
202 catalog_name,
203 resolve_strategy,
204 parallelism,
205 } = catalog;
206 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
207 self.reconciliation_manager()
208 .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
209 .await
210 .context(error::SubmitReconcileProcedureSnafu)?
211 }
212 };
213 Ok(Response::new(ReconcileResponse {
214 pid: Some(procedure::pid_to_pb_pid(procedure_id)),
215 ..Default::default()
216 }))
217 }
218
219 async fn details(
220 &self,
221 request: Request<ProcedureDetailRequest>,
222 ) -> GrpcResult<ProcedureDetailResponse> {
223 check_leader!(
224 self,
225 request,
226 ProcedureDetailResponse,
227 "`procedure details`"
228 );
229
230 let ProcedureDetailRequest { header } = request.into_inner();
231 let _header = header.context(error::MissingRequestHeaderSnafu)?;
232 let metas = self
233 .procedure_manager()
234 .list_procedures()
235 .await
236 .context(error::QueryProcedureSnafu)?;
237 Ok(Response::new(procedure::procedure_details_to_pb_response(
238 metas,
239 )))
240 }
241}