common_meta/
procedure_executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24    ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28    self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
29    ProcedureStateResponse,
30};
31
32/// The context of procedure executor.
33#[derive(Debug, Default)]
34pub struct ExecutorContext {
35    pub tracing_context: Option<W3cTrace>,
36}
37
38/// The procedure executor that accepts ddl, region migration task etc.
39#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41    /// Submit a ddl task
42    async fn submit_ddl_task(
43        &self,
44        ctx: &ExecutorContext,
45        request: SubmitDdlTaskRequest,
46    ) -> Result<SubmitDdlTaskResponse>;
47
48    /// Submit ad manage region follower task
49    async fn manage_region_follower(
50        &self,
51        _ctx: &ExecutorContext,
52        _request: ManageRegionFollowerRequest,
53    ) -> Result<()> {
54        UnsupportedSnafu {
55            operation: "manage_region_follower",
56        }
57        .fail()
58    }
59
60    /// Submit a region migration task
61    async fn migrate_region(
62        &self,
63        ctx: &ExecutorContext,
64        request: MigrateRegionRequest,
65    ) -> Result<MigrateRegionResponse>;
66
67    /// Submit a reconcile task.
68    async fn reconcile(
69        &self,
70        _ctx: &ExecutorContext,
71        request: ReconcileRequest,
72    ) -> Result<ReconcileResponse>;
73
74    /// Query the procedure state by its id
75    async fn query_procedure_state(
76        &self,
77        ctx: &ExecutorContext,
78        pid: &str,
79    ) -> Result<ProcedureStateResponse>;
80
81    async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
82}
83
84pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
85
86/// The local procedure executor that accepts ddl, region migration task etc.
87pub struct LocalProcedureExecutor {
88    pub ddl_manager: DdlManagerRef,
89    pub procedure_manager: ProcedureManagerRef,
90}
91
92impl LocalProcedureExecutor {
93    pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
94        Self {
95            ddl_manager,
96            procedure_manager,
97        }
98    }
99}
100
101#[async_trait::async_trait]
102impl ProcedureExecutor for LocalProcedureExecutor {
103    async fn submit_ddl_task(
104        &self,
105        ctx: &ExecutorContext,
106        request: SubmitDdlTaskRequest,
107    ) -> Result<SubmitDdlTaskResponse> {
108        self.ddl_manager.submit_ddl_task(ctx, request).await
109    }
110
111    async fn migrate_region(
112        &self,
113        _ctx: &ExecutorContext,
114        _request: MigrateRegionRequest,
115    ) -> Result<MigrateRegionResponse> {
116        UnsupportedSnafu {
117            operation: "migrate_region",
118        }
119        .fail()
120    }
121
122    async fn reconcile(
123        &self,
124        _ctx: &ExecutorContext,
125        _request: ReconcileRequest,
126    ) -> Result<ReconcileResponse> {
127        UnsupportedSnafu {
128            operation: "reconcile",
129        }
130        .fail()
131    }
132
133    async fn query_procedure_state(
134        &self,
135        _ctx: &ExecutorContext,
136        pid: &str,
137    ) -> Result<ProcedureStateResponse> {
138        let pid =
139            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
140
141        let state = self
142            .procedure_manager
143            .procedure_state(pid)
144            .await
145            .context(QueryProcedureSnafu)?
146            .with_context(|| ProcedureNotFoundSnafu {
147                pid: pid.to_string(),
148            })?;
149
150        Ok(procedure::procedure_state_to_pb_response(&state))
151    }
152
153    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
154        let metas = self
155            .procedure_manager
156            .list_procedures()
157            .await
158            .context(QueryProcedureSnafu)?;
159        Ok(procedure::procedure_details_to_pb_response(metas))
160    }
161}