1use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21 DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
22 ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
23 QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info, warn};
28use snafu::{ensure, ResultExt};
29use tokio::sync::RwLock;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32use tonic::Status;
33
34use crate::client::ask_leader::AskLeader;
35use crate::client::{util, Id, LeaderProviderRef};
36use crate::error;
37use crate::error::Result;
38
39#[derive(Clone, Debug)]
40pub struct Client {
41 inner: Arc<RwLock<Inner>>,
42}
43
44impl Client {
45 pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
46 let inner = Arc::new(RwLock::new(Inner {
47 id,
48 role,
49 channel_manager,
50 leader_provider: None,
51 max_retry,
52 }));
53
54 Self { inner }
55 }
56
57 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
58 where
59 U: AsRef<str>,
60 A: AsRef<[U]>,
61 {
62 let mut inner = self.inner.write().await;
63 inner.start(urls)
64 }
65
66 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
68 let mut inner = self.inner.write().await;
69 inner.start_with(leader_provider)
70 }
71
72 pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
73 let inner = self.inner.read().await;
74 inner.submit_ddl_task(req).await
75 }
76
77 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
79 let inner = self.inner.read().await;
80 inner.query_procedure_state(pid).await
81 }
82
83 pub async fn migrate_region(
89 &self,
90 region_id: u64,
91 from_peer: u64,
92 to_peer: u64,
93 timeout: Duration,
94 ) -> Result<MigrateRegionResponse> {
95 let inner = self.inner.read().await;
96 inner
97 .migrate_region(region_id, from_peer, to_peer, timeout)
98 .await
99 }
100
101 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
103 let inner = self.inner.read().await;
104 inner.reconcile(request).await
105 }
106
107 pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
108 let inner = self.inner.read().await;
109 inner.list_procedures().await
110 }
111}
112
113#[derive(Debug)]
114struct Inner {
115 id: Id,
116 role: Role,
117 channel_manager: ChannelManager,
118 leader_provider: Option<LeaderProviderRef>,
119 max_retry: usize,
120}
121
122impl Inner {
123 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
124 ensure!(
125 !self.is_started(),
126 error::IllegalGrpcClientStateSnafu {
127 err_msg: "DDL client already started",
128 }
129 );
130 self.leader_provider = Some(leader_provider);
131 Ok(())
132 }
133
134 fn start<U, A>(&mut self, urls: A) -> Result<()>
135 where
136 U: AsRef<str>,
137 A: AsRef<[U]>,
138 {
139 let peers = urls
140 .as_ref()
141 .iter()
142 .map(|url| url.as_ref().to_string())
143 .collect::<Vec<_>>();
144 let ask_leader = AskLeader::new(
145 self.id,
146 self.role,
147 peers,
148 self.channel_manager.clone(),
149 self.max_retry,
150 );
151 self.start_with(Arc::new(ask_leader))
152 }
153
154 fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
155 let channel = self
156 .channel_manager
157 .get(addr)
158 .context(error::CreateChannelSnafu)?;
159
160 Ok(ProcedureServiceClient::new(channel)
161 .accept_compressed(CompressionEncoding::Gzip)
162 .accept_compressed(CompressionEncoding::Zstd)
163 .send_compressed(CompressionEncoding::Zstd))
164 }
165
166 #[inline]
167 fn is_started(&self) -> bool {
168 self.leader_provider.is_some()
169 }
170
171 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
172 where
173 R: Future<Output = std::result::Result<T, Status>>,
174 F: Fn(ProcedureServiceClient<Channel>) -> R,
175 H: Fn(&T) -> &Option<ResponseHeader>,
176 {
177 let Some(leader_provider) = self.leader_provider.as_ref() else {
178 return error::IllegalGrpcClientStateSnafu {
179 err_msg: "not started",
180 }
181 .fail();
182 };
183
184 let mut times = 0;
185 let mut last_error = None;
186
187 while times < self.max_retry {
188 if let Some(leader) = &leader_provider.leader() {
189 let client = self.make_client(leader)?;
190 match body_fn(client).await {
191 Ok(res) => {
192 if util::is_not_leader(get_header(&res)) {
193 last_error = Some(format!("{leader} is not a leader"));
194 warn!("Failed to {task} to {leader}, not a leader");
195 let leader = leader_provider.ask_leader().await?;
196 info!("DDL client updated to new leader addr: {leader}");
197 times += 1;
198 continue;
199 }
200 return Ok(res);
201 }
202 Err(status) => {
203 if util::is_unreachable(&status) {
205 last_error = Some(status.to_string());
206 warn!("Failed to {task} to {leader}, source: {status}");
207 let leader = leader_provider.ask_leader().await?;
208 info!("Procedure client updated to new leader addr: {leader}");
209 times += 1;
210 continue;
211 } else {
212 error!("An error occurred in gRPC, status: {status}");
213 return Err(error::Error::from(status));
214 }
215 }
216 }
217 } else if let Err(err) = leader_provider.ask_leader().await {
218 return Err(err);
219 }
220 }
221
222 error::RetryTimesExceededSnafu {
223 msg: format!("Failed to {task}, last error: {:?}", last_error),
224 times: self.max_retry,
225 }
226 .fail()
227 }
228
229 async fn migrate_region(
230 &self,
231 region_id: u64,
232 from_peer: u64,
233 to_peer: u64,
234 timeout: Duration,
235 ) -> Result<MigrateRegionResponse> {
236 let mut req = MigrateRegionRequest {
237 region_id,
238 from_peer,
239 to_peer,
240 timeout_secs: timeout.as_secs() as u32,
241 ..Default::default()
242 };
243
244 req.set_header(
245 self.id,
246 self.role,
247 TracingContext::from_current_span().to_w3c(),
248 );
249
250 self.with_retry(
251 "migrate region",
252 move |mut client| {
253 let req = req.clone();
254
255 async move { client.migrate(req).await.map(|res| res.into_inner()) }
256 },
257 |resp: &MigrateRegionResponse| &resp.header,
258 )
259 .await
260 }
261
262 async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
263 let mut req = request;
264 req.set_header(
265 self.id,
266 self.role,
267 TracingContext::from_current_span().to_w3c(),
268 );
269
270 self.with_retry(
271 "reconcile",
272 move |mut client| {
273 let req = req.clone();
274
275 async move { client.reconcile(req).await.map(|res| res.into_inner()) }
276 },
277 |resp: &ReconcileResponse| &resp.header,
278 )
279 .await
280 }
281
282 async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
283 let mut req = QueryProcedureRequest {
284 pid: Some(ProcedureId { key: pid.into() }),
285 ..Default::default()
286 };
287
288 req.set_header(
289 self.id,
290 self.role,
291 TracingContext::from_current_span().to_w3c(),
292 );
293
294 self.with_retry(
295 "query procedure state",
296 move |mut client| {
297 let req = req.clone();
298
299 async move { client.query(req).await.map(|res| res.into_inner()) }
300 },
301 |resp: &ProcedureStateResponse| &resp.header,
302 )
303 .await
304 }
305
306 async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
307 req.set_header(
308 self.id,
309 self.role,
310 TracingContext::from_current_span().to_w3c(),
311 );
312
313 self.with_retry(
314 "submit ddl task",
315 move |mut client| {
316 let req = req.clone();
317 async move { client.ddl(req).await.map(|res| res.into_inner()) }
318 },
319 |resp: &DdlTaskResponse| &resp.header,
320 )
321 .await
322 }
323
324 async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
325 let mut req = ProcedureDetailRequest::default();
326 req.set_header(
327 self.id,
328 self.role,
329 TracingContext::from_current_span().to_w3c(),
330 );
331
332 self.with_retry(
333 "list procedure",
334 move |mut client| {
335 let req = req.clone();
336 async move { client.details(req).await.map(|res| res.into_inner()) }
337 },
338 |resp: &ProcedureDetailResponse| &resp.header,
339 )
340 .await
341 }
342}