1use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22 AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
23 RemoveRegionFollowerRequest,
24};
25use common_query::error as query_error;
26use common_query::error::Result as QueryResult;
27use snafu::ResultExt;
28
29#[derive(Clone)]
31pub struct ProcedureServiceOperator {
32 procedure_executor: ProcedureExecutorRef,
33 catalog_manager: CatalogManagerRef,
34}
35
36impl ProcedureServiceOperator {
37 pub fn new(
38 procedure_executor: ProcedureExecutorRef,
39 catalog_manager: CatalogManagerRef,
40 ) -> Self {
41 Self {
42 procedure_executor,
43 catalog_manager,
44 }
45 }
46}
47
48#[async_trait]
49impl ProcedureServiceHandler for ProcedureServiceOperator {
50 async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
51 Ok(self
52 .procedure_executor
53 .migrate_region(&ExecutorContext::default(), request)
54 .await
55 .map_err(BoxedError::new)
56 .context(query_error::ProcedureServiceSnafu)?
57 .pid
58 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
59 }
60
61 async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
62 Ok(self
63 .procedure_executor
64 .reconcile(&ExecutorContext::default(), request)
65 .await
66 .map_err(BoxedError::new)
67 .context(query_error::ProcedureServiceSnafu)?
68 .pid
69 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
70 }
71
72 async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
73 self.procedure_executor
74 .query_procedure_state(&ExecutorContext::default(), pid)
75 .await
76 .map_err(BoxedError::new)
77 .context(query_error::ProcedureServiceSnafu)
78 }
79
80 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
81 self.procedure_executor
82 .add_region_follower(&ExecutorContext::default(), request)
83 .await
84 .map_err(BoxedError::new)
85 .context(query_error::ProcedureServiceSnafu)
86 }
87
88 async fn remove_region_follower(
89 &self,
90 request: RemoveRegionFollowerRequest,
91 ) -> QueryResult<()> {
92 self.procedure_executor
93 .remove_region_follower(&ExecutorContext::default(), request)
94 .await
95 .map_err(BoxedError::new)
96 .context(query_error::ProcedureServiceSnafu)
97 }
98
99 fn catalog_manager(&self) -> &CatalogManagerRef {
100 &self.catalog_manager
101 }
102}