common_meta/
procedure_executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
18use common_procedure::{ProcedureId, ProcedureManagerRef};
19use common_telemetry::tracing_context::W3cTrace;
20use snafu::{OptionExt, ResultExt};
21
22use crate::ddl_manager::DdlManagerRef;
23use crate::error::{
24    ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
25};
26use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
27use crate::rpc::procedure::{
28    self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
29    ProcedureStateResponse, RemoveRegionFollowerRequest,
30};
31
32/// The context of procedure executor.
33#[derive(Debug, Default)]
34pub struct ExecutorContext {
35    pub tracing_context: Option<W3cTrace>,
36}
37
38/// The procedure executor that accepts ddl, region migration task etc.
39#[async_trait::async_trait]
40pub trait ProcedureExecutor: Send + Sync {
41    /// Submit a ddl task
42    async fn submit_ddl_task(
43        &self,
44        ctx: &ExecutorContext,
45        request: SubmitDdlTaskRequest,
46    ) -> Result<SubmitDdlTaskResponse>;
47
48    /// Add a region follower
49    async fn add_region_follower(
50        &self,
51        _ctx: &ExecutorContext,
52        _request: AddRegionFollowerRequest,
53    ) -> Result<()> {
54        UnsupportedSnafu {
55            operation: "add_region_follower",
56        }
57        .fail()
58    }
59
60    /// Remove a region follower
61    async fn remove_region_follower(
62        &self,
63        _ctx: &ExecutorContext,
64        _request: RemoveRegionFollowerRequest,
65    ) -> Result<()> {
66        UnsupportedSnafu {
67            operation: "remove_region_follower",
68        }
69        .fail()
70    }
71
72    /// Submit a region migration task
73    async fn migrate_region(
74        &self,
75        ctx: &ExecutorContext,
76        request: MigrateRegionRequest,
77    ) -> Result<MigrateRegionResponse>;
78
79    /// Submit a reconcile task.
80    async fn reconcile(
81        &self,
82        _ctx: &ExecutorContext,
83        request: ReconcileRequest,
84    ) -> Result<ReconcileResponse>;
85
86    /// Query the procedure state by its id
87    async fn query_procedure_state(
88        &self,
89        ctx: &ExecutorContext,
90        pid: &str,
91    ) -> Result<ProcedureStateResponse>;
92
93    async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
94}
95
96pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
97
98/// The local procedure executor that accepts ddl, region migration task etc.
99pub struct LocalProcedureExecutor {
100    pub ddl_manager: DdlManagerRef,
101    pub procedure_manager: ProcedureManagerRef,
102}
103
104impl LocalProcedureExecutor {
105    pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
106        Self {
107            ddl_manager,
108            procedure_manager,
109        }
110    }
111}
112
113#[async_trait::async_trait]
114impl ProcedureExecutor for LocalProcedureExecutor {
115    async fn submit_ddl_task(
116        &self,
117        ctx: &ExecutorContext,
118        request: SubmitDdlTaskRequest,
119    ) -> Result<SubmitDdlTaskResponse> {
120        self.ddl_manager.submit_ddl_task(ctx, request).await
121    }
122
123    async fn migrate_region(
124        &self,
125        _ctx: &ExecutorContext,
126        _request: MigrateRegionRequest,
127    ) -> Result<MigrateRegionResponse> {
128        UnsupportedSnafu {
129            operation: "migrate_region",
130        }
131        .fail()
132    }
133
134    async fn reconcile(
135        &self,
136        _ctx: &ExecutorContext,
137        _request: ReconcileRequest,
138    ) -> Result<ReconcileResponse> {
139        UnsupportedSnafu {
140            operation: "reconcile",
141        }
142        .fail()
143    }
144
145    async fn query_procedure_state(
146        &self,
147        _ctx: &ExecutorContext,
148        pid: &str,
149    ) -> Result<ProcedureStateResponse> {
150        let pid =
151            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
152
153        let state = self
154            .procedure_manager
155            .procedure_state(pid)
156            .await
157            .context(QueryProcedureSnafu)?
158            .with_context(|| ProcedureNotFoundSnafu {
159                pid: pid.to_string(),
160            })?;
161
162        Ok(procedure::procedure_state_to_pb_response(&state))
163    }
164
165    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
166        let metas = self
167            .procedure_manager
168            .list_procedures()
169            .await
170            .context(QueryProcedureSnafu)?;
171        Ok(procedure::procedure_details_to_pb_response(metas))
172    }
173}