common_meta/
procedure_executor.rs1use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24 ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28 self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
29 ProcedureStateResponse, RemoveRegionFollowerRequest,
30};
31
32#[derive(Debug, Default)]
34pub struct ExecutorContext {
35 pub tracing_context: Option<W3cTrace>,
36}
37
38#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41 async fn submit_ddl_task(
43 &self,
44 ctx: &ExecutorContext,
45 request: SubmitDdlTaskRequest,
46 ) -> Result<SubmitDdlTaskResponse>;
47
48 async fn add_region_follower(
50 &self,
51 _ctx: &ExecutorContext,
52 _request: AddRegionFollowerRequest,
53 ) -> Result<()> {
54 UnsupportedSnafu {
55 operation: "add_region_follower",
56 }
57 .fail()
58 }
59
60 async fn remove_region_follower(
62 &self,
63 _ctx: &ExecutorContext,
64 _request: RemoveRegionFollowerRequest,
65 ) -> Result<()> {
66 UnsupportedSnafu {
67 operation: "remove_region_follower",
68 }
69 .fail()
70 }
71
72 async fn migrate_region(
74 &self,
75 ctx: &ExecutorContext,
76 request: MigrateRegionRequest,
77 ) -> Result<MigrateRegionResponse>;
78
79 async fn reconcile(
81 &self,
82 _ctx: &ExecutorContext,
83 request: ReconcileRequest,
84 ) -> Result<ReconcileResponse>;
85
86 async fn query_procedure_state(
88 &self,
89 ctx: &ExecutorContext,
90 pid: &str,
91 ) -> Result<ProcedureStateResponse>;
92
93 async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
94}
95
96pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
97
98pub struct LocalProcedureExecutor {
100 pub ddl_manager: DdlManagerRef,
101 pub procedure_manager: ProcedureManagerRef,
102}
103
104impl LocalProcedureExecutor {
105 pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
106 Self {
107 ddl_manager,
108 procedure_manager,
109 }
110 }
111}
112
113#[async_trait::async_trait]
114impl ProcedureExecutor for LocalProcedureExecutor {
115 async fn submit_ddl_task(
116 &self,
117 ctx: &ExecutorContext,
118 request: SubmitDdlTaskRequest,
119 ) -> Result<SubmitDdlTaskResponse> {
120 self.ddl_manager.submit_ddl_task(ctx, request).await
121 }
122
123 async fn migrate_region(
124 &self,
125 _ctx: &ExecutorContext,
126 _request: MigrateRegionRequest,
127 ) -> Result<MigrateRegionResponse> {
128 UnsupportedSnafu {
129 operation: "migrate_region",
130 }
131 .fail()
132 }
133
134 async fn reconcile(
135 &self,
136 _ctx: &ExecutorContext,
137 _request: ReconcileRequest,
138 ) -> Result<ReconcileResponse> {
139 UnsupportedSnafu {
140 operation: "reconcile",
141 }
142 .fail()
143 }
144
145 async fn query_procedure_state(
146 &self,
147 _ctx: &ExecutorContext,
148 pid: &str,
149 ) -> Result<ProcedureStateResponse> {
150 let pid =
151 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
152
153 let state = self
154 .procedure_manager
155 .procedure_state(pid)
156 .await
157 .context(QueryProcedureSnafu)?
158 .with_context(|| ProcedureNotFoundSnafu {
159 pid: pid.to_string(),
160 })?;
161
162 Ok(procedure::procedure_state_to_pb_response(&state))
163 }
164
165 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
166 let metas = self
167 .procedure_manager
168 .list_procedures()
169 .await
170 .context(QueryProcedureSnafu)?;
171 Ok(procedure::procedure_details_to_pb_response(metas))
172 }
173}