1use std::collections::HashSet;
16use std::sync::Arc;
17
18use api::v1::meta::store_client::StoreClient;
19use api::v1::meta::{
20 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
21 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
22 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role,
23};
24use common_grpc::channel_manager::ChannelManager;
25use common_telemetry::tracing_context::TracingContext;
26use snafu::{ensure, OptionExt, ResultExt};
27use tokio::sync::RwLock;
28use tonic::codec::CompressionEncoding;
29use tonic::transport::Channel;
30
31use crate::client::{load_balance as lb, Id};
32use crate::error;
33use crate::error::Result;
34
35#[derive(Clone, Debug)]
36pub struct Client {
37 inner: Arc<RwLock<Inner>>,
38}
39
40impl Client {
41 pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
42 let inner = Arc::new(RwLock::new(Inner {
43 id,
44 role,
45 channel_manager,
46 peers: vec![],
47 }));
48
49 Self { inner }
50 }
51
52 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
53 where
54 U: AsRef<str>,
55 A: AsRef<[U]>,
56 {
57 let mut inner = self.inner.write().await;
58 inner.start(urls).await
59 }
60
61 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
62 let inner = self.inner.read().await;
63 inner.range(req).await
64 }
65
66 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
67 let inner = self.inner.read().await;
68 inner.put(req).await
69 }
70
71 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
72 let inner = self.inner.read().await;
73 inner.batch_get(req).await
74 }
75
76 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
77 let inner = self.inner.read().await;
78 inner.batch_put(req).await
79 }
80
81 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
82 let inner = self.inner.read().await;
83 inner.batch_delete(req).await
84 }
85
86 pub async fn compare_and_put(
87 &self,
88 req: CompareAndPutRequest,
89 ) -> Result<CompareAndPutResponse> {
90 let inner = self.inner.read().await;
91 inner.compare_and_put(req).await
92 }
93
94 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
95 let inner = self.inner.read().await;
96 inner.delete_range(req).await
97 }
98}
99
100#[derive(Debug)]
101struct Inner {
102 id: Id,
103 role: Role,
104 channel_manager: ChannelManager,
105 peers: Vec<String>,
106}
107
108impl Inner {
109 async fn start<U, A>(&mut self, urls: A) -> Result<()>
110 where
111 U: AsRef<str>,
112 A: AsRef<[U]>,
113 {
114 ensure!(
115 !self.is_started(),
116 error::IllegalGrpcClientStateSnafu {
117 err_msg: "Store client already started",
118 }
119 );
120
121 self.peers = urls
122 .as_ref()
123 .iter()
124 .map(|url| url.as_ref().to_string())
125 .collect::<HashSet<_>>()
126 .drain()
127 .collect::<Vec<_>>();
128
129 Ok(())
130 }
131
132 async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
133 let mut client = self.random_client()?;
134 req.set_header(
135 self.id,
136 self.role,
137 TracingContext::from_current_span().to_w3c(),
138 );
139 let res = client.range(req).await.map_err(error::Error::from)?;
140
141 Ok(res.into_inner())
142 }
143
144 async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
145 let mut client = self.random_client()?;
146 req.set_header(
147 self.id,
148 self.role,
149 TracingContext::from_current_span().to_w3c(),
150 );
151 let res = client.put(req).await.map_err(error::Error::from)?;
152
153 Ok(res.into_inner())
154 }
155
156 async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
157 let mut client = self.random_client()?;
158 req.set_header(
159 self.id,
160 self.role,
161 TracingContext::from_current_span().to_w3c(),
162 );
163
164 let res = client.batch_get(req).await.map_err(error::Error::from)?;
165
166 Ok(res.into_inner())
167 }
168
169 async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
170 let mut client = self.random_client()?;
171 req.set_header(
172 self.id,
173 self.role,
174 TracingContext::from_current_span().to_w3c(),
175 );
176 let res = client.batch_put(req).await.map_err(error::Error::from)?;
177
178 Ok(res.into_inner())
179 }
180
181 async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
182 let mut client = self.random_client()?;
183 req.set_header(
184 self.id,
185 self.role,
186 TracingContext::from_current_span().to_w3c(),
187 );
188 let res = client.batch_delete(req).await.map_err(error::Error::from)?;
189
190 Ok(res.into_inner())
191 }
192
193 async fn compare_and_put(
194 &self,
195 mut req: CompareAndPutRequest,
196 ) -> Result<CompareAndPutResponse> {
197 let mut client = self.random_client()?;
198 req.set_header(
199 self.id,
200 self.role,
201 TracingContext::from_current_span().to_w3c(),
202 );
203 let res = client
204 .compare_and_put(req)
205 .await
206 .map_err(error::Error::from)?;
207
208 Ok(res.into_inner())
209 }
210
211 async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
212 let mut client = self.random_client()?;
213 req.set_header(
214 self.id,
215 self.role,
216 TracingContext::from_current_span().to_w3c(),
217 );
218 let res = client.delete_range(req).await.map_err(error::Error::from)?;
219
220 Ok(res.into_inner())
221 }
222
223 fn random_client(&self) -> Result<StoreClient<Channel>> {
224 let len = self.peers.len();
225 let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
226 error::IllegalGrpcClientStateSnafu {
227 err_msg: "Empty peers, store client may not start yet",
228 },
229 )?;
230
231 self.make_client(peer)
232 }
233
234 fn make_client(&self, addr: impl AsRef<str>) -> Result<StoreClient<Channel>> {
235 let channel = self
236 .channel_manager
237 .get(addr)
238 .context(error::CreateChannelSnafu)?;
239
240 Ok(StoreClient::new(channel)
241 .accept_compressed(CompressionEncoding::Gzip)
242 .accept_compressed(CompressionEncoding::Zstd)
243 .send_compressed(CompressionEncoding::Zstd))
244 }
245
246 #[inline]
247 fn is_started(&self) -> bool {
248 !self.peers.is_empty()
249 }
250}
251
252#[cfg(test)]
253mod test {
254 use super::*;
255
256 #[tokio::test]
257 async fn test_already_start() {
258 let mut client = Client::new(0, Role::Frontend, ChannelManager::default());
259 client
260 .start(&["127.0.0.1:1000", "127.0.0.1:1001"])
261 .await
262 .unwrap();
263 let res = client.start(&["127.0.0.1:1002"]).await;
264 assert!(res.is_err());
265 assert!(matches!(
266 res.err(),
267 Some(error::Error::IllegalGrpcClientState { .. })
268 ));
269 }
270
271 #[tokio::test]
272 async fn test_start_with_duplicate_peers() {
273 let mut client = Client::new(0, Role::Frontend, ChannelManager::default());
274 client
275 .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
276 .await
277 .unwrap();
278 assert_eq!(1, client.inner.write().await.peers.len());
279 }
280}