operator/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22    ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
23};
24use common_query::error as query_error;
25use common_query::error::Result as QueryResult;
26use snafu::ResultExt;
27
28/// The operator for procedures which implements [`ProcedureServiceHandler`].
29#[derive(Clone)]
30pub struct ProcedureServiceOperator {
31    procedure_executor: ProcedureExecutorRef,
32    catalog_manager: CatalogManagerRef,
33}
34
35impl ProcedureServiceOperator {
36    pub fn new(
37        procedure_executor: ProcedureExecutorRef,
38        catalog_manager: CatalogManagerRef,
39    ) -> Self {
40        Self {
41            procedure_executor,
42            catalog_manager,
43        }
44    }
45}
46
47#[async_trait]
48impl ProcedureServiceHandler for ProcedureServiceOperator {
49    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
50        Ok(self
51            .procedure_executor
52            .migrate_region(&ExecutorContext::default(), request)
53            .await
54            .map_err(BoxedError::new)
55            .context(query_error::ProcedureServiceSnafu)?
56            .pid
57            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
58    }
59
60    async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
61        Ok(self
62            .procedure_executor
63            .reconcile(&ExecutorContext::default(), request)
64            .await
65            .map_err(BoxedError::new)
66            .context(query_error::ProcedureServiceSnafu)?
67            .pid
68            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
69    }
70
71    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
72        self.procedure_executor
73            .query_procedure_state(&ExecutorContext::default(), pid)
74            .await
75            .map_err(BoxedError::new)
76            .context(query_error::ProcedureServiceSnafu)
77    }
78
79    async fn manage_region_follower(
80        &self,
81        request: ManageRegionFollowerRequest,
82    ) -> QueryResult<()> {
83        self.procedure_executor
84            .manage_region_follower(&ExecutorContext::default(), request)
85            .await
86            .map_err(BoxedError::new)
87            .context(query_error::ProcedureServiceSnafu)
88    }
89
90    fn catalog_manager(&self) -> &CatalogManagerRef {
91        &self.catalog_manager
92    }
93}