1use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22 ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
23};
24use common_query::error as query_error;
25use common_query::error::Result as QueryResult;
26use snafu::ResultExt;
27
28#[derive(Clone)]
30pub struct ProcedureServiceOperator {
31 procedure_executor: ProcedureExecutorRef,
32 catalog_manager: CatalogManagerRef,
33}
34
35impl ProcedureServiceOperator {
36 pub fn new(
37 procedure_executor: ProcedureExecutorRef,
38 catalog_manager: CatalogManagerRef,
39 ) -> Self {
40 Self {
41 procedure_executor,
42 catalog_manager,
43 }
44 }
45}
46
47#[async_trait]
48impl ProcedureServiceHandler for ProcedureServiceOperator {
49 async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
50 Ok(self
51 .procedure_executor
52 .migrate_region(&ExecutorContext::default(), request)
53 .await
54 .map_err(BoxedError::new)
55 .context(query_error::ProcedureServiceSnafu)?
56 .pid
57 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
58 }
59
60 async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
61 Ok(self
62 .procedure_executor
63 .reconcile(&ExecutorContext::default(), request)
64 .await
65 .map_err(BoxedError::new)
66 .context(query_error::ProcedureServiceSnafu)?
67 .pid
68 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
69 }
70
71 async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
72 self.procedure_executor
73 .query_procedure_state(&ExecutorContext::default(), pid)
74 .await
75 .map_err(BoxedError::new)
76 .context(query_error::ProcedureServiceSnafu)
77 }
78
79 async fn manage_region_follower(
80 &self,
81 request: ManageRegionFollowerRequest,
82 ) -> QueryResult<()> {
83 self.procedure_executor
84 .manage_region_follower(&ExecutorContext::default(), request)
85 .await
86 .map_err(BoxedError::new)
87 .context(query_error::ProcedureServiceSnafu)
88 }
89
90 fn catalog_manager(&self) -> &CatalogManagerRef {
91 &self.catalog_manager
92 }
93}