1use async_trait::async_trait;
16use catalog::CatalogManagerRef;
17use common_error::ext::BoxedError;
18use common_function::handlers::ProcedureServiceHandler;
19use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
20use common_meta::rpc::procedure::{
21 AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
22 RemoveRegionFollowerRequest,
23};
24use common_query::error as query_error;
25use common_query::error::Result as QueryResult;
26use snafu::ResultExt;
27
28#[derive(Clone)]
30pub struct ProcedureServiceOperator {
31 procedure_executor: ProcedureExecutorRef,
32 catalog_manager: CatalogManagerRef,
33}
34
35impl ProcedureServiceOperator {
36 pub fn new(
37 procedure_executor: ProcedureExecutorRef,
38 catalog_manager: CatalogManagerRef,
39 ) -> Self {
40 Self {
41 procedure_executor,
42 catalog_manager,
43 }
44 }
45}
46
47#[async_trait]
48impl ProcedureServiceHandler for ProcedureServiceOperator {
49 async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
50 Ok(self
51 .procedure_executor
52 .migrate_region(&ExecutorContext::default(), request)
53 .await
54 .map_err(BoxedError::new)
55 .context(query_error::ProcedureServiceSnafu)?
56 .pid
57 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
58 }
59
60 async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
61 self.procedure_executor
62 .query_procedure_state(&ExecutorContext::default(), pid)
63 .await
64 .map_err(BoxedError::new)
65 .context(query_error::ProcedureServiceSnafu)
66 }
67
68 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
69 self.procedure_executor
70 .add_region_follower(&ExecutorContext::default(), request)
71 .await
72 .map_err(BoxedError::new)
73 .context(query_error::ProcedureServiceSnafu)
74 }
75
76 async fn remove_region_follower(
77 &self,
78 request: RemoveRegionFollowerRequest,
79 ) -> QueryResult<()> {
80 self.procedure_executor
81 .remove_region_follower(&ExecutorContext::default(), request)
82 .await
83 .map_err(BoxedError::new)
84 .context(query_error::ProcedureServiceSnafu)
85 }
86
87 fn catalog_manager(&self) -> &CatalogManagerRef {
88 &self.catalog_manager
89 }
90}