1use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25 Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31 RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ResultExt, ensure};
35use tokio::sync::RwLock;
36use tonic::Status;
37use tonic::codec::CompressionEncoding;
38use tonic::transport::Channel;
39
40use crate::client::{LeaderProviderRef, util};
41use crate::error::{
42 ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
43 ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
44};
45
/// Client for the metasrv cluster service.
///
/// The mutable state lives in an [`Inner`] behind an `Arc<RwLock<_>>`, so cloning a `Client`
/// yields another handle to the same state rather than an independent client.
#[derive(Clone, Debug)]
pub struct Client {
    /// Shared state; written by `start_with`, read by the request methods.
    inner: Arc<RwLock<Inner>>,
}
50
51impl Client {
52 pub fn new(channel_manager: ChannelManager, max_retry: usize) -> Self {
53 let inner = Arc::new(RwLock::new(Inner {
54 channel_manager,
55 leader_provider: None,
56 max_retry,
57 }));
58
59 Self { inner }
60 }
61
62 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
64 let mut inner = self.inner.write().await;
65 inner.start_with(leader_provider)
66 }
67
68 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
69 let inner = self.inner.read().await;
70 inner.range(req).await
71 }
72
73 #[allow(dead_code)]
74 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
75 let inner = self.inner.read().await;
76 inner.batch_get(req).await
77 }
78
79 pub async fn get_metasrv_peers(
80 &self,
81 ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
82 let inner = self.inner.read().await;
83 inner.get_metasrv_peers().await
84 }
85}
86
/// Only the associated error type is specified here; no `TxnService` method is overridden, so
/// whatever the trait provides by default is what callers get.
impl TxnService for Client {
    // Same error type as the `KvBackend` implementation for `Client`.
    type Error = MetaError;
}
90
91#[async_trait::async_trait]
92impl KvBackend for Client {
93 fn name(&self) -> &str {
94 "ClusterClientKvBackend"
95 }
96
97 fn as_any(&self) -> &dyn Any {
98 self
99 }
100
101 async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
102 let resp = self.range(req).await;
103 match resp {
104 Ok(resp) => Ok(resp),
105 Err(err) if err.is_exceeded_size_limit() => {
106 Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
107 }
108 Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
109 }
110 }
111
112 async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
113 unimplemented!("`put` is not supported in cluster client kv backend")
114 }
115
116 async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
117 unimplemented!("`batch_put` is not supported in cluster client kv backend")
118 }
119
120 async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
121 self.batch_get(req)
122 .await
123 .map_err(BoxedError::new)
124 .context(ExternalSnafu)
125 }
126
127 async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
128 unimplemented!("`delete_range` is not supported in cluster client kv backend")
129 }
130
131 async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
132 unimplemented!("`batch_delete` is not supported in cluster client kv backend")
133 }
134}
135
/// State shared by all clones of [`Client`].
#[derive(Debug)]
struct Inner {
    /// Source of the gRPC channels used to build per-request clients.
    channel_manager: ChannelManager,
    /// `None` until `start_with` installs a provider; the client counts as started once set.
    leader_provider: Option<LeaderProviderRef>,
    /// Upper bound on the number of counted attempts in `with_retry`.
    max_retry: usize,
}
142
143impl Inner {
144 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
145 ensure!(
146 !self.is_started(),
147 IllegalGrpcClientStateSnafu {
148 err_msg: "Cluster client already started",
149 }
150 );
151 self.leader_provider = Some(leader_provider);
152 Ok(())
153 }
154
155 fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
156 let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
157
158 Ok(ClusterClient::new(channel)
159 .accept_compressed(CompressionEncoding::Gzip)
160 .accept_compressed(CompressionEncoding::Zstd)
161 .send_compressed(CompressionEncoding::Zstd))
162 }
163
164 #[inline]
165 fn is_started(&self) -> bool {
166 self.leader_provider.is_some()
167 }
168
169 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
170 where
171 R: Future<Output = std::result::Result<T, Status>>,
172 F: Fn(ClusterClient<Channel>) -> R,
173 H: Fn(&T) -> &Option<ResponseHeader>,
174 {
175 let Some(leader_provider) = self.leader_provider.as_ref() else {
176 return IllegalGrpcClientStateSnafu {
177 err_msg: "not started",
178 }
179 .fail();
180 };
181
182 let mut times = 0;
183 let mut last_error = None;
184
185 while times < self.max_retry {
186 if let Some(leader) = &leader_provider.leader() {
187 let client = self.make_client(leader)?;
188 match body_fn(client).await {
189 Ok(res) => {
190 if util::is_not_leader(get_header(&res)) {
191 last_error = Some(format!("{leader} is not a leader"));
192 warn!("Failed to {task} to {leader}, not a leader");
193 let leader = leader_provider.ask_leader().await?;
194 info!("Cluster client updated to new leader addr: {leader}");
195 times += 1;
196 continue;
197 }
198 return Ok(res);
199 }
200 Err(status) => {
201 if util::is_unreachable(&status) {
203 last_error = Some(status.to_string());
204 warn!("Failed to {task} to {leader}, source: {status}");
205 let leader = leader_provider.ask_leader().await?;
206 info!("Cluster client updated to new leader addr: {leader}");
207 times += 1;
208 continue;
209 } else {
210 error!("An error occurred in gRPC, status: {status}");
211 return Err(Error::from(status));
212 }
213 }
214 }
215 } else if let Err(err) = leader_provider.ask_leader().await {
216 return Err(err);
217 }
218 }
219
220 RetryTimesExceededSnafu {
221 msg: format!("Failed to {task}, last error: {:?}", last_error),
222 times: self.max_retry,
223 }
224 .fail()
225 }
226
227 async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
228 self.with_retry(
229 "range",
230 move |mut client| {
231 let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
232
233 async move { client.range(inner_req).await.map(|res| res.into_inner()) }
234 },
235 |res| &res.header,
236 )
237 .await?
238 .try_into()
239 .context(ConvertMetaResponseSnafu)
240 }
241
242 async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
243 self.with_retry(
244 "batch_get",
245 move |mut client| {
246 let inner_req =
247 tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
248
249 async move {
250 client
251 .batch_get(inner_req)
252 .await
253 .map(|res| res.into_inner())
254 }
255 },
256 |res| &res.header,
257 )
258 .await?
259 .try_into()
260 .context(ConvertMetaResponseSnafu)
261 }
262
263 async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
264 self.with_retry(
265 "get_metasrv_peers",
266 move |mut client| {
267 let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
268
269 async move {
270 client
271 .metasrv_peers(inner_req)
272 .await
273 .map(|res| res.into_inner())
274 }
275 },
276 |res| &res.header,
277 )
278 .await
279 .map(|res| (res.leader, res.followers))
280 }
281}
282
/// Read-only [`KvBackend`] over a shared cluster [`Client`].
///
/// Only `range` is forwarded to the client; every other `KvBackend` operation returns a
/// read-only-backend error.
#[derive(Clone, Debug)]
pub struct ClusterKvBackend {
    /// The client that serves the forwarded `range` requests.
    inner: Arc<Client>,
}
289
290impl ClusterKvBackend {
291 pub fn new(client: Arc<Client>) -> Self {
292 Self { inner: client }
293 }
294
295 fn unimpl(&self) -> common_meta::error::Error {
296 let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
297 name: self.name().to_string(),
298 }
299 .fail()
300 .map_err(BoxedError::new)
301 .context(common_meta::error::ExternalSnafu);
302 ret.unwrap_err()
303 }
304}
305
/// Only the associated error type is specified here; no `TxnService` method is overridden, so
/// whatever the trait provides by default is what callers get.
impl TxnService for ClusterKvBackend {
    // Same error type as the `KvBackend` implementation for `ClusterKvBackend`.
    type Error = common_meta::error::Error;
}
309
310#[async_trait::async_trait]
311impl KvBackend for ClusterKvBackend {
312 fn name(&self) -> &str {
313 "ClusterKvBackend"
314 }
315
316 fn as_any(&self) -> &dyn Any {
317 self
318 }
319
320 async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
321 self.inner
322 .range(req)
323 .await
324 .map_err(BoxedError::new)
325 .context(common_meta::error::ExternalSnafu)
326 }
327
328 async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
329 Err(self.unimpl())
330 }
331
332 async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
333 Err(self.unimpl())
334 }
335
336 async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
337 Err(self.unimpl())
338 }
339
340 async fn delete_range(
341 &self,
342 _: DeleteRangeRequest,
343 ) -> common_meta::error::Result<DeleteRangeResponse> {
344 Err(self.unimpl())
345 }
346
347 async fn batch_delete(
348 &self,
349 _: BatchDeleteRequest,
350 ) -> common_meta::error::Result<BatchDeleteResponse> {
351 Err(self.unimpl())
352 }
353}