meta_client/client/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25    Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31    RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ResultExt, ensure};
35use tokio::sync::RwLock;
36use tonic::Status;
37use tonic::codec::CompressionEncoding;
38use tonic::transport::Channel;
39
40use crate::client::{LeaderProviderRef, util};
41use crate::error::{
42    ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
43    ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
44};
45
46#[derive(Clone, Debug)]
47pub struct Client {
48    inner: Arc<RwLock<Inner>>,
49}
50
51impl Client {
52    pub fn new(channel_manager: ChannelManager, max_retry: usize) -> Self {
53        let inner = Arc::new(RwLock::new(Inner {
54            channel_manager,
55            leader_provider: None,
56            max_retry,
57        }));
58
59        Self { inner }
60    }
61
62    /// Start the client with a [LeaderProvider].
63    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
64        let mut inner = self.inner.write().await;
65        inner.start_with(leader_provider)
66    }
67
68    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
69        let inner = self.inner.read().await;
70        inner.range(req).await
71    }
72
73    #[allow(dead_code)]
74    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
75        let inner = self.inner.read().await;
76        inner.batch_get(req).await
77    }
78
79    pub async fn get_metasrv_peers(
80        &self,
81    ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
82        let inner = self.inner.read().await;
83        inner.get_metasrv_peers().await
84    }
85}
86
87impl TxnService for Client {
88    type Error = MetaError;
89}
90
91#[async_trait::async_trait]
92impl KvBackend for Client {
93    fn name(&self) -> &str {
94        "ClusterClientKvBackend"
95    }
96
97    fn as_any(&self) -> &dyn Any {
98        self
99    }
100
101    async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
102        let resp = self.range(req).await;
103        match resp {
104            Ok(resp) => Ok(resp),
105            Err(err) if err.is_exceeded_size_limit() => {
106                Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
107            }
108            Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
109        }
110    }
111
112    async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
113        unimplemented!("`put` is not supported in cluster client kv backend")
114    }
115
116    async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
117        unimplemented!("`batch_put` is not supported in cluster client kv backend")
118    }
119
120    async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
121        self.batch_get(req)
122            .await
123            .map_err(BoxedError::new)
124            .context(ExternalSnafu)
125    }
126
127    async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
128        unimplemented!("`delete_range` is not supported in cluster client kv backend")
129    }
130
131    async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
132        unimplemented!("`batch_delete` is not supported in cluster client kv backend")
133    }
134}
135
136#[derive(Debug)]
137struct Inner {
138    channel_manager: ChannelManager,
139    leader_provider: Option<LeaderProviderRef>,
140    max_retry: usize,
141}
142
143impl Inner {
144    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
145        ensure!(
146            !self.is_started(),
147            IllegalGrpcClientStateSnafu {
148                err_msg: "Cluster client already started",
149            }
150        );
151        self.leader_provider = Some(leader_provider);
152        Ok(())
153    }
154
155    fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
156        let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
157
158        Ok(ClusterClient::new(channel)
159            .accept_compressed(CompressionEncoding::Gzip)
160            .accept_compressed(CompressionEncoding::Zstd)
161            .send_compressed(CompressionEncoding::Zstd))
162    }
163
164    #[inline]
165    fn is_started(&self) -> bool {
166        self.leader_provider.is_some()
167    }
168
169    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
170    where
171        R: Future<Output = std::result::Result<T, Status>>,
172        F: Fn(ClusterClient<Channel>) -> R,
173        H: Fn(&T) -> &Option<ResponseHeader>,
174    {
175        let Some(leader_provider) = self.leader_provider.as_ref() else {
176            return IllegalGrpcClientStateSnafu {
177                err_msg: "not started",
178            }
179            .fail();
180        };
181
182        let mut times = 0;
183        let mut last_error = None;
184
185        while times < self.max_retry {
186            if let Some(leader) = &leader_provider.leader() {
187                let client = self.make_client(leader)?;
188                match body_fn(client).await {
189                    Ok(res) => {
190                        if util::is_not_leader(get_header(&res)) {
191                            last_error = Some(format!("{leader} is not a leader"));
192                            warn!("Failed to {task} to {leader}, not a leader");
193                            let leader = leader_provider.ask_leader().await?;
194                            info!("Cluster client updated to new leader addr: {leader}");
195                            times += 1;
196                            continue;
197                        }
198                        return Ok(res);
199                    }
200                    Err(status) => {
201                        // The leader may be unreachable.
202                        if util::is_unreachable(&status) {
203                            last_error = Some(status.to_string());
204                            warn!("Failed to {task} to {leader}, source: {status}");
205                            let leader = leader_provider.ask_leader().await?;
206                            info!("Cluster client updated to new leader addr: {leader}");
207                            times += 1;
208                            continue;
209                        } else {
210                            error!("An error occurred in gRPC, status: {status}");
211                            return Err(Error::from(status));
212                        }
213                    }
214                }
215            } else if let Err(err) = leader_provider.ask_leader().await {
216                return Err(err);
217            }
218        }
219
220        RetryTimesExceededSnafu {
221            msg: format!("Failed to {task}, last error: {:?}", last_error),
222            times: self.max_retry,
223        }
224        .fail()
225    }
226
227    async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
228        self.with_retry(
229            "range",
230            move |mut client| {
231                let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
232
233                async move { client.range(inner_req).await.map(|res| res.into_inner()) }
234            },
235            |res| &res.header,
236        )
237        .await?
238        .try_into()
239        .context(ConvertMetaResponseSnafu)
240    }
241
242    async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
243        self.with_retry(
244            "batch_get",
245            move |mut client| {
246                let inner_req =
247                    tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
248
249                async move {
250                    client
251                        .batch_get(inner_req)
252                        .await
253                        .map(|res| res.into_inner())
254                }
255            },
256            |res| &res.header,
257        )
258        .await?
259        .try_into()
260        .context(ConvertMetaResponseSnafu)
261    }
262
263    async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
264        self.with_retry(
265            "get_metasrv_peers",
266            move |mut client| {
267                let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
268
269                async move {
270                    client
271                        .metasrv_peers(inner_req)
272                        .await
273                        .map(|res| res.into_inner())
274                }
275            },
276            |res| &res.header,
277        )
278        .await
279        .map(|res| (res.leader, res.followers))
280    }
281}
282
283/// A client for the cluster info. Read only and corresponding to
284/// `in_memory` kvbackend in the meta-srv.
285#[derive(Clone, Debug)]
286pub struct ClusterKvBackend {
287    inner: Arc<Client>,
288}
289
290impl ClusterKvBackend {
291    pub fn new(client: Arc<Client>) -> Self {
292        Self { inner: client }
293    }
294
295    fn unimpl(&self) -> common_meta::error::Error {
296        let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
297            name: self.name().to_string(),
298        }
299        .fail()
300        .map_err(BoxedError::new)
301        .context(common_meta::error::ExternalSnafu);
302        ret.unwrap_err()
303    }
304}
305
306impl TxnService for ClusterKvBackend {
307    type Error = common_meta::error::Error;
308}
309
310#[async_trait::async_trait]
311impl KvBackend for ClusterKvBackend {
312    fn name(&self) -> &str {
313        "ClusterKvBackend"
314    }
315
316    fn as_any(&self) -> &dyn Any {
317        self
318    }
319
320    async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
321        self.inner
322            .range(req)
323            .await
324            .map_err(BoxedError::new)
325            .context(common_meta::error::ExternalSnafu)
326    }
327
328    async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
329        Err(self.unimpl())
330    }
331
332    async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
333        Err(self.unimpl())
334    }
335
336    async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
337        Err(self.unimpl())
338    }
339
340    async fn delete_range(
341        &self,
342        _: DeleteRangeRequest,
343    ) -> common_meta::error::Result<DeleteRangeResponse> {
344        Err(self.unimpl())
345    }
346
347    async fn batch_delete(
348        &self,
349        _: BatchDeleteRequest,
350    ) -> common_meta::error::Result<BatchDeleteResponse> {
351        Err(self.unimpl())
352    }
353}