meta_client/client/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21    DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22    ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
23    QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info, warn};
28use snafu::{ensure, ResultExt};
29use tokio::sync::RwLock;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32use tonic::Status;
33
34use crate::client::ask_leader::AskLeader;
35use crate::client::{util, Id, LeaderProviderRef};
36use crate::error;
37use crate::error::Result;
38
39#[derive(Clone, Debug)]
40pub struct Client {
41    inner: Arc<RwLock<Inner>>,
42}
43
44impl Client {
45    pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
46        let inner = Arc::new(RwLock::new(Inner {
47            id,
48            role,
49            channel_manager,
50            leader_provider: None,
51            max_retry,
52        }));
53
54        Self { inner }
55    }
56
57    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
58    where
59        U: AsRef<str>,
60        A: AsRef<[U]>,
61    {
62        let mut inner = self.inner.write().await;
63        inner.start(urls)
64    }
65
66    /// Start the client with a [LeaderProvider].
67    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
68        let mut inner = self.inner.write().await;
69        inner.start_with(leader_provider)
70    }
71
72    pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
73        let inner = self.inner.read().await;
74        inner.submit_ddl_task(req).await
75    }
76
77    /// Query the procedure' state by its id
78    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
79        let inner = self.inner.read().await;
80        inner.query_procedure_state(pid).await
81    }
82
83    /// Migrate the region from one datanode to the other datanode:
84    /// - `region_id`:  the migrated region id
85    /// - `from_peer`:  the source datanode id
86    /// - `to_peer`:  the target datanode id
87    /// - `timeout`: timeout for downgrading region and upgrading region operations
88    pub async fn migrate_region(
89        &self,
90        region_id: u64,
91        from_peer: u64,
92        to_peer: u64,
93        timeout: Duration,
94    ) -> Result<MigrateRegionResponse> {
95        let inner = self.inner.read().await;
96        inner
97            .migrate_region(region_id, from_peer, to_peer, timeout)
98            .await
99    }
100
101    /// Reconcile the procedure state.
102    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
103        let inner = self.inner.read().await;
104        inner.reconcile(request).await
105    }
106
107    pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
108        let inner = self.inner.read().await;
109        inner.list_procedures().await
110    }
111}
112
113#[derive(Debug)]
114struct Inner {
115    id: Id,
116    role: Role,
117    channel_manager: ChannelManager,
118    leader_provider: Option<LeaderProviderRef>,
119    max_retry: usize,
120}
121
122impl Inner {
123    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
124        ensure!(
125            !self.is_started(),
126            error::IllegalGrpcClientStateSnafu {
127                err_msg: "DDL client already started",
128            }
129        );
130        self.leader_provider = Some(leader_provider);
131        Ok(())
132    }
133
134    fn start<U, A>(&mut self, urls: A) -> Result<()>
135    where
136        U: AsRef<str>,
137        A: AsRef<[U]>,
138    {
139        let peers = urls
140            .as_ref()
141            .iter()
142            .map(|url| url.as_ref().to_string())
143            .collect::<Vec<_>>();
144        let ask_leader = AskLeader::new(
145            self.id,
146            self.role,
147            peers,
148            self.channel_manager.clone(),
149            self.max_retry,
150        );
151        self.start_with(Arc::new(ask_leader))
152    }
153
154    fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
155        let channel = self
156            .channel_manager
157            .get(addr)
158            .context(error::CreateChannelSnafu)?;
159
160        Ok(ProcedureServiceClient::new(channel)
161            .accept_compressed(CompressionEncoding::Gzip)
162            .accept_compressed(CompressionEncoding::Zstd)
163            .send_compressed(CompressionEncoding::Zstd))
164    }
165
166    #[inline]
167    fn is_started(&self) -> bool {
168        self.leader_provider.is_some()
169    }
170
171    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
172    where
173        R: Future<Output = std::result::Result<T, Status>>,
174        F: Fn(ProcedureServiceClient<Channel>) -> R,
175        H: Fn(&T) -> &Option<ResponseHeader>,
176    {
177        let Some(leader_provider) = self.leader_provider.as_ref() else {
178            return error::IllegalGrpcClientStateSnafu {
179                err_msg: "not started",
180            }
181            .fail();
182        };
183
184        let mut times = 0;
185        let mut last_error = None;
186
187        while times < self.max_retry {
188            if let Some(leader) = &leader_provider.leader() {
189                let client = self.make_client(leader)?;
190                match body_fn(client).await {
191                    Ok(res) => {
192                        if util::is_not_leader(get_header(&res)) {
193                            last_error = Some(format!("{leader} is not a leader"));
194                            warn!("Failed to {task} to {leader}, not a leader");
195                            let leader = leader_provider.ask_leader().await?;
196                            info!("DDL client updated to new leader addr: {leader}");
197                            times += 1;
198                            continue;
199                        }
200                        return Ok(res);
201                    }
202                    Err(status) => {
203                        // The leader may be unreachable.
204                        if util::is_unreachable(&status) {
205                            last_error = Some(status.to_string());
206                            warn!("Failed to {task} to {leader}, source: {status}");
207                            let leader = leader_provider.ask_leader().await?;
208                            info!("Procedure client updated to new leader addr: {leader}");
209                            times += 1;
210                            continue;
211                        } else {
212                            error!("An error occurred in gRPC, status: {status}");
213                            return Err(error::Error::from(status));
214                        }
215                    }
216                }
217            } else if let Err(err) = leader_provider.ask_leader().await {
218                return Err(err);
219            }
220        }
221
222        error::RetryTimesExceededSnafu {
223            msg: format!("Failed to {task}, last error: {:?}", last_error),
224            times: self.max_retry,
225        }
226        .fail()
227    }
228
229    async fn migrate_region(
230        &self,
231        region_id: u64,
232        from_peer: u64,
233        to_peer: u64,
234        timeout: Duration,
235    ) -> Result<MigrateRegionResponse> {
236        let mut req = MigrateRegionRequest {
237            region_id,
238            from_peer,
239            to_peer,
240            timeout_secs: timeout.as_secs() as u32,
241            ..Default::default()
242        };
243
244        req.set_header(
245            self.id,
246            self.role,
247            TracingContext::from_current_span().to_w3c(),
248        );
249
250        self.with_retry(
251            "migrate region",
252            move |mut client| {
253                let req = req.clone();
254
255                async move { client.migrate(req).await.map(|res| res.into_inner()) }
256            },
257            |resp: &MigrateRegionResponse| &resp.header,
258        )
259        .await
260    }
261
262    async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
263        let mut req = request;
264        req.set_header(
265            self.id,
266            self.role,
267            TracingContext::from_current_span().to_w3c(),
268        );
269
270        self.with_retry(
271            "reconcile",
272            move |mut client| {
273                let req = req.clone();
274
275                async move { client.reconcile(req).await.map(|res| res.into_inner()) }
276            },
277            |resp: &ReconcileResponse| &resp.header,
278        )
279        .await
280    }
281
282    async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
283        let mut req = QueryProcedureRequest {
284            pid: Some(ProcedureId { key: pid.into() }),
285            ..Default::default()
286        };
287
288        req.set_header(
289            self.id,
290            self.role,
291            TracingContext::from_current_span().to_w3c(),
292        );
293
294        self.with_retry(
295            "query procedure state",
296            move |mut client| {
297                let req = req.clone();
298
299                async move { client.query(req).await.map(|res| res.into_inner()) }
300            },
301            |resp: &ProcedureStateResponse| &resp.header,
302        )
303        .await
304    }
305
306    async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
307        req.set_header(
308            self.id,
309            self.role,
310            TracingContext::from_current_span().to_w3c(),
311        );
312
313        self.with_retry(
314            "submit ddl task",
315            move |mut client| {
316                let req = req.clone();
317                async move { client.ddl(req).await.map(|res| res.into_inner()) }
318            },
319            |resp: &DdlTaskResponse| &resp.header,
320        )
321        .await
322    }
323
324    async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
325        let mut req = ProcedureDetailRequest::default();
326        req.set_header(
327            self.id,
328            self.role,
329            TracingContext::from_current_span().to_w3c(),
330        );
331
332        self.with_retry(
333            "list procedure",
334            move |mut client| {
335                let req = req.clone();
336                async move { client.details(req).await.map(|res| res.into_inner()) }
337            },
338            |resp: &ProcedureDetailResponse| &resp.header,
339        )
340        .await
341    }
342}