common_meta/
procedure_executor.rs1use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24 ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28 self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
29 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
30};
31
32#[derive(Debug, Default)]
34pub struct ExecutorContext {
35 pub tracing_context: Option<W3cTrace>,
36}
37
38#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41 async fn submit_ddl_task(
43 &self,
44 ctx: &ExecutorContext,
45 request: SubmitDdlTaskRequest,
46 ) -> Result<SubmitDdlTaskResponse>;
47
48 async fn manage_region_follower(
50 &self,
51 _ctx: &ExecutorContext,
52 _request: ManageRegionFollowerRequest,
53 ) -> Result<()> {
54 UnsupportedSnafu {
55 operation: "manage_region_follower",
56 }
57 .fail()
58 }
59
60 async fn migrate_region(
62 &self,
63 ctx: &ExecutorContext,
64 request: MigrateRegionRequest,
65 ) -> Result<MigrateRegionResponse>;
66
67 async fn reconcile(
69 &self,
70 _ctx: &ExecutorContext,
71 request: ReconcileRequest,
72 ) -> Result<ReconcileResponse>;
73
74 async fn query_procedure_state(
76 &self,
77 ctx: &ExecutorContext,
78 pid: &str,
79 ) -> Result<ProcedureStateResponse>;
80
81 async fn gc_regions(
83 &self,
84 _ctx: &ExecutorContext,
85 _request: GcRegionsRequest,
86 ) -> Result<GcResponse> {
87 UnsupportedSnafu {
88 operation: "gc_regions",
89 }
90 .fail()
91 }
92
93 async fn gc_table(
95 &self,
96 _ctx: &ExecutorContext,
97 _request: GcTableRequest,
98 ) -> Result<GcResponse> {
99 UnsupportedSnafu {
100 operation: "gc_table",
101 }
102 .fail()
103 }
104
105 async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
106}
107
108pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
109
110pub struct LocalProcedureExecutor {
112 pub ddl_manager: DdlManagerRef,
113 pub procedure_manager: ProcedureManagerRef,
114}
115
116impl LocalProcedureExecutor {
117 pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
118 Self {
119 ddl_manager,
120 procedure_manager,
121 }
122 }
123}
124
125#[async_trait::async_trait]
126impl ProcedureExecutor for LocalProcedureExecutor {
127 async fn submit_ddl_task(
128 &self,
129 ctx: &ExecutorContext,
130 request: SubmitDdlTaskRequest,
131 ) -> Result<SubmitDdlTaskResponse> {
132 self.ddl_manager.submit_ddl_task(ctx, request).await
133 }
134
135 async fn migrate_region(
136 &self,
137 _ctx: &ExecutorContext,
138 _request: MigrateRegionRequest,
139 ) -> Result<MigrateRegionResponse> {
140 UnsupportedSnafu {
141 operation: "migrate_region",
142 }
143 .fail()
144 }
145
146 async fn reconcile(
147 &self,
148 _ctx: &ExecutorContext,
149 _request: ReconcileRequest,
150 ) -> Result<ReconcileResponse> {
151 UnsupportedSnafu {
152 operation: "reconcile",
153 }
154 .fail()
155 }
156
157 async fn query_procedure_state(
158 &self,
159 _ctx: &ExecutorContext,
160 pid: &str,
161 ) -> Result<ProcedureStateResponse> {
162 let pid =
163 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
164
165 let state = self
166 .procedure_manager
167 .procedure_state(pid)
168 .await
169 .context(QueryProcedureSnafu)?
170 .with_context(|| ProcedureNotFoundSnafu {
171 pid: pid.to_string(),
172 })?;
173
174 Ok(procedure::procedure_state_to_pb_response(&state))
175 }
176
177 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
178 let metas = self
179 .procedure_manager
180 .list_procedures()
181 .await
182 .context(QueryProcedureSnafu)?;
183 Ok(procedure::procedure_details_to_pb_response(metas))
184 }
185}