meta_client/client/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25    Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31    RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ensure, ResultExt};
35use tokio::sync::RwLock;
36use tonic::codec::CompressionEncoding;
37use tonic::transport::Channel;
38use tonic::Status;
39
40use crate::client::ask_leader::AskLeader;
41use crate::client::{util, Id};
42use crate::error::{
43    ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
44    ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
45};
46
47#[derive(Clone, Debug)]
48pub struct Client {
49    inner: Arc<RwLock<Inner>>,
50}
51
52impl Client {
53    pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
54        let inner = Arc::new(RwLock::new(Inner {
55            id,
56            role,
57            channel_manager,
58            ask_leader: None,
59            max_retry,
60        }));
61
62        Self { inner }
63    }
64
65    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
66    where
67        U: AsRef<str>,
68        A: AsRef<[U]>,
69    {
70        let mut inner = self.inner.write().await;
71        inner.start(urls).await
72    }
73
74    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
75        let inner = self.inner.read().await;
76        inner.range(req).await
77    }
78
79    #[allow(dead_code)]
80    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
81        let inner = self.inner.read().await;
82        inner.batch_get(req).await
83    }
84
85    pub async fn get_metasrv_peers(
86        &self,
87    ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
88        let inner = self.inner.read().await;
89        inner.get_metasrv_peers().await
90    }
91}
92
93impl TxnService for Client {
94    type Error = MetaError;
95}
96
97#[async_trait::async_trait]
98impl KvBackend for Client {
99    fn name(&self) -> &str {
100        "ClusterClientKvBackend"
101    }
102
103    fn as_any(&self) -> &dyn Any {
104        self
105    }
106
107    async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
108        let resp = self.range(req).await;
109        match resp {
110            Ok(resp) => Ok(resp),
111            Err(err) if err.is_exceeded_size_limit() => {
112                Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
113            }
114            Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
115        }
116    }
117
118    async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
119        unimplemented!("`put` is not supported in cluster client kv backend")
120    }
121
122    async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
123        unimplemented!("`batch_put` is not supported in cluster client kv backend")
124    }
125
126    async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
127        self.batch_get(req)
128            .await
129            .map_err(BoxedError::new)
130            .context(ExternalSnafu)
131    }
132
133    async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
134        unimplemented!("`delete_range` is not supported in cluster client kv backend")
135    }
136
137    async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
138        unimplemented!("`batch_delete` is not supported in cluster client kv backend")
139    }
140}
141
142#[derive(Debug)]
143struct Inner {
144    id: Id,
145    role: Role,
146    channel_manager: ChannelManager,
147    ask_leader: Option<AskLeader>,
148    max_retry: usize,
149}
150
151impl Inner {
152    async fn start<U, A>(&mut self, urls: A) -> Result<()>
153    where
154        U: AsRef<str>,
155        A: AsRef<[U]>,
156    {
157        ensure!(
158            !self.is_started(),
159            IllegalGrpcClientStateSnafu {
160                err_msg: "Cluster client already started",
161            }
162        );
163
164        let peers = urls
165            .as_ref()
166            .iter()
167            .map(|url| url.as_ref().to_string())
168            .collect::<Vec<_>>();
169        self.ask_leader = Some(AskLeader::new(
170            self.id,
171            self.role,
172            peers,
173            self.channel_manager.clone(),
174            self.max_retry,
175        ));
176
177        Ok(())
178    }
179
180    fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
181        let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
182
183        Ok(ClusterClient::new(channel)
184            .accept_compressed(CompressionEncoding::Gzip)
185            .accept_compressed(CompressionEncoding::Zstd)
186            .send_compressed(CompressionEncoding::Zstd))
187    }
188
189    #[inline]
190    fn is_started(&self) -> bool {
191        self.ask_leader.is_some()
192    }
193
194    fn ask_leader(&self) -> Result<&AskLeader> {
195        ensure!(
196            self.is_started(),
197            IllegalGrpcClientStateSnafu {
198                err_msg: "Cluster client not start"
199            }
200        );
201
202        Ok(self.ask_leader.as_ref().unwrap())
203    }
204
205    async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
206    where
207        R: Future<Output = std::result::Result<T, Status>>,
208        F: Fn(ClusterClient<Channel>) -> R,
209        H: Fn(&T) -> &Option<ResponseHeader>,
210    {
211        let ask_leader = self.ask_leader()?;
212        let mut times = 0;
213        let mut last_error = None;
214
215        while times < self.max_retry {
216            if let Some(leader) = &ask_leader.get_leader() {
217                let client = self.make_client(leader)?;
218                match body_fn(client).await {
219                    Ok(res) => {
220                        if util::is_not_leader(get_header(&res)) {
221                            last_error = Some(format!("{leader} is not a leader"));
222                            warn!("Failed to {task} to {leader}, not a leader");
223                            let leader = ask_leader.ask_leader().await?;
224                            info!("Cluster client updated to new leader addr: {leader}");
225                            times += 1;
226                            continue;
227                        }
228                        return Ok(res);
229                    }
230                    Err(status) => {
231                        // The leader may be unreachable.
232                        if util::is_unreachable(&status) {
233                            last_error = Some(status.to_string());
234                            warn!("Failed to {task} to {leader}, source: {status}");
235                            let leader = ask_leader.ask_leader().await?;
236                            info!("Cluster client updated to new leader addr: {leader}");
237                            times += 1;
238                            continue;
239                        } else {
240                            error!("An error occurred in gRPC, status: {status}");
241                            return Err(Error::from(status));
242                        }
243                    }
244                }
245            } else if let Err(err) = ask_leader.ask_leader().await {
246                return Err(err);
247            }
248        }
249
250        RetryTimesExceededSnafu {
251            msg: format!("Failed to {task}, last error: {:?}", last_error),
252            times: self.max_retry,
253        }
254        .fail()
255    }
256
257    async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
258        self.with_retry(
259            "range",
260            move |mut client| {
261                let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
262
263                async move { client.range(inner_req).await.map(|res| res.into_inner()) }
264            },
265            |res| &res.header,
266        )
267        .await?
268        .try_into()
269        .context(ConvertMetaResponseSnafu)
270    }
271
272    async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
273        self.with_retry(
274            "batch_get",
275            move |mut client| {
276                let inner_req =
277                    tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
278
279                async move {
280                    client
281                        .batch_get(inner_req)
282                        .await
283                        .map(|res| res.into_inner())
284                }
285            },
286            |res| &res.header,
287        )
288        .await?
289        .try_into()
290        .context(ConvertMetaResponseSnafu)
291    }
292
293    async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
294        self.with_retry(
295            "get_metasrv_peers",
296            move |mut client| {
297                let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
298
299                async move {
300                    client
301                        .metasrv_peers(inner_req)
302                        .await
303                        .map(|res| res.into_inner())
304                }
305            },
306            |res| &res.header,
307        )
308        .await
309        .map(|res| (res.leader, res.followers))
310    }
311}
312
313/// A client for the cluster info. Read only and corresponding to
314/// `in_memory` kvbackend in the meta-srv.
315#[derive(Clone, Debug)]
316pub struct ClusterKvBackend {
317    inner: Arc<Client>,
318}
319
320impl ClusterKvBackend {
321    pub fn new(client: Arc<Client>) -> Self {
322        Self { inner: client }
323    }
324
325    fn unimpl(&self) -> common_meta::error::Error {
326        let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
327            name: self.name().to_string(),
328        }
329        .fail()
330        .map_err(BoxedError::new)
331        .context(common_meta::error::ExternalSnafu);
332        ret.unwrap_err()
333    }
334}
335
336impl TxnService for ClusterKvBackend {
337    type Error = common_meta::error::Error;
338}
339
340#[async_trait::async_trait]
341impl KvBackend for ClusterKvBackend {
342    fn name(&self) -> &str {
343        "ClusterKvBackend"
344    }
345
346    fn as_any(&self) -> &dyn Any {
347        self
348    }
349
350    async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
351        self.inner
352            .range(req)
353            .await
354            .map_err(BoxedError::new)
355            .context(common_meta::error::ExternalSnafu)
356    }
357
358    async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
359        Err(self.unimpl())
360    }
361
362    async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
363        Err(self.unimpl())
364    }
365
366    async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
367        Err(self.unimpl())
368    }
369
370    async fn delete_range(
371        &self,
372        _: DeleteRangeRequest,
373    ) -> common_meta::error::Result<DeleteRangeResponse> {
374        Err(self.unimpl())
375    }
376
377    async fn batch_delete(
378        &self,
379        _: BatchDeleteRequest,
380    ) -> common_meta::error::Result<BatchDeleteResponse> {
381        Err(self.unimpl())
382    }
383}