1use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21 DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22 ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
23 QueryProcedureRequest, ResponseHeader, Role,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info, warn};
28use snafu::{ensure, ResultExt};
29use tokio::sync::RwLock;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32use tonic::Status;
33
34use crate::client::ask_leader::AskLeader;
35use crate::client::{util, Id};
36use crate::error;
37use crate::error::Result;
38
39#[derive(Clone, Debug)]
40pub struct Client {
41 inner: Arc<RwLock<Inner>>,
42}
43
44impl Client {
45 pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
46 let inner = Arc::new(RwLock::new(Inner {
47 id,
48 role,
49 channel_manager,
50 ask_leader: None,
51 max_retry,
52 }));
53
54 Self { inner }
55 }
56
57 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
58 where
59 U: AsRef<str>,
60 A: AsRef<[U]>,
61 {
62 let mut inner = self.inner.write().await;
63 inner.start(urls).await
64 }
65
66 pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
67 let inner = self.inner.read().await;
68 inner.submit_ddl_task(req).await
69 }
70
71 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
73 let inner = self.inner.read().await;
74 inner.query_procedure_state(pid).await
75 }
76
77 pub async fn migrate_region(
83 &self,
84 region_id: u64,
85 from_peer: u64,
86 to_peer: u64,
87 timeout: Duration,
88 ) -> Result<MigrateRegionResponse> {
89 let inner = self.inner.read().await;
90 inner
91 .migrate_region(region_id, from_peer, to_peer, timeout)
92 .await
93 }
94
95 pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
96 let inner = self.inner.read().await;
97 inner.list_procedures().await
98 }
99}
100
101#[derive(Debug)]
102struct Inner {
103 id: Id,
104 role: Role,
105 channel_manager: ChannelManager,
106 ask_leader: Option<AskLeader>,
107 max_retry: usize,
108}
109
110impl Inner {
111 async fn start<U, A>(&mut self, urls: A) -> Result<()>
112 where
113 U: AsRef<str>,
114 A: AsRef<[U]>,
115 {
116 ensure!(
117 !self.is_started(),
118 error::IllegalGrpcClientStateSnafu {
119 err_msg: "DDL client already started",
120 }
121 );
122
123 let peers = urls
124 .as_ref()
125 .iter()
126 .map(|url| url.as_ref().to_string())
127 .collect::<Vec<_>>();
128 self.ask_leader = Some(AskLeader::new(
129 self.id,
130 self.role,
131 peers,
132 self.channel_manager.clone(),
133 self.max_retry,
134 ));
135
136 Ok(())
137 }
138
139 fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
140 let channel = self
141 .channel_manager
142 .get(addr)
143 .context(error::CreateChannelSnafu)?;
144
145 Ok(ProcedureServiceClient::new(channel)
146 .accept_compressed(CompressionEncoding::Gzip)
147 .accept_compressed(CompressionEncoding::Zstd)
148 .send_compressed(CompressionEncoding::Zstd))
149 }
150
151 #[inline]
152 fn is_started(&self) -> bool {
153 self.ask_leader.is_some()
154 }
155
156 fn ask_leader(&self) -> Result<&AskLeader> {
157 ensure!(
158 self.is_started(),
159 error::IllegalGrpcClientStateSnafu {
160 err_msg: "DDL client not start"
161 }
162 );
163
164 Ok(self.ask_leader.as_ref().unwrap())
165 }
166
167 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
168 where
169 R: Future<Output = std::result::Result<T, Status>>,
170 F: Fn(ProcedureServiceClient<Channel>) -> R,
171 H: Fn(&T) -> &Option<ResponseHeader>,
172 {
173 let ask_leader = self.ask_leader()?;
174 let mut times = 0;
175 let mut last_error = None;
176
177 while times < self.max_retry {
178 if let Some(leader) = &ask_leader.get_leader() {
179 let client = self.make_client(leader)?;
180 match body_fn(client).await {
181 Ok(res) => {
182 if util::is_not_leader(get_header(&res)) {
183 last_error = Some(format!("{leader} is not a leader"));
184 warn!("Failed to {task} to {leader}, not a leader");
185 let leader = ask_leader.ask_leader().await?;
186 info!("DDL client updated to new leader addr: {leader}");
187 times += 1;
188 continue;
189 }
190 return Ok(res);
191 }
192 Err(status) => {
193 if util::is_unreachable(&status) {
195 last_error = Some(status.to_string());
196 warn!("Failed to {task} to {leader}, source: {status}");
197 let leader = ask_leader.ask_leader().await?;
198 info!("Procedure client updated to new leader addr: {leader}");
199 times += 1;
200 continue;
201 } else {
202 error!("An error occurred in gRPC, status: {status}");
203 return Err(error::Error::from(status));
204 }
205 }
206 }
207 } else if let Err(err) = ask_leader.ask_leader().await {
208 return Err(err);
209 }
210 }
211
212 error::RetryTimesExceededSnafu {
213 msg: format!("Failed to {task}, last error: {:?}", last_error),
214 times: self.max_retry,
215 }
216 .fail()
217 }
218
219 async fn migrate_region(
220 &self,
221 region_id: u64,
222 from_peer: u64,
223 to_peer: u64,
224 timeout: Duration,
225 ) -> Result<MigrateRegionResponse> {
226 let mut req = MigrateRegionRequest {
227 region_id,
228 from_peer,
229 to_peer,
230 timeout_secs: timeout.as_secs() as u32,
231 ..Default::default()
232 };
233
234 req.set_header(
235 self.id,
236 self.role,
237 TracingContext::from_current_span().to_w3c(),
238 );
239
240 self.with_retry(
241 "migrate region",
242 move |mut client| {
243 let req = req.clone();
244
245 async move { client.migrate(req).await.map(|res| res.into_inner()) }
246 },
247 |resp: &MigrateRegionResponse| &resp.header,
248 )
249 .await
250 }
251
252 async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
253 let mut req = QueryProcedureRequest {
254 pid: Some(ProcedureId { key: pid.into() }),
255 ..Default::default()
256 };
257
258 req.set_header(
259 self.id,
260 self.role,
261 TracingContext::from_current_span().to_w3c(),
262 );
263
264 self.with_retry(
265 "query procedure state",
266 move |mut client| {
267 let req = req.clone();
268
269 async move { client.query(req).await.map(|res| res.into_inner()) }
270 },
271 |resp: &ProcedureStateResponse| &resp.header,
272 )
273 .await
274 }
275
276 async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
277 req.set_header(
278 self.id,
279 self.role,
280 TracingContext::from_current_span().to_w3c(),
281 );
282
283 self.with_retry(
284 "submit ddl task",
285 move |mut client| {
286 let req = req.clone();
287 async move { client.ddl(req).await.map(|res| res.into_inner()) }
288 },
289 |resp: &DdlTaskResponse| &resp.header,
290 )
291 .await
292 }
293
294 async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
295 let mut req = ProcedureDetailRequest::default();
296 req.set_header(
297 self.id,
298 self.role,
299 TracingContext::from_current_span().to_w3c(),
300 );
301
302 self.with_retry(
303 "list procedure",
304 move |mut client| {
305 let req = req.clone();
306 async move { client.details(req).await.map(|res| res.into_inner()) }
307 },
308 |resp: &ProcedureDetailResponse| &resp.header,
309 )
310 .await
311 }
312}