meta_client/client/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use api::v1::meta::store_client::StoreClient;
19use api::v1::meta::{
20    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
21    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
22    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role,
23};
24use common_grpc::channel_manager::ChannelManager;
25use common_telemetry::tracing_context::TracingContext;
26use snafu::{ensure, OptionExt, ResultExt};
27use tokio::sync::RwLock;
28use tonic::codec::CompressionEncoding;
29use tonic::transport::Channel;
30
31use crate::client::{load_balance as lb, Id};
32use crate::error;
33use crate::error::Result;
34
35#[derive(Clone, Debug)]
36pub struct Client {
37    inner: Arc<RwLock<Inner>>,
38}
39
40impl Client {
41    pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
42        let inner = Arc::new(RwLock::new(Inner {
43            id,
44            role,
45            channel_manager,
46            peers: vec![],
47        }));
48
49        Self { inner }
50    }
51
52    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
53    where
54        U: AsRef<str>,
55        A: AsRef<[U]>,
56    {
57        let mut inner = self.inner.write().await;
58        inner.start(urls).await
59    }
60
61    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
62        let inner = self.inner.read().await;
63        inner.range(req).await
64    }
65
66    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
67        let inner = self.inner.read().await;
68        inner.put(req).await
69    }
70
71    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
72        let inner = self.inner.read().await;
73        inner.batch_get(req).await
74    }
75
76    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
77        let inner = self.inner.read().await;
78        inner.batch_put(req).await
79    }
80
81    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
82        let inner = self.inner.read().await;
83        inner.batch_delete(req).await
84    }
85
86    pub async fn compare_and_put(
87        &self,
88        req: CompareAndPutRequest,
89    ) -> Result<CompareAndPutResponse> {
90        let inner = self.inner.read().await;
91        inner.compare_and_put(req).await
92    }
93
94    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
95        let inner = self.inner.read().await;
96        inner.delete_range(req).await
97    }
98}
99
/// State shared by every clone of `Client`.
#[derive(Debug)]
struct Inner {
    /// Caller id written into the header of each outgoing request.
    id: Id,
    /// Caller role written into the header of each outgoing request.
    role: Role,
    /// Source of the gRPC channels used to reach a peer address.
    channel_manager: ChannelManager,
    /// De-duplicated peer addresses; empty until `start` has been called.
    peers: Vec<String>,
}
107
108impl Inner {
109    async fn start<U, A>(&mut self, urls: A) -> Result<()>
110    where
111        U: AsRef<str>,
112        A: AsRef<[U]>,
113    {
114        ensure!(
115            !self.is_started(),
116            error::IllegalGrpcClientStateSnafu {
117                err_msg: "Store client already started",
118            }
119        );
120
121        self.peers = urls
122            .as_ref()
123            .iter()
124            .map(|url| url.as_ref().to_string())
125            .collect::<HashSet<_>>()
126            .drain()
127            .collect::<Vec<_>>();
128
129        Ok(())
130    }
131
132    async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
133        let mut client = self.random_client()?;
134        req.set_header(
135            self.id,
136            self.role,
137            TracingContext::from_current_span().to_w3c(),
138        );
139        let res = client.range(req).await.map_err(error::Error::from)?;
140
141        Ok(res.into_inner())
142    }
143
144    async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
145        let mut client = self.random_client()?;
146        req.set_header(
147            self.id,
148            self.role,
149            TracingContext::from_current_span().to_w3c(),
150        );
151        let res = client.put(req).await.map_err(error::Error::from)?;
152
153        Ok(res.into_inner())
154    }
155
156    async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
157        let mut client = self.random_client()?;
158        req.set_header(
159            self.id,
160            self.role,
161            TracingContext::from_current_span().to_w3c(),
162        );
163
164        let res = client.batch_get(req).await.map_err(error::Error::from)?;
165
166        Ok(res.into_inner())
167    }
168
169    async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
170        let mut client = self.random_client()?;
171        req.set_header(
172            self.id,
173            self.role,
174            TracingContext::from_current_span().to_w3c(),
175        );
176        let res = client.batch_put(req).await.map_err(error::Error::from)?;
177
178        Ok(res.into_inner())
179    }
180
181    async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
182        let mut client = self.random_client()?;
183        req.set_header(
184            self.id,
185            self.role,
186            TracingContext::from_current_span().to_w3c(),
187        );
188        let res = client.batch_delete(req).await.map_err(error::Error::from)?;
189
190        Ok(res.into_inner())
191    }
192
193    async fn compare_and_put(
194        &self,
195        mut req: CompareAndPutRequest,
196    ) -> Result<CompareAndPutResponse> {
197        let mut client = self.random_client()?;
198        req.set_header(
199            self.id,
200            self.role,
201            TracingContext::from_current_span().to_w3c(),
202        );
203        let res = client
204            .compare_and_put(req)
205            .await
206            .map_err(error::Error::from)?;
207
208        Ok(res.into_inner())
209    }
210
211    async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
212        let mut client = self.random_client()?;
213        req.set_header(
214            self.id,
215            self.role,
216            TracingContext::from_current_span().to_w3c(),
217        );
218        let res = client.delete_range(req).await.map_err(error::Error::from)?;
219
220        Ok(res.into_inner())
221    }
222
223    fn random_client(&self) -> Result<StoreClient<Channel>> {
224        let len = self.peers.len();
225        let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
226            error::IllegalGrpcClientStateSnafu {
227                err_msg: "Empty peers, store client may not start yet",
228            },
229        )?;
230
231        self.make_client(peer)
232    }
233
234    fn make_client(&self, addr: impl AsRef<str>) -> Result<StoreClient<Channel>> {
235        let channel = self
236            .channel_manager
237            .get(addr)
238            .context(error::CreateChannelSnafu)?;
239
240        Ok(StoreClient::new(channel)
241            .accept_compressed(CompressionEncoding::Gzip)
242            .accept_compressed(CompressionEncoding::Zstd)
243            .send_compressed(CompressionEncoding::Zstd))
244    }
245
246    #[inline]
247    fn is_started(&self) -> bool {
248        !self.peers.is_empty()
249    }
250}
251
#[cfg(test)]
mod test {
    use super::*;

    /// A second `start` on an already started client must be rejected with
    /// `IllegalGrpcClientState`.
    #[tokio::test]
    async fn test_already_start() {
        let mut client = Client::new(0, Role::Frontend, ChannelManager::default());

        let first = client.start(&["127.0.0.1:1000", "127.0.0.1:1001"]).await;
        first.unwrap();

        let second = client.start(&["127.0.0.1:1002"]).await;
        assert!(second.is_err());
        assert!(matches!(
            second,
            Err(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    /// Duplicate peer URLs passed to `start` collapse into a single entry.
    #[tokio::test]
    async fn test_start_with_duplicate_peers() {
        let mut client = Client::new(0, Role::Frontend, ChannelManager::default());

        let started = client
            .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
            .await;
        started.unwrap();

        let peer_count = client.inner.read().await.peers.len();
        assert_eq!(1, peer_count);
    }
}