meta_client/client/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21    DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22    ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
23    QueryProcedureRequest, ResponseHeader, Role,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info, warn};
28use snafu::{ensure, ResultExt};
29use tokio::sync::RwLock;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32use tonic::Status;
33
34use crate::client::ask_leader::AskLeader;
35use crate::client::{util, Id};
36use crate::error;
37use crate::error::Result;
38
/// Client for the procedure service (`ProcedureServiceClient`): submits DDL tasks,
/// queries procedure state, migrates regions and lists procedures.
///
/// Cloning is cheap: every clone shares the same `Inner` state through the
/// `Arc<RwLock<_>>`, so starting one clone starts them all.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared client state; written by `start`, read by every request method.
    inner: Arc<RwLock<Inner>>,
}
43
44impl Client {
45    pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
46        let inner = Arc::new(RwLock::new(Inner {
47            id,
48            role,
49            channel_manager,
50            ask_leader: None,
51            max_retry,
52        }));
53
54        Self { inner }
55    }
56
57    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
58    where
59        U: AsRef<str>,
60        A: AsRef<[U]>,
61    {
62        let mut inner = self.inner.write().await;
63        inner.start(urls).await
64    }
65
66    pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
67        let inner = self.inner.read().await;
68        inner.submit_ddl_task(req).await
69    }
70
71    /// Query the procedure' state by its id
72    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
73        let inner = self.inner.read().await;
74        inner.query_procedure_state(pid).await
75    }
76
77    /// Migrate the region from one datanode to the other datanode:
78    /// - `region_id`:  the migrated region id
79    /// - `from_peer`:  the source datanode id
80    /// - `to_peer`:  the target datanode id
81    /// - `timeout`: timeout for downgrading region and upgrading region operations
82    pub async fn migrate_region(
83        &self,
84        region_id: u64,
85        from_peer: u64,
86        to_peer: u64,
87        timeout: Duration,
88    ) -> Result<MigrateRegionResponse> {
89        let inner = self.inner.read().await;
90        inner
91            .migrate_region(region_id, from_peer, to_peer, timeout)
92            .await
93    }
94
95    pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
96        let inner = self.inner.read().await;
97        inner.list_procedures().await
98    }
99}
100
/// State shared by all clones of [`Client`].
#[derive(Debug)]
struct Inner {
    /// Id of this client; written into every request header and handed to `AskLeader`.
    id: Id,
    /// Role of this client; written into every request header and handed to `AskLeader`.
    role: Role,
    /// Source of gRPC channels used to build per-request service clients.
    channel_manager: ChannelManager,
    /// Leader discovery helper; `None` until `start` has been called.
    ask_leader: Option<AskLeader>,
    /// Upper bound on retry attempts in `with_retry`; also passed to `AskLeader`.
    max_retry: usize,
}
109
110impl Inner {
111    async fn start<U, A>(&mut self, urls: A) -> Result<()>
112    where
113        U: AsRef<str>,
114        A: AsRef<[U]>,
115    {
116        ensure!(
117            !self.is_started(),
118            error::IllegalGrpcClientStateSnafu {
119                err_msg: "DDL client already started",
120            }
121        );
122
123        let peers = urls
124            .as_ref()
125            .iter()
126            .map(|url| url.as_ref().to_string())
127            .collect::<Vec<_>>();
128        self.ask_leader = Some(AskLeader::new(
129            self.id,
130            self.role,
131            peers,
132            self.channel_manager.clone(),
133            self.max_retry,
134        ));
135
136        Ok(())
137    }
138
139    fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
140        let channel = self
141            .channel_manager
142            .get(addr)
143            .context(error::CreateChannelSnafu)?;
144
145        Ok(ProcedureServiceClient::new(channel)
146            .accept_compressed(CompressionEncoding::Gzip)
147            .accept_compressed(CompressionEncoding::Zstd)
148            .send_compressed(CompressionEncoding::Zstd))
149    }
150
151    #[inline]
152    fn is_started(&self) -> bool {
153        self.ask_leader.is_some()
154    }
155
156    fn ask_leader(&self) -> Result<&AskLeader> {
157        ensure!(
158            self.is_started(),
159            error::IllegalGrpcClientStateSnafu {
160                err_msg: "DDL client not start"
161            }
162        );
163
164        Ok(self.ask_leader.as_ref().unwrap())
165    }
166
167    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
168    where
169        R: Future<Output = std::result::Result<T, Status>>,
170        F: Fn(ProcedureServiceClient<Channel>) -> R,
171        H: Fn(&T) -> &Option<ResponseHeader>,
172    {
173        let ask_leader = self.ask_leader()?;
174        let mut times = 0;
175        let mut last_error = None;
176
177        while times < self.max_retry {
178            if let Some(leader) = &ask_leader.get_leader() {
179                let client = self.make_client(leader)?;
180                match body_fn(client).await {
181                    Ok(res) => {
182                        if util::is_not_leader(get_header(&res)) {
183                            last_error = Some(format!("{leader} is not a leader"));
184                            warn!("Failed to {task} to {leader}, not a leader");
185                            let leader = ask_leader.ask_leader().await?;
186                            info!("DDL client updated to new leader addr: {leader}");
187                            times += 1;
188                            continue;
189                        }
190                        return Ok(res);
191                    }
192                    Err(status) => {
193                        // The leader may be unreachable.
194                        if util::is_unreachable(&status) {
195                            last_error = Some(status.to_string());
196                            warn!("Failed to {task} to {leader}, source: {status}");
197                            let leader = ask_leader.ask_leader().await?;
198                            info!("Procedure client updated to new leader addr: {leader}");
199                            times += 1;
200                            continue;
201                        } else {
202                            error!("An error occurred in gRPC, status: {status}");
203                            return Err(error::Error::from(status));
204                        }
205                    }
206                }
207            } else if let Err(err) = ask_leader.ask_leader().await {
208                return Err(err);
209            }
210        }
211
212        error::RetryTimesExceededSnafu {
213            msg: format!("Failed to {task}, last error: {:?}", last_error),
214            times: self.max_retry,
215        }
216        .fail()
217    }
218
219    async fn migrate_region(
220        &self,
221        region_id: u64,
222        from_peer: u64,
223        to_peer: u64,
224        timeout: Duration,
225    ) -> Result<MigrateRegionResponse> {
226        let mut req = MigrateRegionRequest {
227            region_id,
228            from_peer,
229            to_peer,
230            timeout_secs: timeout.as_secs() as u32,
231            ..Default::default()
232        };
233
234        req.set_header(
235            self.id,
236            self.role,
237            TracingContext::from_current_span().to_w3c(),
238        );
239
240        self.with_retry(
241            "migrate region",
242            move |mut client| {
243                let req = req.clone();
244
245                async move { client.migrate(req).await.map(|res| res.into_inner()) }
246            },
247            |resp: &MigrateRegionResponse| &resp.header,
248        )
249        .await
250    }
251
252    async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
253        let mut req = QueryProcedureRequest {
254            pid: Some(ProcedureId { key: pid.into() }),
255            ..Default::default()
256        };
257
258        req.set_header(
259            self.id,
260            self.role,
261            TracingContext::from_current_span().to_w3c(),
262        );
263
264        self.with_retry(
265            "query procedure state",
266            move |mut client| {
267                let req = req.clone();
268
269                async move { client.query(req).await.map(|res| res.into_inner()) }
270            },
271            |resp: &ProcedureStateResponse| &resp.header,
272        )
273        .await
274    }
275
276    async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
277        req.set_header(
278            self.id,
279            self.role,
280            TracingContext::from_current_span().to_w3c(),
281        );
282
283        self.with_retry(
284            "submit ddl task",
285            move |mut client| {
286                let req = req.clone();
287                async move { client.ddl(req).await.map(|res| res.into_inner()) }
288            },
289            |resp: &DdlTaskResponse| &resp.header,
290        )
291        .await
292    }
293
294    async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
295        let mut req = ProcedureDetailRequest::default();
296        req.set_header(
297            self.id,
298            self.role,
299            TracingContext::from_current_span().to_w3c(),
300        );
301
302        self.with_retry(
303            "list procedure",
304            move |mut client| {
305                let req = req.clone();
306                async move { client.details(req).await.map(|res| res.into_inner()) }
307            },
308            |resp: &ProcedureDetailResponse| &resp.header,
309        )
310        .await
311    }
312}