operator/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ReconcileRequest;
16use async_trait::async_trait;
17use catalog::CatalogManagerRef;
18use common_error::ext::BoxedError;
19use common_function::handlers::ProcedureServiceHandler;
20use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
21use common_meta::rpc::procedure::{
22    GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
23    GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
24    ProcedureStateResponse,
25};
26use common_query::error as query_error;
27use common_query::error::Result as QueryResult;
28use snafu::ResultExt;
29
30/// The operator for procedures which implements [`ProcedureServiceHandler`].
31#[derive(Clone)]
32pub struct ProcedureServiceOperator {
33    procedure_executor: ProcedureExecutorRef,
34    catalog_manager: CatalogManagerRef,
35}
36
37impl ProcedureServiceOperator {
38    pub fn new(
39        procedure_executor: ProcedureExecutorRef,
40        catalog_manager: CatalogManagerRef,
41    ) -> Self {
42        Self {
43            procedure_executor,
44            catalog_manager,
45        }
46    }
47}
48
49#[async_trait]
50impl ProcedureServiceHandler for ProcedureServiceOperator {
51    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
52        Ok(self
53            .procedure_executor
54            .migrate_region(&ExecutorContext::default(), request)
55            .await
56            .map_err(BoxedError::new)
57            .context(query_error::ProcedureServiceSnafu)?
58            .pid
59            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
60    }
61
62    async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
63        Ok(self
64            .procedure_executor
65            .reconcile(&ExecutorContext::default(), request)
66            .await
67            .map_err(BoxedError::new)
68            .context(query_error::ProcedureServiceSnafu)?
69            .pid
70            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
71    }
72
73    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
74        self.procedure_executor
75            .query_procedure_state(&ExecutorContext::default(), pid)
76            .await
77            .map_err(BoxedError::new)
78            .context(query_error::ProcedureServiceSnafu)
79    }
80
81    async fn manage_region_follower(
82        &self,
83        request: ManageRegionFollowerRequest,
84    ) -> QueryResult<()> {
85        self.procedure_executor
86            .manage_region_follower(&ExecutorContext::default(), request)
87            .await
88            .map_err(BoxedError::new)
89            .context(query_error::ProcedureServiceSnafu)
90    }
91
92    fn catalog_manager(&self) -> &CatalogManagerRef {
93        &self.catalog_manager
94    }
95
96    async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult<MetaGcResponse> {
97        self.procedure_executor
98            .gc_regions(&ExecutorContext::default(), request)
99            .await
100            .map_err(BoxedError::new)
101            .context(query_error::ProcedureServiceSnafu)
102    }
103
104    async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult<MetaGcResponse> {
105        self.procedure_executor
106            .gc_table(&ExecutorContext::default(), request)
107            .await
108            .map_err(BoxedError::new)
109            .context(query_error::ProcedureServiceSnafu)
110    }
111}