1use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21 DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest,
22 GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest,
23 ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest,
24 ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role,
25};
26use common_grpc::channel_manager::ChannelManager;
27use common_meta::rpc::procedure::{
28 GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
29 GcTableRequest as MetaGcTableRequest,
30};
31use common_telemetry::tracing_context::TracingContext;
32use common_telemetry::{error, info, warn};
33use snafu::{ResultExt, ensure};
34use tokio::sync::RwLock;
35use tonic::codec::CompressionEncoding;
36use tonic::transport::Channel;
37use tonic::{Request, Status};
38
39use crate::client::{Id, LeaderProviderRef, util};
40use crate::error;
41use crate::error::Result;
42
43#[derive(Clone, Debug)]
44pub struct Client {
45 inner: Arc<RwLock<Inner>>,
46}
47
48impl Client {
49 pub fn new(
50 id: Id,
51 role: Role,
52 channel_manager: ChannelManager,
53 max_retry: usize,
54 timeout: Duration,
55 ) -> Self {
56 let inner = Arc::new(RwLock::new(Inner {
57 id,
58 role,
59 channel_manager,
60 leader_provider: None,
61 max_retry,
62 timeout,
63 }));
64
65 Self { inner }
66 }
67
68 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
70 let mut inner = self.inner.write().await;
71 inner.start_with(leader_provider)
72 }
73
74 pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
75 let inner = self.inner.read().await;
76 inner.submit_ddl_task(req).await
77 }
78
79 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
81 let inner = self.inner.read().await;
82 inner.query_procedure_state(pid).await
83 }
84
85 pub async fn migrate_region(
91 &self,
92 region_id: u64,
93 from_peer: u64,
94 to_peer: u64,
95 timeout: Duration,
96 ) -> Result<MigrateRegionResponse> {
97 let inner = self.inner.read().await;
98 inner
99 .migrate_region(region_id, from_peer, to_peer, timeout)
100 .await
101 }
102
103 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
105 let inner = self.inner.read().await;
106 inner.reconcile(request).await
107 }
108
109 pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
110 let inner = self.inner.read().await;
111 inner.list_procedures().await
112 }
113
114 pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
115 let inner = self.inner.read().await;
116 inner.gc_regions(request).await
117 }
118
119 pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
120 let inner = self.inner.read().await;
121 inner.gc_table(request).await
122 }
123}
124
125#[derive(Debug)]
126struct Inner {
127 id: Id,
128 role: Role,
129 channel_manager: ChannelManager,
130 leader_provider: Option<LeaderProviderRef>,
131 max_retry: usize,
132 timeout: Duration,
134}
135
136impl Inner {
137 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
138 ensure!(
139 !self.is_started(),
140 error::IllegalGrpcClientStateSnafu {
141 err_msg: "DDL client already started",
142 }
143 );
144 self.leader_provider = Some(leader_provider);
145 Ok(())
146 }
147
148 fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
149 let channel = self
150 .channel_manager
151 .get(addr)
152 .context(error::CreateChannelSnafu)?;
153
154 Ok(ProcedureServiceClient::new(channel)
155 .accept_compressed(CompressionEncoding::Gzip)
156 .accept_compressed(CompressionEncoding::Zstd)
157 .send_compressed(CompressionEncoding::Zstd))
158 }
159
160 #[inline]
161 fn is_started(&self) -> bool {
162 self.leader_provider.is_some()
163 }
164
165 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
166 where
167 R: Future<Output = std::result::Result<T, Status>>,
168 F: Fn(ProcedureServiceClient<Channel>) -> R,
169 H: Fn(&T) -> &Option<ResponseHeader>,
170 {
171 let Some(leader_provider) = self.leader_provider.as_ref() else {
172 return error::IllegalGrpcClientStateSnafu {
173 err_msg: "not started",
174 }
175 .fail();
176 };
177
178 let mut times = 0;
179 let mut last_error = None;
180
181 while times < self.max_retry {
182 if let Some(leader) = &leader_provider.leader() {
183 let client = self.make_client(leader)?;
184 match body_fn(client).await {
185 Ok(res) => {
186 if util::is_not_leader(get_header(&res)) {
187 last_error = Some(format!("{leader} is not a leader"));
188 warn!("Failed to {task} to {leader}, not a leader");
189 let leader = leader_provider.ask_leader().await?;
190 info!("DDL client updated to new leader addr: {leader}");
191 times += 1;
192 continue;
193 }
194 return Ok(res);
195 }
196 Err(status) => {
197 if util::is_unreachable(&status) {
199 last_error = Some(status.to_string());
200 warn!("Failed to {task} to {leader}, source: {status}");
201 let leader = leader_provider.ask_leader().await?;
202 info!("Procedure client updated to new leader addr: {leader}");
203 times += 1;
204 continue;
205 } else {
206 error!("An error occurred in gRPC, status: {status:?}");
207 return Err(error::Error::from(status));
208 }
209 }
210 }
211 } else {
212 leader_provider.ask_leader().await?;
213 }
214 }
215
216 error::RetryTimesExceededSnafu {
217 msg: format!("Failed to {task}, last error: {:?}", last_error),
218 times: self.max_retry,
219 }
220 .fail()
221 }
222
223 async fn migrate_region(
224 &self,
225 region_id: u64,
226 from_peer: u64,
227 to_peer: u64,
228 timeout: Duration,
229 ) -> Result<MigrateRegionResponse> {
230 let mut req = MigrateRegionRequest {
231 region_id,
232 from_peer,
233 to_peer,
234 timeout_secs: timeout.as_secs() as u32,
235 ..Default::default()
236 };
237
238 req.set_header(
239 self.id,
240 self.role,
241 TracingContext::from_current_span().to_w3c(),
242 );
243
244 self.with_retry(
245 "migrate region",
246 move |mut client| {
247 let mut req = Request::new(req.clone());
248 req.set_timeout(self.timeout);
249
250 async move { client.migrate(req).await.map(|res| res.into_inner()) }
251 },
252 |resp: &MigrateRegionResponse| &resp.header,
253 )
254 .await
255 }
256
257 async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
258 let mut req = request;
259 req.set_header(
260 self.id,
261 self.role,
262 TracingContext::from_current_span().to_w3c(),
263 );
264
265 self.with_retry(
266 "reconcile",
267 move |mut client| {
268 let mut req = Request::new(req.clone());
269 req.set_timeout(self.timeout);
270
271 async move { client.reconcile(req).await.map(|res| res.into_inner()) }
272 },
273 |resp: &ReconcileResponse| &resp.header,
274 )
275 .await
276 }
277
278 async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
279 let timeout = request.timeout;
280 let req = GcRegionsRequest {
281 header: Some(RequestHeader {
282 protocol_version: 0,
283 member_id: self.id,
284 role: self.role as i32,
285 tracing_context: TracingContext::from_current_span().to_w3c(),
286 }),
287 region_ids: request.region_ids,
288 full_file_listing: request.full_file_listing,
289 timeout_secs: gc_timeout_secs(timeout),
290 };
291
292 let resp: GcRegionsResponse = self
293 .with_retry(
294 "gc_regions",
295 move |mut client| {
296 let mut req = Request::new(req.clone());
297 if let Some(timeout) = timeout {
298 req.set_timeout(timeout);
299 }
300 async move { client.gc_regions(req).await.map(|res| res.into_inner()) }
301 },
302 |resp: &GcRegionsResponse| &resp.header,
303 )
304 .await?;
305
306 let stats = resp.stats.unwrap_or_default();
307 Ok(MetaGcResponse {
308 processed_regions: stats.processed_regions,
309 need_retry_regions: stats.need_retry_regions,
310 deleted_files: stats.deleted_files,
311 deleted_indexes: stats.deleted_indexes,
312 })
313 }
314
315 async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
316 let timeout = request.timeout;
317 let req = GcTableRequest {
318 header: Some(RequestHeader {
319 protocol_version: 0,
320 member_id: self.id,
321 role: self.role as i32,
322 tracing_context: TracingContext::from_current_span().to_w3c(),
323 }),
324 catalog_name: request.catalog_name,
325 schema_name: request.schema_name,
326 table_name: request.table_name,
327 full_file_listing: request.full_file_listing,
328 timeout_secs: gc_timeout_secs(timeout),
329 };
330
331 let resp: GcTableResponse = self
332 .with_retry(
333 "gc_table",
334 move |mut client| {
335 let mut req = Request::new(req.clone());
336 if let Some(timeout) = timeout {
337 req.set_timeout(timeout);
338 }
339 async move { client.gc_table(req).await.map(|res| res.into_inner()) }
340 },
341 |resp: &GcTableResponse| &resp.header,
342 )
343 .await?;
344
345 let stats = resp.stats.unwrap_or_default();
346 Ok(MetaGcResponse {
347 processed_regions: stats.processed_regions,
348 need_retry_regions: stats.need_retry_regions,
349 deleted_files: stats.deleted_files,
350 deleted_indexes: stats.deleted_indexes,
351 })
352 }
353
354 async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
355 let mut req = QueryProcedureRequest {
356 pid: Some(ProcedureId { key: pid.into() }),
357 ..Default::default()
358 };
359
360 req.set_header(
361 self.id,
362 self.role,
363 TracingContext::from_current_span().to_w3c(),
364 );
365
366 self.with_retry(
367 "query procedure state",
368 move |mut client| {
369 let mut req = Request::new(req.clone());
370 req.set_timeout(self.timeout);
371
372 async move { client.query(req).await.map(|res| res.into_inner()) }
373 },
374 |resp: &ProcedureStateResponse| &resp.header,
375 )
376 .await
377 }
378
379 async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
380 req.set_header(
381 self.id,
382 self.role,
383 TracingContext::from_current_span().to_w3c(),
384 );
385 let timeout = Duration::from_secs(req.timeout_secs.into());
386
387 self.with_retry(
388 "submit ddl task",
389 move |mut client| {
390 let mut req = Request::new(req.clone());
391 req.set_timeout(timeout);
392 async move { client.ddl(req).await.map(|res| res.into_inner()) }
393 },
394 |resp: &DdlTaskResponse| &resp.header,
395 )
396 .await
397 }
398
399 async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
400 let mut req = ProcedureDetailRequest::default();
401 req.set_header(
402 self.id,
403 self.role,
404 TracingContext::from_current_span().to_w3c(),
405 );
406
407 self.with_retry(
408 "list procedure",
409 move |mut client| {
410 let mut req = Request::new(req.clone());
411 req.set_timeout(self.timeout);
412 async move { client.details(req).await.map(|res| res.into_inner()) }
413 },
414 |resp: &ProcedureDetailResponse| &resp.header,
415 )
416 .await
417 }
418}
419
/// Converts an optional GC timeout into the wire-level `timeout_secs` value.
///
/// `None` maps to `0`. A `Some` duration is expressed in whole seconds
/// (sub-second remainders are dropped), raised to at least `1` so a present
/// timeout is never encoded as `0`, and saturated at `u32::MAX`.
fn gc_timeout_secs(timeout: Option<Duration>) -> u32 {
    let Some(timeout) = timeout else {
        return 0;
    };
    let whole_secs = timeout.as_secs().max(1);
    u32::try_from(whole_secs).unwrap_or(u32::MAX)
}
425
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
    use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer};
    use api::v1::meta::{
        AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest,
        GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse,
        MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest,
        ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest,
        ReconcileResponse, ResponseHeader, Role,
    };
    use async_trait::async_trait;
    use common_error::status_code::StatusCode;
    use common_meta::rpc::ddl::{
        CommentObjectType, CommentOnTask, DdlTask, QueryContext, SubmitDdlTaskRequest,
    };
    use common_telemetry::common_error::ext::ErrorExt;
    use common_telemetry::info;
    use tokio::net::TcpListener;
    use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
    use tonic::codec::CompressionEncoding;
    use tonic::{Request, Response, Status};

    use super::gc_timeout_secs;
    use crate::client::MetaClientBuilder;

    /// Header carried by every successful mock response: protocol 0, no error.
    fn ok_header() -> Option<ResponseHeader> {
        Some(ResponseHeader {
            protocol_version: 0,
            error: None,
        })
    }

    #[test]
    fn test_gc_timeout_secs() {
        let cases: [(Option<Duration>, u32); 5] = [
            (None, 0),
            (Some(Duration::from_millis(1)), 1),
            (Some(Duration::from_millis(999)), 1),
            (Some(Duration::from_secs(1)), 1),
            (Some(Duration::from_secs(10)), 10),
        ];
        for (input, expected) in cases {
            assert_eq!(gc_timeout_secs(input), expected);
        }
    }

    /// Heartbeat mock that only answers `ask_leader`, always naming
    /// `leader_addr` as the leader.
    #[derive(Clone)]
    struct MockHeartbeat {
        leader_addr: String,
    }

    #[async_trait]
    impl Heartbeat for MockHeartbeat {
        type HeartbeatStream = ReceiverStream<Result<HeartbeatResponse, Status>>;

        async fn heartbeat(
            &self,
            _request: Request<tonic::Streaming<HeartbeatRequest>>,
        ) -> Result<Response<Self::HeartbeatStream>, Status> {
            Err(Status::unimplemented(
                "heartbeat stream is not used in this test",
            ))
        }

        async fn ask_leader(
            &self,
            _request: Request<AskLeaderRequest>,
        ) -> Result<Response<AskLeaderResponse>, Status> {
            let leader = Peer {
                id: 1,
                addr: self.leader_addr.clone(),
            };
            Ok(Response::new(AskLeaderResponse {
                header: ok_header(),
                leader: Some(leader),
            }))
        }
    }

    /// Procedure-service mock whose `ddl` handler sleeps for `delay` before
    /// succeeding. All other RPCs are unimplemented.
    #[derive(Clone)]
    struct MockProcedure {
        delay: Duration,
    }

    #[async_trait]
    impl ProcedureService for MockProcedure {
        async fn query(
            &self,
            _request: Request<QueryProcedureRequest>,
        ) -> Result<Response<ProcedureStateResponse>, Status> {
            Err(Status::unimplemented("query is not used in this test"))
        }

        async fn ddl(
            &self,
            _request: Request<DdlTaskRequest>,
        ) -> Result<Response<DdlTaskResponse>, Status> {
            tokio::time::sleep(self.delay).await;
            let response = DdlTaskResponse {
                header: ok_header(),
                ..Default::default()
            };
            Ok(Response::new(response))
        }

        async fn reconcile(
            &self,
            _request: Request<ReconcileRequest>,
        ) -> Result<Response<ReconcileResponse>, Status> {
            Err(Status::unimplemented("reconcile is not used in this test"))
        }

        async fn migrate(
            &self,
            _request: Request<MigrateRegionRequest>,
        ) -> Result<Response<MigrateRegionResponse>, Status> {
            Err(Status::unimplemented("migrate is not used in this test"))
        }

        async fn details(
            &self,
            _request: Request<ProcedureDetailRequest>,
        ) -> Result<Response<ProcedureDetailResponse>, Status> {
            Err(Status::unimplemented("details is not used in this test"))
        }

        async fn gc_regions(
            &self,
            _request: Request<GcRegionsRequest>,
        ) -> Result<Response<GcRegionsResponse>, Status> {
            Err(Status::unimplemented("gc_regions is not used in this test"))
        }

        async fn gc_table(
            &self,
            _request: Request<GcTableRequest>,
        ) -> Result<Response<GcTableResponse>, Status> {
            Err(Status::unimplemented("gc_table is not used in this test"))
        }
    }

    /// Checks that a DDL request is cut off by its own timeout (1 s) even though
    /// the server takes 4 s to answer, and that the error maps to `Cancelled`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_meta_client_ddl_request_timeout() {
        common_telemetry::init_default_ut_logging();

        // Bind before spawning so the advertised leader address is known up front.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let server_addr = listener.local_addr().unwrap().to_string();

        let heartbeat_service = HeartbeatServer::new(MockHeartbeat {
            leader_addr: server_addr.clone(),
        })
        .accept_compressed(CompressionEncoding::Zstd);
        // The DDL handler deliberately outlives the request timeout set below.
        let procedure_service = ProcedureServiceServer::new(MockProcedure {
            delay: Duration::from_secs(4),
        })
        .accept_compressed(CompressionEncoding::Zstd);
        let server_handle = tokio::spawn(
            tonic::transport::Server::builder()
                .add_service(heartbeat_service)
                .add_service(procedure_service)
                .serve_with_incoming(TcpListenerStream::new(listener)),
        );

        let mut client = MetaClientBuilder::new(0, Role::Frontend)
            .enable_heartbeat()
            .enable_procedure()
            .build();
        client.start(&[server_addr.as_str()]).await.unwrap();

        let comment_task = CommentOnTask {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            object_type: CommentObjectType::Table,
            object_name: "test_table".to_string(),
            column_name: None,
            object_id: None,
            comment: Some("timeout".to_string()),
        };
        let mut request = SubmitDdlTaskRequest::new(
            QueryContext::default(),
            DdlTask::new_comment_on(comment_task),
        );
        request.timeout = Duration::from_secs(1);

        let started = Instant::now();
        let err = client.submit_ddl_task(request).await.unwrap_err();
        let elapsed = started.elapsed();
        assert!(elapsed < Duration::from_secs(2));

        info!("err: {err:?}, code: {}", err.status_code());
        assert_eq!(err.status_code(), StatusCode::Cancelled);
        let err_msg = err.to_string();
        assert!(
            err_msg.contains("Timeout expired"),
            "unexpected error: {err_msg}"
        );

        server_handle.abort();
    }
}