operator/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use catalog::CatalogManagerRef;
17use common_error::ext::BoxedError;
18use common_function::handlers::ProcedureServiceHandler;
19use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
20use common_meta::rpc::procedure::{
21    AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
22    RemoveRegionFollowerRequest,
23};
24use common_query::error as query_error;
25use common_query::error::Result as QueryResult;
26use snafu::ResultExt;
27
28/// The operator for procedures which implements [`ProcedureServiceHandler`].
29#[derive(Clone)]
30pub struct ProcedureServiceOperator {
31    procedure_executor: ProcedureExecutorRef,
32    catalog_manager: CatalogManagerRef,
33}
34
35impl ProcedureServiceOperator {
36    pub fn new(
37        procedure_executor: ProcedureExecutorRef,
38        catalog_manager: CatalogManagerRef,
39    ) -> Self {
40        Self {
41            procedure_executor,
42            catalog_manager,
43        }
44    }
45}
46
47#[async_trait]
48impl ProcedureServiceHandler for ProcedureServiceOperator {
49    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
50        Ok(self
51            .procedure_executor
52            .migrate_region(&ExecutorContext::default(), request)
53            .await
54            .map_err(BoxedError::new)
55            .context(query_error::ProcedureServiceSnafu)?
56            .pid
57            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
58    }
59
60    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
61        self.procedure_executor
62            .query_procedure_state(&ExecutorContext::default(), pid)
63            .await
64            .map_err(BoxedError::new)
65            .context(query_error::ProcedureServiceSnafu)
66    }
67
68    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
69        self.procedure_executor
70            .add_region_follower(&ExecutorContext::default(), request)
71            .await
72            .map_err(BoxedError::new)
73            .context(query_error::ProcedureServiceSnafu)
74    }
75
76    async fn remove_region_follower(
77        &self,
78        request: RemoveRegionFollowerRequest,
79    ) -> QueryResult<()> {
80        self.procedure_executor
81            .remove_region_follower(&ExecutorContext::default(), request)
82            .await
83            .map_err(BoxedError::new)
84            .context(query_error::ProcedureServiceSnafu)
85    }
86
87    fn catalog_manager(&self) -> &CatalogManagerRef {
88        &self.catalog_manager
89    }
90}