common_meta/
procedure_executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24    ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28    self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
29    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
30};
31
/// The context of procedure executor.
///
/// A reference to this context is passed to every [`ProcedureExecutor`] method.
#[derive(Debug, Default)]
pub struct ExecutorContext {
    /// Optional tracing context attached to the call; the derived `Default`
    /// leaves it as `None`.
    pub tracing_context: Option<W3cTrace>,
}
37
38/// The procedure executor that accepts ddl, region migration task etc.
39#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41    /// Submit a ddl task
42    async fn submit_ddl_task(
43        &self,
44        ctx: &ExecutorContext,
45        request: SubmitDdlTaskRequest,
46    ) -> Result<SubmitDdlTaskResponse>;
47
48    /// Submit ad manage region follower task
49    async fn manage_region_follower(
50        &self,
51        _ctx: &ExecutorContext,
52        _request: ManageRegionFollowerRequest,
53    ) -> Result<()> {
54        UnsupportedSnafu {
55            operation: "manage_region_follower",
56        }
57        .fail()
58    }
59
60    /// Submit a region migration task
61    async fn migrate_region(
62        &self,
63        ctx: &ExecutorContext,
64        request: MigrateRegionRequest,
65    ) -> Result<MigrateRegionResponse>;
66
67    /// Submit a reconcile task.
68    async fn reconcile(
69        &self,
70        _ctx: &ExecutorContext,
71        request: ReconcileRequest,
72    ) -> Result<ReconcileResponse>;
73
74    /// Query the procedure state by its id
75    async fn query_procedure_state(
76        &self,
77        ctx: &ExecutorContext,
78        pid: &str,
79    ) -> Result<ProcedureStateResponse>;
80
81    /// Manually trigger GC for the specified regions.
82    async fn gc_regions(
83        &self,
84        _ctx: &ExecutorContext,
85        _request: GcRegionsRequest,
86    ) -> Result<GcResponse> {
87        UnsupportedSnafu {
88            operation: "gc_regions",
89        }
90        .fail()
91    }
92
93    /// Manually trigger GC for the specified table.
94    async fn gc_table(
95        &self,
96        _ctx: &ExecutorContext,
97        _request: GcTableRequest,
98    ) -> Result<GcResponse> {
99        UnsupportedSnafu {
100            operation: "gc_table",
101        }
102        .fail()
103    }
104
105    async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
106}
107
/// A shared, reference-counted handle to a [`ProcedureExecutor`] trait object.
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
109
/// The local procedure executor that accepts ddl, region migration task etc.
pub struct LocalProcedureExecutor {
    /// The DDL manager that submitted DDL tasks are forwarded to.
    pub ddl_manager: DdlManagerRef,
    /// The procedure manager used to look up procedure state and to list procedures.
    pub procedure_manager: ProcedureManagerRef,
}
115
116impl LocalProcedureExecutor {
117    pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
118        Self {
119            ddl_manager,
120            procedure_manager,
121        }
122    }
123}
124
125#[async_trait::async_trait]
126impl ProcedureExecutor for LocalProcedureExecutor {
127    async fn submit_ddl_task(
128        &self,
129        ctx: &ExecutorContext,
130        request: SubmitDdlTaskRequest,
131    ) -> Result<SubmitDdlTaskResponse> {
132        self.ddl_manager.submit_ddl_task(ctx, request).await
133    }
134
135    async fn migrate_region(
136        &self,
137        _ctx: &ExecutorContext,
138        _request: MigrateRegionRequest,
139    ) -> Result<MigrateRegionResponse> {
140        UnsupportedSnafu {
141            operation: "migrate_region",
142        }
143        .fail()
144    }
145
146    async fn reconcile(
147        &self,
148        _ctx: &ExecutorContext,
149        _request: ReconcileRequest,
150    ) -> Result<ReconcileResponse> {
151        UnsupportedSnafu {
152            operation: "reconcile",
153        }
154        .fail()
155    }
156
157    async fn query_procedure_state(
158        &self,
159        _ctx: &ExecutorContext,
160        pid: &str,
161    ) -> Result<ProcedureStateResponse> {
162        let pid =
163            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
164
165        let state = self
166            .procedure_manager
167            .procedure_state(pid)
168            .await
169            .context(QueryProcedureSnafu)?
170            .with_context(|| ProcedureNotFoundSnafu {
171                pid: pid.to_string(),
172            })?;
173
174        Ok(procedure::procedure_state_to_pb_response(&state))
175    }
176
177    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
178        let metas = self
179            .procedure_manager
180            .list_procedures()
181            .await
182            .context(QueryProcedureSnafu)?;
183        Ok(procedure::procedure_details_to_pb_response(metas))
184    }
185}