1use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22 GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
23 GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
24 ProcedureStateResponse,
25};
26use common_query::error as query_error;
27use common_query::error::Result as QueryResult;
28use snafu::ResultExt;
29
30#[derive(Clone)]
32pub struct ProcedureServiceOperator {
33 procedure_executor: ProcedureExecutorRef,
34 catalog_manager: CatalogManagerRef,
35}
36
37impl ProcedureServiceOperator {
38 pub fn new(
39 procedure_executor: ProcedureExecutorRef,
40 catalog_manager: CatalogManagerRef,
41 ) -> Self {
42 Self {
43 procedure_executor,
44 catalog_manager,
45 }
46 }
47}
48
49#[async_trait]
50impl ProcedureServiceHandler for ProcedureServiceOperator {
51 async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
52 Ok(self
53 .procedure_executor
54 .migrate_region(&ExecutorContext::default(), request)
55 .await
56 .map_err(BoxedError::new)
57 .context(query_error::ProcedureServiceSnafu)?
58 .pid
59 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
60 }
61
62 async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
63 Ok(self
64 .procedure_executor
65 .reconcile(&ExecutorContext::default(), request)
66 .await
67 .map_err(BoxedError::new)
68 .context(query_error::ProcedureServiceSnafu)?
69 .pid
70 .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
71 }
72
73 async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
74 self.procedure_executor
75 .query_procedure_state(&ExecutorContext::default(), pid)
76 .await
77 .map_err(BoxedError::new)
78 .context(query_error::ProcedureServiceSnafu)
79 }
80
81 async fn manage_region_follower(
82 &self,
83 request: ManageRegionFollowerRequest,
84 ) -> QueryResult<()> {
85 self.procedure_executor
86 .manage_region_follower(&ExecutorContext::default(), request)
87 .await
88 .map_err(BoxedError::new)
89 .context(query_error::ProcedureServiceSnafu)
90 }
91
92 fn catalog_manager(&self) -> &CatalogManagerRef {
93 &self.catalog_manager
94 }
95
96 async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult<MetaGcResponse> {
97 self.procedure_executor
98 .gc_regions(&ExecutorContext::default(), request)
99 .await
100 .map_err(BoxedError::new)
101 .context(query_error::ProcedureServiceSnafu)
102 }
103
104 async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult<MetaGcResponse> {
105 self.procedure_executor
106 .gc_table(&ExecutorContext::default(), request)
107 .await
108 .map_err(BoxedError::new)
109 .context(query_error::ProcedureServiceSnafu)
110 }
111}