meta_client/client/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25    Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31    RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ensure, ResultExt};
35use tokio::sync::RwLock;
36use tonic::codec::CompressionEncoding;
37use tonic::transport::Channel;
38use tonic::Status;
39
40use crate::client::ask_leader::AskLeader;
41use crate::client::{util, Id, LeaderProviderRef};
42use crate::error::{
43    ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
44    ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
45};
46
47#[derive(Clone, Debug)]
48pub struct Client {
49    inner: Arc<RwLock<Inner>>,
50}
51
52impl Client {
53    pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
54        let inner = Arc::new(RwLock::new(Inner {
55            id,
56            role,
57            channel_manager,
58            leader_provider: None,
59            max_retry,
60        }));
61
62        Self { inner }
63    }
64
65    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
66    where
67        U: AsRef<str>,
68        A: AsRef<[U]>,
69    {
70        let mut inner = self.inner.write().await;
71        inner.start(urls)
72    }
73
74    /// Start the client with a [LeaderProvider].
75    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
76        let mut inner = self.inner.write().await;
77        inner.start_with(leader_provider)
78    }
79
80    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
81        let inner = self.inner.read().await;
82        inner.range(req).await
83    }
84
85    #[allow(dead_code)]
86    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
87        let inner = self.inner.read().await;
88        inner.batch_get(req).await
89    }
90
91    pub async fn get_metasrv_peers(
92        &self,
93    ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
94        let inner = self.inner.read().await;
95        inner.get_metasrv_peers().await
96    }
97}
98
99impl TxnService for Client {
100    type Error = MetaError;
101}
102
103#[async_trait::async_trait]
104impl KvBackend for Client {
105    fn name(&self) -> &str {
106        "ClusterClientKvBackend"
107    }
108
109    fn as_any(&self) -> &dyn Any {
110        self
111    }
112
113    async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
114        let resp = self.range(req).await;
115        match resp {
116            Ok(resp) => Ok(resp),
117            Err(err) if err.is_exceeded_size_limit() => {
118                Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
119            }
120            Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
121        }
122    }
123
124    async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
125        unimplemented!("`put` is not supported in cluster client kv backend")
126    }
127
128    async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
129        unimplemented!("`batch_put` is not supported in cluster client kv backend")
130    }
131
132    async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
133        self.batch_get(req)
134            .await
135            .map_err(BoxedError::new)
136            .context(ExternalSnafu)
137    }
138
139    async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
140        unimplemented!("`delete_range` is not supported in cluster client kv backend")
141    }
142
143    async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
144        unimplemented!("`batch_delete` is not supported in cluster client kv backend")
145    }
146}
147
148#[derive(Debug)]
149struct Inner {
150    id: Id,
151    role: Role,
152    channel_manager: ChannelManager,
153    leader_provider: Option<LeaderProviderRef>,
154    max_retry: usize,
155}
156
157impl Inner {
158    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
159        ensure!(
160            !self.is_started(),
161            IllegalGrpcClientStateSnafu {
162                err_msg: "Cluster client already started",
163            }
164        );
165        self.leader_provider = Some(leader_provider);
166        Ok(())
167    }
168
169    fn start<U, A>(&mut self, urls: A) -> Result<()>
170    where
171        U: AsRef<str>,
172        A: AsRef<[U]>,
173    {
174        let peers = urls
175            .as_ref()
176            .iter()
177            .map(|url| url.as_ref().to_string())
178            .collect::<Vec<_>>();
179        let ask_leader = AskLeader::new(
180            self.id,
181            self.role,
182            peers,
183            self.channel_manager.clone(),
184            self.max_retry,
185        );
186        self.start_with(Arc::new(ask_leader))
187    }
188
189    fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
190        let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
191
192        Ok(ClusterClient::new(channel)
193            .accept_compressed(CompressionEncoding::Gzip)
194            .accept_compressed(CompressionEncoding::Zstd)
195            .send_compressed(CompressionEncoding::Zstd))
196    }
197
198    #[inline]
199    fn is_started(&self) -> bool {
200        self.leader_provider.is_some()
201    }
202
203    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
204    where
205        R: Future<Output = std::result::Result<T, Status>>,
206        F: Fn(ClusterClient<Channel>) -> R,
207        H: Fn(&T) -> &Option<ResponseHeader>,
208    {
209        let Some(leader_provider) = self.leader_provider.as_ref() else {
210            return IllegalGrpcClientStateSnafu {
211                err_msg: "not started",
212            }
213            .fail();
214        };
215
216        let mut times = 0;
217        let mut last_error = None;
218
219        while times < self.max_retry {
220            if let Some(leader) = &leader_provider.leader() {
221                let client = self.make_client(leader)?;
222                match body_fn(client).await {
223                    Ok(res) => {
224                        if util::is_not_leader(get_header(&res)) {
225                            last_error = Some(format!("{leader} is not a leader"));
226                            warn!("Failed to {task} to {leader}, not a leader");
227                            let leader = leader_provider.ask_leader().await?;
228                            info!("Cluster client updated to new leader addr: {leader}");
229                            times += 1;
230                            continue;
231                        }
232                        return Ok(res);
233                    }
234                    Err(status) => {
235                        // The leader may be unreachable.
236                        if util::is_unreachable(&status) {
237                            last_error = Some(status.to_string());
238                            warn!("Failed to {task} to {leader}, source: {status}");
239                            let leader = leader_provider.ask_leader().await?;
240                            info!("Cluster client updated to new leader addr: {leader}");
241                            times += 1;
242                            continue;
243                        } else {
244                            error!("An error occurred in gRPC, status: {status}");
245                            return Err(Error::from(status));
246                        }
247                    }
248                }
249            } else if let Err(err) = leader_provider.ask_leader().await {
250                return Err(err);
251            }
252        }
253
254        RetryTimesExceededSnafu {
255            msg: format!("Failed to {task}, last error: {:?}", last_error),
256            times: self.max_retry,
257        }
258        .fail()
259    }
260
261    async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
262        self.with_retry(
263            "range",
264            move |mut client| {
265                let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
266
267                async move { client.range(inner_req).await.map(|res| res.into_inner()) }
268            },
269            |res| &res.header,
270        )
271        .await?
272        .try_into()
273        .context(ConvertMetaResponseSnafu)
274    }
275
276    async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
277        self.with_retry(
278            "batch_get",
279            move |mut client| {
280                let inner_req =
281                    tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
282
283                async move {
284                    client
285                        .batch_get(inner_req)
286                        .await
287                        .map(|res| res.into_inner())
288                }
289            },
290            |res| &res.header,
291        )
292        .await?
293        .try_into()
294        .context(ConvertMetaResponseSnafu)
295    }
296
297    async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
298        self.with_retry(
299            "get_metasrv_peers",
300            move |mut client| {
301                let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
302
303                async move {
304                    client
305                        .metasrv_peers(inner_req)
306                        .await
307                        .map(|res| res.into_inner())
308                }
309            },
310            |res| &res.header,
311        )
312        .await
313        .map(|res| (res.leader, res.followers))
314    }
315}
316
317/// A client for the cluster info. Read only and corresponding to
318/// `in_memory` kvbackend in the meta-srv.
319#[derive(Clone, Debug)]
320pub struct ClusterKvBackend {
321    inner: Arc<Client>,
322}
323
324impl ClusterKvBackend {
325    pub fn new(client: Arc<Client>) -> Self {
326        Self { inner: client }
327    }
328
329    fn unimpl(&self) -> common_meta::error::Error {
330        let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
331            name: self.name().to_string(),
332        }
333        .fail()
334        .map_err(BoxedError::new)
335        .context(common_meta::error::ExternalSnafu);
336        ret.unwrap_err()
337    }
338}
339
340impl TxnService for ClusterKvBackend {
341    type Error = common_meta::error::Error;
342}
343
344#[async_trait::async_trait]
345impl KvBackend for ClusterKvBackend {
346    fn name(&self) -> &str {
347        "ClusterKvBackend"
348    }
349
350    fn as_any(&self) -> &dyn Any {
351        self
352    }
353
354    async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
355        self.inner
356            .range(req)
357            .await
358            .map_err(BoxedError::new)
359            .context(common_meta::error::ExternalSnafu)
360    }
361
362    async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
363        Err(self.unimpl())
364    }
365
366    async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
367        Err(self.unimpl())
368    }
369
370    async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
371        Err(self.unimpl())
372    }
373
374    async fn delete_range(
375        &self,
376        _: DeleteRangeRequest,
377    ) -> common_meta::error::Result<DeleteRangeResponse> {
378        Err(self.unimpl())
379    }
380
381    async fn batch_delete(
382        &self,
383        _: BatchDeleteRequest,
384    ) -> common_meta::error::Result<BatchDeleteResponse> {
385        Err(self.unimpl())
386    }
387}