1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20 procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
21 DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22 ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
23 ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable,
24 ResolveStrategy,
25};
26use common_meta::procedure_executor::ExecutorContext;
27use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
28use common_meta::rpc::procedure;
29use common_telemetry::warn;
30use snafu::{OptionExt, ResultExt};
31use table::table_reference::TableReference;
32use tonic::Request;
33
34use crate::metasrv::Metasrv;
35use crate::procedure::region_migration::manager::{
36 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
37};
38use crate::service::GrpcResult;
39use crate::{check_leader, error};
40
41#[async_trait::async_trait]
42impl procedure_service_server::ProcedureService for Metasrv {
43 async fn query(
44 &self,
45 request: Request<QueryProcedureRequest>,
46 ) -> GrpcResult<ProcedureStateResponse> {
47 check_leader!(
48 self,
49 request,
50 ProcedureStateResponse,
51 "`query procedure state`"
52 );
53
54 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
55 let _header = header.context(error::MissingRequestHeaderSnafu)?;
56 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
57 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
58
59 let state = self
60 .procedure_manager()
61 .procedure_state(pid)
62 .await
63 .context(error::QueryProcedureSnafu)?
64 .context(error::ProcedureNotFoundSnafu {
65 pid: pid.to_string(),
66 })?;
67
68 Ok(Response::new(procedure::procedure_state_to_pb_response(
69 &state,
70 )))
71 }
72
73 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
74 check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
75
76 let PbDdlTaskRequest {
77 header,
78 query_context,
79 task,
80 ..
81 } = request.into_inner();
82
83 let header = header.context(error::MissingRequestHeaderSnafu)?;
84 let query_context = query_context
85 .context(error::MissingRequiredParameterSnafu {
86 param: "query_context",
87 })?
88 .into();
89 let task: DdlTask = task
90 .context(error::MissingRequiredParameterSnafu { param: "task" })?
91 .try_into()
92 .context(error::ConvertProtoDataSnafu)?;
93
94 let resp = self
95 .ddl_manager()
96 .submit_ddl_task(
97 &ExecutorContext {
98 tracing_context: Some(header.tracing_context),
99 },
100 SubmitDdlTaskRequest {
101 query_context: Arc::new(query_context),
102 task,
103 },
104 )
105 .await
106 .context(error::SubmitDdlTaskSnafu)?
107 .into();
108
109 Ok(Response::new(resp))
110 }
111
112 async fn migrate(
113 &self,
114 request: Request<MigrateRegionRequest>,
115 ) -> GrpcResult<MigrateRegionResponse> {
116 check_leader!(self, request, MigrateRegionResponse, "`migrate`");
117
118 let MigrateRegionRequest {
119 header,
120 region_id,
121 from_peer,
122 to_peer,
123 timeout_secs,
124 } = request.into_inner();
125
126 let _header = header.context(error::MissingRequestHeaderSnafu)?;
127 let from_peer = self
128 .lookup_datanode_peer(from_peer)
129 .await?
130 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
131 let to_peer = self
132 .lookup_datanode_peer(to_peer)
133 .await?
134 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
135
136 let pid = self
137 .region_migration_manager()
138 .submit_procedure(RegionMigrationProcedureTask {
139 region_id: region_id.into(),
140 from_peer,
141 to_peer,
142 timeout: Duration::from_secs(timeout_secs.into()),
143 trigger_reason: RegionMigrationTriggerReason::Manual,
144 })
145 .await?
146 .map(procedure::pid_to_pb_pid);
147
148 let resp = MigrateRegionResponse {
149 pid,
150 ..Default::default()
151 };
152
153 Ok(Response::new(resp))
154 }
155
156 async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
157 check_leader!(self, request, ReconcileResponse, "`reconcile`");
158
159 let ReconcileRequest { header, target } = request.into_inner();
160 let _header = header.context(error::MissingRequestHeaderSnafu)?;
161 let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
162 let parse_resolve_strategy = |resolve_strategy: i32| {
163 ResolveStrategy::try_from(resolve_strategy)
164 .ok()
165 .context(error::UnexpectedSnafu {
166 violated: format!("Invalid resolve strategy: {}", resolve_strategy),
167 })
168 };
169 let procedure_id = match target {
170 Target::ReconcileTable(table) => {
171 let ReconcileTable {
172 catalog_name,
173 schema_name,
174 table_name,
175 resolve_strategy,
176 } = table;
177 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
178 let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
179 self.reconciliation_manager()
180 .reconcile_table(table_ref, resolve_strategy.into())
181 .await
182 .context(error::SubmitReconcileProcedureSnafu)?
183 }
184 Target::ReconcileDatabase(database) => {
185 let ReconcileDatabase {
186 catalog_name,
187 database_name,
188 resolve_strategy,
189 parallelism,
190 } = database;
191 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
192 self.reconciliation_manager()
193 .reconcile_database(
194 catalog_name,
195 database_name,
196 resolve_strategy.into(),
197 parallelism as usize,
198 )
199 .await
200 .context(error::SubmitReconcileProcedureSnafu)?
201 }
202 Target::ReconcileCatalog(catalog) => {
203 let ReconcileCatalog {
204 catalog_name,
205 resolve_strategy,
206 parallelism,
207 } = catalog;
208 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
209 self.reconciliation_manager()
210 .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
211 .await
212 .context(error::SubmitReconcileProcedureSnafu)?
213 }
214 };
215 Ok(Response::new(ReconcileResponse {
216 pid: Some(procedure::pid_to_pb_pid(procedure_id)),
217 ..Default::default()
218 }))
219 }
220
221 async fn details(
222 &self,
223 request: Request<ProcedureDetailRequest>,
224 ) -> GrpcResult<ProcedureDetailResponse> {
225 check_leader!(
226 self,
227 request,
228 ProcedureDetailResponse,
229 "`procedure details`"
230 );
231
232 let ProcedureDetailRequest { header } = request.into_inner();
233 let _header = header.context(error::MissingRequestHeaderSnafu)?;
234 let metas = self
235 .procedure_manager()
236 .list_procedures()
237 .await
238 .context(error::QueryProcedureSnafu)?;
239 Ok(Response::new(procedure::procedure_details_to_pb_response(
240 metas,
241 )))
242 }
243}