1use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25 Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31 RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ensure, ResultExt};
35use tokio::sync::RwLock;
36use tonic::codec::CompressionEncoding;
37use tonic::transport::Channel;
38use tonic::Status;
39
40use crate::client::ask_leader::AskLeader;
41use crate::client::{util, Id};
42use crate::error::{
43 ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
44 ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
45};
46
/// Handle to the metasrv cluster service.
///
/// The state lives behind an `Arc<RwLock<_>>`, so cloning a `Client` yields
/// another handle to the same underlying state rather than a separate copy.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared state; `start` takes the write lock, the request methods take
    /// the read lock.
    inner: Arc<RwLock<Inner>>,
}
51
52impl Client {
53 pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
54 let inner = Arc::new(RwLock::new(Inner {
55 id,
56 role,
57 channel_manager,
58 ask_leader: None,
59 max_retry,
60 }));
61
62 Self { inner }
63 }
64
65 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
66 where
67 U: AsRef<str>,
68 A: AsRef<[U]>,
69 {
70 let mut inner = self.inner.write().await;
71 inner.start(urls).await
72 }
73
74 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
75 let inner = self.inner.read().await;
76 inner.range(req).await
77 }
78
79 #[allow(dead_code)]
80 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
81 let inner = self.inner.read().await;
82 inner.batch_get(req).await
83 }
84
85 pub async fn get_metasrv_peers(
86 &self,
87 ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
88 let inner = self.inner.read().await;
89 inner.get_metasrv_peers().await
90 }
91}
92
/// Only the error type is specified here; no `TxnService` methods are
/// overridden in this impl.
impl TxnService for Client {
    type Error = MetaError;
}
96
97#[async_trait::async_trait]
98impl KvBackend for Client {
99 fn name(&self) -> &str {
100 "ClusterClientKvBackend"
101 }
102
103 fn as_any(&self) -> &dyn Any {
104 self
105 }
106
107 async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
108 let resp = self.range(req).await;
109 match resp {
110 Ok(resp) => Ok(resp),
111 Err(err) if err.is_exceeded_size_limit() => {
112 Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
113 }
114 Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
115 }
116 }
117
118 async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
119 unimplemented!("`put` is not supported in cluster client kv backend")
120 }
121
122 async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
123 unimplemented!("`batch_put` is not supported in cluster client kv backend")
124 }
125
126 async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
127 self.batch_get(req)
128 .await
129 .map_err(BoxedError::new)
130 .context(ExternalSnafu)
131 }
132
133 async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
134 unimplemented!("`delete_range` is not supported in cluster client kv backend")
135 }
136
137 async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
138 unimplemented!("`batch_delete` is not supported in cluster client kv backend")
139 }
140}
141
/// State shared by all clones of `Client`.
#[derive(Debug)]
struct Inner {
    /// Passed to `AskLeader::new` when the client is started.
    id: Id,
    /// Passed to `AskLeader::new` when the client is started.
    role: Role,
    /// Source of the gRPC channels used to build `ClusterClient`s.
    channel_manager: ChannelManager,
    /// Leader lookup helper; `None` until `start` has been called.
    ask_leader: Option<AskLeader>,
    /// Upper bound on counted attempts in `with_retry`; also passed to
    /// `AskLeader::new`.
    max_retry: usize,
}
150
151impl Inner {
152 async fn start<U, A>(&mut self, urls: A) -> Result<()>
153 where
154 U: AsRef<str>,
155 A: AsRef<[U]>,
156 {
157 ensure!(
158 !self.is_started(),
159 IllegalGrpcClientStateSnafu {
160 err_msg: "Cluster client already started",
161 }
162 );
163
164 let peers = urls
165 .as_ref()
166 .iter()
167 .map(|url| url.as_ref().to_string())
168 .collect::<Vec<_>>();
169 self.ask_leader = Some(AskLeader::new(
170 self.id,
171 self.role,
172 peers,
173 self.channel_manager.clone(),
174 self.max_retry,
175 ));
176
177 Ok(())
178 }
179
180 fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
181 let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
182
183 Ok(ClusterClient::new(channel)
184 .accept_compressed(CompressionEncoding::Gzip)
185 .accept_compressed(CompressionEncoding::Zstd)
186 .send_compressed(CompressionEncoding::Zstd))
187 }
188
189 #[inline]
190 fn is_started(&self) -> bool {
191 self.ask_leader.is_some()
192 }
193
194 fn ask_leader(&self) -> Result<&AskLeader> {
195 ensure!(
196 self.is_started(),
197 IllegalGrpcClientStateSnafu {
198 err_msg: "Cluster client not start"
199 }
200 );
201
202 Ok(self.ask_leader.as_ref().unwrap())
203 }
204
205 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
206 where
207 R: Future<Output = std::result::Result<T, Status>>,
208 F: Fn(ClusterClient<Channel>) -> R,
209 H: Fn(&T) -> &Option<ResponseHeader>,
210 {
211 let ask_leader = self.ask_leader()?;
212 let mut times = 0;
213 let mut last_error = None;
214
215 while times < self.max_retry {
216 if let Some(leader) = &ask_leader.get_leader() {
217 let client = self.make_client(leader)?;
218 match body_fn(client).await {
219 Ok(res) => {
220 if util::is_not_leader(get_header(&res)) {
221 last_error = Some(format!("{leader} is not a leader"));
222 warn!("Failed to {task} to {leader}, not a leader");
223 let leader = ask_leader.ask_leader().await?;
224 info!("Cluster client updated to new leader addr: {leader}");
225 times += 1;
226 continue;
227 }
228 return Ok(res);
229 }
230 Err(status) => {
231 if util::is_unreachable(&status) {
233 last_error = Some(status.to_string());
234 warn!("Failed to {task} to {leader}, source: {status}");
235 let leader = ask_leader.ask_leader().await?;
236 info!("Cluster client updated to new leader addr: {leader}");
237 times += 1;
238 continue;
239 } else {
240 error!("An error occurred in gRPC, status: {status}");
241 return Err(Error::from(status));
242 }
243 }
244 }
245 } else if let Err(err) = ask_leader.ask_leader().await {
246 return Err(err);
247 }
248 }
249
250 RetryTimesExceededSnafu {
251 msg: format!("Failed to {task}, last error: {:?}", last_error),
252 times: self.max_retry,
253 }
254 .fail()
255 }
256
257 async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
258 self.with_retry(
259 "range",
260 move |mut client| {
261 let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
262
263 async move { client.range(inner_req).await.map(|res| res.into_inner()) }
264 },
265 |res| &res.header,
266 )
267 .await?
268 .try_into()
269 .context(ConvertMetaResponseSnafu)
270 }
271
272 async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
273 self.with_retry(
274 "batch_get",
275 move |mut client| {
276 let inner_req =
277 tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
278
279 async move {
280 client
281 .batch_get(inner_req)
282 .await
283 .map(|res| res.into_inner())
284 }
285 },
286 |res| &res.header,
287 )
288 .await?
289 .try_into()
290 .context(ConvertMetaResponseSnafu)
291 }
292
293 async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
294 self.with_retry(
295 "get_metasrv_peers",
296 move |mut client| {
297 let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
298
299 async move {
300 client
301 .metasrv_peers(inner_req)
302 .await
303 .map(|res| res.into_inner())
304 }
305 },
306 |res| &res.header,
307 )
308 .await
309 .map(|res| (res.leader, res.followers))
310 }
311}
312
/// Read-only `KvBackend` built on a shared [`Client`].
///
/// Only `range` is forwarded to the client; every other `KvBackend` operation
/// returns a read-only error.
#[derive(Clone, Debug)]
pub struct ClusterKvBackend {
    /// Client that serves the forwarded `range` requests.
    inner: Arc<Client>,
}
319
320impl ClusterKvBackend {
321 pub fn new(client: Arc<Client>) -> Self {
322 Self { inner: client }
323 }
324
325 fn unimpl(&self) -> common_meta::error::Error {
326 let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
327 name: self.name().to_string(),
328 }
329 .fail()
330 .map_err(BoxedError::new)
331 .context(common_meta::error::ExternalSnafu);
332 ret.unwrap_err()
333 }
334}
335
336impl TxnService for ClusterKvBackend {
337 type Error = common_meta::error::Error;
338}
339
340#[async_trait::async_trait]
341impl KvBackend for ClusterKvBackend {
342 fn name(&self) -> &str {
343 "ClusterKvBackend"
344 }
345
346 fn as_any(&self) -> &dyn Any {
347 self
348 }
349
350 async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
351 self.inner
352 .range(req)
353 .await
354 .map_err(BoxedError::new)
355 .context(common_meta::error::ExternalSnafu)
356 }
357
358 async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
359 Err(self.unimpl())
360 }
361
362 async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
363 Err(self.unimpl())
364 }
365
366 async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
367 Err(self.unimpl())
368 }
369
370 async fn delete_range(
371 &self,
372 _: DeleteRangeRequest,
373 ) -> common_meta::error::Result<DeleteRangeResponse> {
374 Err(self.unimpl())
375 }
376
377 async fn batch_delete(
378 &self,
379 _: BatchDeleteRequest,
380 ) -> common_meta::error::Result<BatchDeleteResponse> {
381 Err(self.unimpl())
382 }
383}