1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::reconcile_request::Target;
19use api::v1::meta::{
20 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
21 MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
22 QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
23 ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
24};
25use common_meta::procedure_executor::ExecutorContext;
26use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
27use common_meta::rpc::procedure;
28use common_telemetry::warn;
29use snafu::{OptionExt, ResultExt};
30use table::table_reference::TableReference;
31use tonic::Request;
32
33use crate::metasrv::Metasrv;
34use crate::procedure::region_migration::manager::{
35 RegionMigrationProcedureTask, RegionMigrationTriggerReason,
36};
37use crate::service::GrpcResult;
38use crate::{check_leader, error};
39
40#[async_trait::async_trait]
41impl procedure_service_server::ProcedureService for Metasrv {
42 async fn query(
43 &self,
44 request: Request<QueryProcedureRequest>,
45 ) -> GrpcResult<ProcedureStateResponse> {
46 check_leader!(
47 self,
48 request,
49 ProcedureStateResponse,
50 "`query procedure state`"
51 );
52
53 let QueryProcedureRequest { header, pid, .. } = request.into_inner();
54 let _header = header.context(error::MissingRequestHeaderSnafu)?;
55 let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
56 let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?;
57
58 let state = self
59 .procedure_manager()
60 .procedure_state(pid)
61 .await
62 .context(error::QueryProcedureSnafu)?
63 .context(error::ProcedureNotFoundSnafu {
64 pid: pid.to_string(),
65 })?;
66
67 Ok(Response::new(procedure::procedure_state_to_pb_response(
68 &state,
69 )))
70 }
71
72 async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
73 check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
74
75 let PbDdlTaskRequest {
76 header,
77 query_context,
78 task,
79 ..
80 } = request.into_inner();
81
82 let header = header.context(error::MissingRequestHeaderSnafu)?;
83 let query_context = query_context
84 .context(error::MissingRequiredParameterSnafu {
85 param: "query_context",
86 })?
87 .into();
88 let task: DdlTask = task
89 .context(error::MissingRequiredParameterSnafu { param: "task" })?
90 .try_into()
91 .context(error::ConvertProtoDataSnafu)?;
92
93 let resp = self
94 .ddl_manager()
95 .submit_ddl_task(
96 &ExecutorContext {
97 tracing_context: Some(header.tracing_context),
98 },
99 SubmitDdlTaskRequest {
100 query_context: Arc::new(query_context),
101 task,
102 },
103 )
104 .await
105 .context(error::SubmitDdlTaskSnafu)?
106 .into();
107
108 Ok(Response::new(resp))
109 }
110
111 async fn migrate(
112 &self,
113 request: Request<MigrateRegionRequest>,
114 ) -> GrpcResult<MigrateRegionResponse> {
115 check_leader!(self, request, MigrateRegionResponse, "`migrate`");
116
117 let MigrateRegionRequest {
118 header,
119 region_id,
120 from_peer,
121 to_peer,
122 timeout_secs,
123 } = request.into_inner();
124
125 let _header = header.context(error::MissingRequestHeaderSnafu)?;
126 let from_peer = self
127 .lookup_datanode_peer(from_peer)
128 .await?
129 .context(error::PeerUnavailableSnafu { peer_id: from_peer })?;
130 let to_peer = self
131 .lookup_datanode_peer(to_peer)
132 .await?
133 .context(error::PeerUnavailableSnafu { peer_id: to_peer })?;
134
135 let pid = self
136 .region_migration_manager()
137 .submit_procedure(RegionMigrationProcedureTask {
138 region_id: region_id.into(),
139 from_peer,
140 to_peer,
141 timeout: Duration::from_secs(timeout_secs.into()),
142 trigger_reason: RegionMigrationTriggerReason::Manual,
143 })
144 .await?
145 .map(procedure::pid_to_pb_pid);
146
147 let resp = MigrateRegionResponse {
148 pid,
149 ..Default::default()
150 };
151
152 Ok(Response::new(resp))
153 }
154
155 async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
156 check_leader!(self, request, ReconcileResponse, "`reconcile`");
157
158 let ReconcileRequest { header, target } = request.into_inner();
159 let _header = header.context(error::MissingRequestHeaderSnafu)?;
160 let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
161 let parse_resolve_strategy = |resolve_strategy: i32| {
162 ResolveStrategy::try_from(resolve_strategy)
163 .ok()
164 .context(error::UnexpectedSnafu {
165 violated: format!("Invalid resolve strategy: {}", resolve_strategy),
166 })
167 };
168 let procedure_id = match target {
169 Target::ReconcileTable(table) => {
170 let ReconcileTable {
171 catalog_name,
172 schema_name,
173 table_name,
174 resolve_strategy,
175 } = table;
176 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
177 let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
178 self.reconciliation_manager()
179 .reconcile_table(table_ref, resolve_strategy.into())
180 .await
181 .context(error::SubmitReconcileProcedureSnafu)?
182 }
183 Target::ReconcileDatabase(database) => {
184 let ReconcileDatabase {
185 catalog_name,
186 database_name,
187 resolve_strategy,
188 parallelism,
189 } = database;
190 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
191 self.reconciliation_manager()
192 .reconcile_database(
193 catalog_name,
194 database_name,
195 resolve_strategy.into(),
196 parallelism as usize,
197 )
198 .await
199 .context(error::SubmitReconcileProcedureSnafu)?
200 }
201 Target::ReconcileCatalog(catalog) => {
202 let ReconcileCatalog {
203 catalog_name,
204 resolve_strategy,
205 parallelism,
206 } = catalog;
207 let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
208 self.reconciliation_manager()
209 .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize)
210 .await
211 .context(error::SubmitReconcileProcedureSnafu)?
212 }
213 };
214 Ok(Response::new(ReconcileResponse {
215 pid: Some(procedure::pid_to_pb_pid(procedure_id)),
216 ..Default::default()
217 }))
218 }
219
220 async fn details(
221 &self,
222 request: Request<ProcedureDetailRequest>,
223 ) -> GrpcResult<ProcedureDetailResponse> {
224 check_leader!(
225 self,
226 request,
227 ProcedureDetailResponse,
228 "`procedure details`"
229 );
230
231 let ProcedureDetailRequest { header } = request.into_inner();
232 let _header = header.context(error::MissingRequestHeaderSnafu)?;
233 let metas = self
234 .procedure_manager()
235 .list_procedures()
236 .await
237 .context(error::QueryProcedureSnafu)?;
238 Ok(Response::new(procedure::procedure_details_to_pb_response(
239 metas,
240 )))
241 }
242}