1use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22    ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
23};
24use common_query::error as query_error;
25use common_query::error::Result as QueryResult;
26use snafu::ResultExt;
27
28#[derive(Clone)]
30pub struct ProcedureServiceOperator {
31    procedure_executor: ProcedureExecutorRef,
32    catalog_manager: CatalogManagerRef,
33}
34
35impl ProcedureServiceOperator {
36    pub fn new(
37        procedure_executor: ProcedureExecutorRef,
38        catalog_manager: CatalogManagerRef,
39    ) -> Self {
40        Self {
41            procedure_executor,
42            catalog_manager,
43        }
44    }
45}
46
47#[async_trait]
48impl ProcedureServiceHandler for ProcedureServiceOperator {
49    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
50        Ok(self
51            .procedure_executor
52            .migrate_region(&ExecutorContext::default(), request)
53            .await
54            .map_err(BoxedError::new)
55            .context(query_error::ProcedureServiceSnafu)?
56            .pid
57            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
58    }
59
60    async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
61        Ok(self
62            .procedure_executor
63            .reconcile(&ExecutorContext::default(), request)
64            .await
65            .map_err(BoxedError::new)
66            .context(query_error::ProcedureServiceSnafu)?
67            .pid
68            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
69    }
70
71    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
72        self.procedure_executor
73            .query_procedure_state(&ExecutorContext::default(), pid)
74            .await
75            .map_err(BoxedError::new)
76            .context(query_error::ProcedureServiceSnafu)
77    }
78
79    async fn manage_region_follower(
80        &self,
81        request: ManageRegionFollowerRequest,
82    ) -> QueryResult<()> {
83        self.procedure_executor
84            .manage_region_follower(&ExecutorContext::default(), request)
85            .await
86            .map_err(BoxedError::new)
87            .context(query_error::ProcedureServiceSnafu)
88    }
89
90    fn catalog_manager(&self) -> &CatalogManagerRef {
91        &self.catalog_manager
92    }
93}