common_meta/
procedure_executor.rs1use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24 ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28 self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
29 ProcedureStateResponse,
30};
31
32#[derive(Debug, Default)]
34pub struct ExecutorContext {
35 pub tracing_context: Option<W3cTrace>,
36}
37
38#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41 async fn submit_ddl_task(
43 &self,
44 ctx: &ExecutorContext,
45 request: SubmitDdlTaskRequest,
46 ) -> Result<SubmitDdlTaskResponse>;
47
48 async fn manage_region_follower(
50 &self,
51 _ctx: &ExecutorContext,
52 _request: ManageRegionFollowerRequest,
53 ) -> Result<()> {
54 UnsupportedSnafu {
55 operation: "manage_region_follower",
56 }
57 .fail()
58 }
59
60 async fn migrate_region(
62 &self,
63 ctx: &ExecutorContext,
64 request: MigrateRegionRequest,
65 ) -> Result<MigrateRegionResponse>;
66
67 async fn reconcile(
69 &self,
70 _ctx: &ExecutorContext,
71 request: ReconcileRequest,
72 ) -> Result<ReconcileResponse>;
73
74 async fn query_procedure_state(
76 &self,
77 ctx: &ExecutorContext,
78 pid: &str,
79 ) -> Result<ProcedureStateResponse>;
80
81 async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
82}
83
84pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
85
86pub struct LocalProcedureExecutor {
88 pub ddl_manager: DdlManagerRef,
89 pub procedure_manager: ProcedureManagerRef,
90}
91
92impl LocalProcedureExecutor {
93 pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
94 Self {
95 ddl_manager,
96 procedure_manager,
97 }
98 }
99}
100
101#[async_trait::async_trait]
102impl ProcedureExecutor for LocalProcedureExecutor {
103 async fn submit_ddl_task(
104 &self,
105 ctx: &ExecutorContext,
106 request: SubmitDdlTaskRequest,
107 ) -> Result<SubmitDdlTaskResponse> {
108 self.ddl_manager.submit_ddl_task(ctx, request).await
109 }
110
111 async fn migrate_region(
112 &self,
113 _ctx: &ExecutorContext,
114 _request: MigrateRegionRequest,
115 ) -> Result<MigrateRegionResponse> {
116 UnsupportedSnafu {
117 operation: "migrate_region",
118 }
119 .fail()
120 }
121
122 async fn reconcile(
123 &self,
124 _ctx: &ExecutorContext,
125 _request: ReconcileRequest,
126 ) -> Result<ReconcileResponse> {
127 UnsupportedSnafu {
128 operation: "reconcile",
129 }
130 .fail()
131 }
132
133 async fn query_procedure_state(
134 &self,
135 _ctx: &ExecutorContext,
136 pid: &str,
137 ) -> Result<ProcedureStateResponse> {
138 let pid =
139 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
140
141 let state = self
142 .procedure_manager
143 .procedure_state(pid)
144 .await
145 .context(QueryProcedureSnafu)?
146 .with_context(|| ProcedureNotFoundSnafu {
147 pid: pid.to_string(),
148 })?;
149
150 Ok(procedure::procedure_state_to_pb_response(&state))
151 }
152
153 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
154 let metas = self
155 .procedure_manager
156 .list_procedures()
157 .await
158 .context(QueryProcedureSnafu)?;
159 Ok(procedure::procedure_details_to_pb_response(metas))
160 }
161}