operator/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22    AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
23    RemoveRegionFollowerRequest,
24};
25use common_query::error as query_error;
26use common_query::error::Result as QueryResult;
27use snafu::ResultExt;
28
29/// The operator for procedures which implements [`ProcedureServiceHandler`].
30#[derive(Clone)]
31pub struct ProcedureServiceOperator {
32    procedure_executor: ProcedureExecutorRef,
33    catalog_manager: CatalogManagerRef,
34}
35
36impl ProcedureServiceOperator {
37    pub fn new(
38        procedure_executor: ProcedureExecutorRef,
39        catalog_manager: CatalogManagerRef,
40    ) -> Self {
41        Self {
42            procedure_executor,
43            catalog_manager,
44        }
45    }
46}
47
48#[async_trait]
49impl ProcedureServiceHandler for ProcedureServiceOperator {
50    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
51        Ok(self
52            .procedure_executor
53            .migrate_region(&ExecutorContext::default(), request)
54            .await
55            .map_err(BoxedError::new)
56            .context(query_error::ProcedureServiceSnafu)?
57            .pid
58            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
59    }
60
61    async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
62        Ok(self
63            .procedure_executor
64            .reconcile(&ExecutorContext::default(), request)
65            .await
66            .map_err(BoxedError::new)
67            .context(query_error::ProcedureServiceSnafu)?
68            .pid
69            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
70    }
71
72    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
73        self.procedure_executor
74            .query_procedure_state(&ExecutorContext::default(), pid)
75            .await
76            .map_err(BoxedError::new)
77            .context(query_error::ProcedureServiceSnafu)
78    }
79
80    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
81        self.procedure_executor
82            .add_region_follower(&ExecutorContext::default(), request)
83            .await
84            .map_err(BoxedError::new)
85            .context(query_error::ProcedureServiceSnafu)
86    }
87
88    async fn remove_region_follower(
89        &self,
90        request: RemoveRegionFollowerRequest,
91    ) -> QueryResult<()> {
92        self.procedure_executor
93            .remove_region_follower(&ExecutorContext::default(), request)
94            .await
95            .map_err(BoxedError::new)
96            .context(query_error::ProcedureServiceSnafu)
97    }
98
99    fn catalog_manager(&self) -> &CatalogManagerRef {
100        &self.catalog_manager
101    }
102}