Skip to main content

meta_client/client/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::procedure_service_client::ProcedureServiceClient;
20use api::v1::meta::{
21    DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest,
22    GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest,
23    ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest,
24    ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role,
25};
26use common_grpc::channel_manager::ChannelManager;
27use common_meta::rpc::procedure::{
28    GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
29    GcTableRequest as MetaGcTableRequest,
30};
31use common_telemetry::tracing_context::TracingContext;
32use common_telemetry::{error, info, warn};
33use snafu::{ResultExt, ensure};
34use tokio::sync::RwLock;
35use tonic::codec::CompressionEncoding;
36use tonic::transport::Channel;
37use tonic::{Request, Status};
38
39use crate::client::{Id, LeaderProviderRef, util};
40use crate::error;
41use crate::error::Result;
42
43#[derive(Clone, Debug)]
44pub struct Client {
45    inner: Arc<RwLock<Inner>>,
46}
47
48impl Client {
49    pub fn new(
50        id: Id,
51        role: Role,
52        channel_manager: ChannelManager,
53        max_retry: usize,
54        timeout: Duration,
55    ) -> Self {
56        let inner = Arc::new(RwLock::new(Inner {
57            id,
58            role,
59            channel_manager,
60            leader_provider: None,
61            max_retry,
62            timeout,
63        }));
64
65        Self { inner }
66    }
67
68    /// Start the client with a [LeaderProvider].
69    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
70        let mut inner = self.inner.write().await;
71        inner.start_with(leader_provider)
72    }
73
74    pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
75        let inner = self.inner.read().await;
76        inner.submit_ddl_task(req).await
77    }
78
79    /// Query the procedure' state by its id
80    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
81        let inner = self.inner.read().await;
82        inner.query_procedure_state(pid).await
83    }
84
85    /// Migrate the region from one datanode to the other datanode:
86    /// - `region_id`:  the migrated region id
87    /// - `from_peer`:  the source datanode id
88    /// - `to_peer`:  the target datanode id
89    /// - `timeout`: timeout for downgrading region and upgrading region operations
90    pub async fn migrate_region(
91        &self,
92        region_id: u64,
93        from_peer: u64,
94        to_peer: u64,
95        timeout: Duration,
96    ) -> Result<MigrateRegionResponse> {
97        let inner = self.inner.read().await;
98        inner
99            .migrate_region(region_id, from_peer, to_peer, timeout)
100            .await
101    }
102
103    /// Reconcile the procedure state.
104    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
105        let inner = self.inner.read().await;
106        inner.reconcile(request).await
107    }
108
109    pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
110        let inner = self.inner.read().await;
111        inner.list_procedures().await
112    }
113
114    pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
115        let inner = self.inner.read().await;
116        inner.gc_regions(request).await
117    }
118
119    pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
120        let inner = self.inner.read().await;
121        inner.gc_table(request).await
122    }
123}
124
/// State shared by all clones of `Client`: identity, connection settings and
/// retry policy.
#[derive(Debug)]
struct Inner {
    /// Member id written into request headers.
    id: Id,
    /// Role written into request headers.
    role: Role,
    /// Provides the gRPC channel for a given leader address.
    channel_manager: ChannelManager,
    /// Leader lookup; `None` until `start_with` has been called.
    leader_provider: Option<LeaderProviderRef>,
    /// Upper bound on the attempts made by `with_retry`.
    max_retry: usize,
    /// Default request timeout, used by the RPCs that do not carry a timeout
    /// of their own.
    timeout: Duration,
}
135
136impl Inner {
137    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
138        ensure!(
139            !self.is_started(),
140            error::IllegalGrpcClientStateSnafu {
141                err_msg: "DDL client already started",
142            }
143        );
144        self.leader_provider = Some(leader_provider);
145        Ok(())
146    }
147
148    fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
149        let channel = self
150            .channel_manager
151            .get(addr)
152            .context(error::CreateChannelSnafu)?;
153
154        Ok(ProcedureServiceClient::new(channel)
155            .accept_compressed(CompressionEncoding::Gzip)
156            .accept_compressed(CompressionEncoding::Zstd)
157            .send_compressed(CompressionEncoding::Zstd))
158    }
159
    /// Returns `true` once a leader provider has been attached via `start_with`.
    #[inline]
    fn is_started(&self) -> bool {
        self.leader_provider.is_some()
    }
164
165    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
166    where
167        R: Future<Output = std::result::Result<T, Status>>,
168        F: Fn(ProcedureServiceClient<Channel>) -> R,
169        H: Fn(&T) -> &Option<ResponseHeader>,
170    {
171        let Some(leader_provider) = self.leader_provider.as_ref() else {
172            return error::IllegalGrpcClientStateSnafu {
173                err_msg: "not started",
174            }
175            .fail();
176        };
177
178        let mut times = 0;
179        let mut last_error = None;
180
181        while times < self.max_retry {
182            if let Some(leader) = &leader_provider.leader() {
183                let client = self.make_client(leader)?;
184                match body_fn(client).await {
185                    Ok(res) => {
186                        if util::is_not_leader(get_header(&res)) {
187                            last_error = Some(format!("{leader} is not a leader"));
188                            warn!("Failed to {task} to {leader}, not a leader");
189                            let leader = leader_provider.ask_leader().await?;
190                            info!("DDL client updated to new leader addr: {leader}");
191                            times += 1;
192                            continue;
193                        }
194                        return Ok(res);
195                    }
196                    Err(status) => {
197                        // The leader may be unreachable.
198                        if util::is_unreachable(&status) {
199                            last_error = Some(status.to_string());
200                            warn!("Failed to {task} to {leader}, source: {status}");
201                            let leader = leader_provider.ask_leader().await?;
202                            info!("Procedure client updated to new leader addr: {leader}");
203                            times += 1;
204                            continue;
205                        } else {
206                            error!("An error occurred in gRPC, status: {status:?}");
207                            return Err(error::Error::from(status));
208                        }
209                    }
210                }
211            } else {
212                leader_provider.ask_leader().await?;
213            }
214        }
215
216        error::RetryTimesExceededSnafu {
217            msg: format!("Failed to {task}, last error: {:?}", last_error),
218            times: self.max_retry,
219        }
220        .fail()
221    }
222
223    async fn migrate_region(
224        &self,
225        region_id: u64,
226        from_peer: u64,
227        to_peer: u64,
228        timeout: Duration,
229    ) -> Result<MigrateRegionResponse> {
230        let mut req = MigrateRegionRequest {
231            region_id,
232            from_peer,
233            to_peer,
234            timeout_secs: timeout.as_secs() as u32,
235            ..Default::default()
236        };
237
238        req.set_header(
239            self.id,
240            self.role,
241            TracingContext::from_current_span().to_w3c(),
242        );
243
244        self.with_retry(
245            "migrate region",
246            move |mut client| {
247                let mut req = Request::new(req.clone());
248                req.set_timeout(self.timeout);
249
250                async move { client.migrate(req).await.map(|res| res.into_inner()) }
251            },
252            |resp: &MigrateRegionResponse| &resp.header,
253        )
254        .await
255    }
256
257    async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
258        let mut req = request;
259        req.set_header(
260            self.id,
261            self.role,
262            TracingContext::from_current_span().to_w3c(),
263        );
264
265        self.with_retry(
266            "reconcile",
267            move |mut client| {
268                let mut req = Request::new(req.clone());
269                req.set_timeout(self.timeout);
270
271                async move { client.reconcile(req).await.map(|res| res.into_inner()) }
272            },
273            |resp: &ReconcileResponse| &resp.header,
274        )
275        .await
276    }
277
278    async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
279        let timeout = request.timeout;
280        let req = GcRegionsRequest {
281            header: Some(RequestHeader {
282                protocol_version: 0,
283                member_id: self.id,
284                role: self.role as i32,
285                tracing_context: TracingContext::from_current_span().to_w3c(),
286            }),
287            region_ids: request.region_ids,
288            full_file_listing: request.full_file_listing,
289            timeout_secs: gc_timeout_secs(timeout),
290        };
291
292        let resp: GcRegionsResponse = self
293            .with_retry(
294                "gc_regions",
295                move |mut client| {
296                    let mut req = Request::new(req.clone());
297                    if let Some(timeout) = timeout {
298                        req.set_timeout(timeout);
299                    }
300                    async move { client.gc_regions(req).await.map(|res| res.into_inner()) }
301                },
302                |resp: &GcRegionsResponse| &resp.header,
303            )
304            .await?;
305
306        let stats = resp.stats.unwrap_or_default();
307        Ok(MetaGcResponse {
308            processed_regions: stats.processed_regions,
309            need_retry_regions: stats.need_retry_regions,
310            deleted_files: stats.deleted_files,
311            deleted_indexes: stats.deleted_indexes,
312        })
313    }
314
315    async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
316        let timeout = request.timeout;
317        let req = GcTableRequest {
318            header: Some(RequestHeader {
319                protocol_version: 0,
320                member_id: self.id,
321                role: self.role as i32,
322                tracing_context: TracingContext::from_current_span().to_w3c(),
323            }),
324            catalog_name: request.catalog_name,
325            schema_name: request.schema_name,
326            table_name: request.table_name,
327            full_file_listing: request.full_file_listing,
328            timeout_secs: gc_timeout_secs(timeout),
329        };
330
331        let resp: GcTableResponse = self
332            .with_retry(
333                "gc_table",
334                move |mut client| {
335                    let mut req = Request::new(req.clone());
336                    if let Some(timeout) = timeout {
337                        req.set_timeout(timeout);
338                    }
339                    async move { client.gc_table(req).await.map(|res| res.into_inner()) }
340                },
341                |resp: &GcTableResponse| &resp.header,
342            )
343            .await?;
344
345        let stats = resp.stats.unwrap_or_default();
346        Ok(MetaGcResponse {
347            processed_regions: stats.processed_regions,
348            need_retry_regions: stats.need_retry_regions,
349            deleted_files: stats.deleted_files,
350            deleted_indexes: stats.deleted_indexes,
351        })
352    }
353
354    async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
355        let mut req = QueryProcedureRequest {
356            pid: Some(ProcedureId { key: pid.into() }),
357            ..Default::default()
358        };
359
360        req.set_header(
361            self.id,
362            self.role,
363            TracingContext::from_current_span().to_w3c(),
364        );
365
366        self.with_retry(
367            "query procedure state",
368            move |mut client| {
369                let mut req = Request::new(req.clone());
370                req.set_timeout(self.timeout);
371
372                async move { client.query(req).await.map(|res| res.into_inner()) }
373            },
374            |resp: &ProcedureStateResponse| &resp.header,
375        )
376        .await
377    }
378
379    async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
380        req.set_header(
381            self.id,
382            self.role,
383            TracingContext::from_current_span().to_w3c(),
384        );
385        let timeout = Duration::from_secs(req.timeout_secs.into());
386
387        self.with_retry(
388            "submit ddl task",
389            move |mut client| {
390                let mut req = Request::new(req.clone());
391                req.set_timeout(timeout);
392                async move { client.ddl(req).await.map(|res| res.into_inner()) }
393            },
394            |resp: &DdlTaskResponse| &resp.header,
395        )
396        .await
397    }
398
399    async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
400        let mut req = ProcedureDetailRequest::default();
401        req.set_header(
402            self.id,
403            self.role,
404            TracingContext::from_current_span().to_w3c(),
405        );
406
407        self.with_retry(
408            "list procedure",
409            move |mut client| {
410                let mut req = Request::new(req.clone());
411                req.set_timeout(self.timeout);
412                async move { client.details(req).await.map(|res| res.into_inner()) }
413            },
414            |resp: &ProcedureDetailResponse| &resp.header,
415        )
416        .await
417    }
418}
419
/// Converts an optional GC timeout into the whole-second value sent on the wire.
///
/// - `None` maps to `0`.
/// - A present timeout is truncated to whole seconds, raised to at least `1`
///   (so a sub-second timeout does not collapse to `0`), and saturated at
///   `u32::MAX`.
fn gc_timeout_secs(timeout: Option<Duration>) -> u32 {
    match timeout {
        None => 0,
        Some(duration) => {
            let whole_secs = duration.as_secs().max(1);
            u32::try_from(whole_secs).unwrap_or(u32::MAX)
        }
    }
}
425
426#[cfg(test)]
427mod tests {
428    use std::time::{Duration, Instant};
429
430    use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
431    use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer};
432    use api::v1::meta::{
433        AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest,
434        GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse,
435        MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest,
436        ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest,
437        ReconcileResponse, ResponseHeader, Role,
438    };
439    use async_trait::async_trait;
440    use common_error::status_code::StatusCode;
441    use common_meta::rpc::ddl::{
442        CommentObjectType, CommentOnTask, DdlTask, QueryContext, SubmitDdlTaskRequest,
443    };
444    use common_telemetry::common_error::ext::ErrorExt;
445    use common_telemetry::info;
446    use tokio::net::TcpListener;
447    use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
448    use tonic::codec::CompressionEncoding;
449    use tonic::{Request, Response, Status};
450
451    use super::gc_timeout_secs;
452    use crate::client::MetaClientBuilder;
453
454    #[test]
455    fn test_gc_timeout_secs() {
456        assert_eq!(gc_timeout_secs(None), 0);
457        assert_eq!(gc_timeout_secs(Some(Duration::from_millis(1))), 1);
458        assert_eq!(gc_timeout_secs(Some(Duration::from_millis(999))), 1);
459        assert_eq!(gc_timeout_secs(Some(Duration::from_secs(1))), 1);
460        assert_eq!(gc_timeout_secs(Some(Duration::from_secs(10))), 10);
461    }
462
463    #[derive(Clone)]
464    struct MockHeartbeat {
465        leader_addr: String,
466    }
467
468    #[async_trait]
469    impl Heartbeat for MockHeartbeat {
470        type HeartbeatStream = ReceiverStream<Result<HeartbeatResponse, Status>>;
471
472        async fn heartbeat(
473            &self,
474            _request: Request<tonic::Streaming<HeartbeatRequest>>,
475        ) -> Result<Response<Self::HeartbeatStream>, Status> {
476            Err(Status::unimplemented(
477                "heartbeat stream is not used in this test",
478            ))
479        }
480
481        async fn ask_leader(
482            &self,
483            _request: Request<AskLeaderRequest>,
484        ) -> Result<Response<AskLeaderResponse>, Status> {
485            Ok(Response::new(AskLeaderResponse {
486                header: Some(ResponseHeader {
487                    protocol_version: 0,
488                    error: None,
489                }),
490                leader: Some(Peer {
491                    id: 1,
492                    addr: self.leader_addr.clone(),
493                }),
494            }))
495        }
496    }
497
498    #[derive(Clone)]
499    struct MockProcedure {
500        delay: Duration,
501    }
502
503    #[async_trait]
504    impl ProcedureService for MockProcedure {
505        async fn query(
506            &self,
507            _request: Request<QueryProcedureRequest>,
508        ) -> Result<Response<ProcedureStateResponse>, Status> {
509            Err(Status::unimplemented("query is not used in this test"))
510        }
511
512        async fn ddl(
513            &self,
514            _request: Request<DdlTaskRequest>,
515        ) -> Result<Response<DdlTaskResponse>, Status> {
516            tokio::time::sleep(self.delay).await;
517            Ok(Response::new(DdlTaskResponse {
518                header: Some(ResponseHeader {
519                    protocol_version: 0,
520                    error: None,
521                }),
522                ..Default::default()
523            }))
524        }
525
526        async fn reconcile(
527            &self,
528            _request: Request<ReconcileRequest>,
529        ) -> Result<Response<ReconcileResponse>, Status> {
530            Err(Status::unimplemented("reconcile is not used in this test"))
531        }
532
533        async fn migrate(
534            &self,
535            _request: Request<MigrateRegionRequest>,
536        ) -> Result<Response<MigrateRegionResponse>, Status> {
537            Err(Status::unimplemented("migrate is not used in this test"))
538        }
539
540        async fn details(
541            &self,
542            _request: Request<ProcedureDetailRequest>,
543        ) -> Result<Response<ProcedureDetailResponse>, Status> {
544            Err(Status::unimplemented("details is not used in this test"))
545        }
546
547        async fn gc_regions(
548            &self,
549            _request: Request<GcRegionsRequest>,
550        ) -> Result<Response<GcRegionsResponse>, Status> {
551            Err(Status::unimplemented("gc_regions is not used in this test"))
552        }
553
554        async fn gc_table(
555            &self,
556            _request: Request<GcTableRequest>,
557        ) -> Result<Response<GcTableResponse>, Status> {
558            Err(Status::unimplemented("gc_table is not used in this test"))
559        }
560    }
561
562    #[tokio::test(flavor = "multi_thread")]
563    async fn test_meta_client_ddl_request_timeout() {
564        common_telemetry::init_default_ut_logging();
565
566        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
567        let addr = listener.local_addr().unwrap();
568        let addr_str = addr.to_string();
569
570        let heartbeat = MockHeartbeat {
571            leader_addr: addr_str.clone(),
572        };
573        let procedure = MockProcedure {
574            delay: Duration::from_secs(4),
575        };
576
577        let server = tonic::transport::Server::builder()
578            .add_service(
579                HeartbeatServer::new(heartbeat).accept_compressed(CompressionEncoding::Zstd),
580            )
581            .add_service(
582                ProcedureServiceServer::new(procedure).accept_compressed(CompressionEncoding::Zstd),
583            )
584            .serve_with_incoming(TcpListenerStream::new(listener));
585        let server_handle = tokio::spawn(server);
586
587        let mut client = MetaClientBuilder::new(0, Role::Frontend)
588            .enable_heartbeat()
589            .enable_procedure()
590            .build();
591        client.start(&[addr_str.as_str()]).await.unwrap();
592
593        let mut request = SubmitDdlTaskRequest::new(
594            QueryContext::default(),
595            DdlTask::new_comment_on(CommentOnTask {
596                catalog_name: "greptime".to_string(),
597                schema_name: "public".to_string(),
598                object_type: CommentObjectType::Table,
599                object_name: "test_table".to_string(),
600                column_name: None,
601                object_id: None,
602                comment: Some("timeout".to_string()),
603            }),
604        );
605        request.timeout = Duration::from_secs(1);
606
607        let now = Instant::now();
608        let err = client.submit_ddl_task(request).await.unwrap_err();
609        let elapsed = now.elapsed();
610        // The request should be cancelled within 1 second.
611        assert!(elapsed < Duration::from_secs(2));
612        info!("err: {err:?}, code: {}", err.status_code());
613        assert_eq!(err.status_code(), StatusCode::Cancelled);
614        let err_msg = err.to_string();
615        assert!(
616            err_msg.contains("Timeout expired"),
617            "unexpected error: {err_msg}"
618        );
619
620        server_handle.abort();
621    }
622}