1use std::any::Any;
16use std::future::Future;
17use std::sync::Arc;
18
19use api::greptime_proto::v1;
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
22use common_error::ext::BoxedError;
23use common_grpc::channel_manager::ChannelManager;
24use common_meta::error::{
25 Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
26};
27use common_meta::kv_backend::{KvBackend, TxnService};
28use common_meta::rpc::store::{
29 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31 RangeRequest, RangeResponse,
32};
33use common_telemetry::{error, info, warn};
34use snafu::{ensure, ResultExt};
35use tokio::sync::RwLock;
36use tonic::codec::CompressionEncoding;
37use tonic::transport::Channel;
38use tonic::Status;
39
40use crate::client::ask_leader::AskLeader;
41use crate::client::{util, Id, LeaderProviderRef};
42use crate::error::{
43 ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
44 ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
45};
46
47#[derive(Clone, Debug)]
48pub struct Client {
49 inner: Arc<RwLock<Inner>>,
50}
51
52impl Client {
53 pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
54 let inner = Arc::new(RwLock::new(Inner {
55 id,
56 role,
57 channel_manager,
58 leader_provider: None,
59 max_retry,
60 }));
61
62 Self { inner }
63 }
64
65 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
66 where
67 U: AsRef<str>,
68 A: AsRef<[U]>,
69 {
70 let mut inner = self.inner.write().await;
71 inner.start(urls)
72 }
73
74 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
76 let mut inner = self.inner.write().await;
77 inner.start_with(leader_provider)
78 }
79
80 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
81 let inner = self.inner.read().await;
82 inner.range(req).await
83 }
84
85 #[allow(dead_code)]
86 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
87 let inner = self.inner.read().await;
88 inner.batch_get(req).await
89 }
90
91 pub async fn get_metasrv_peers(
92 &self,
93 ) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
94 let inner = self.inner.read().await;
95 inner.get_metasrv_peers().await
96 }
97}
98
99impl TxnService for Client {
100 type Error = MetaError;
101}
102
103#[async_trait::async_trait]
104impl KvBackend for Client {
105 fn name(&self) -> &str {
106 "ClusterClientKvBackend"
107 }
108
109 fn as_any(&self) -> &dyn Any {
110 self
111 }
112
113 async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
114 let resp = self.range(req).await;
115 match resp {
116 Ok(resp) => Ok(resp),
117 Err(err) if err.is_exceeded_size_limit() => {
118 Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
119 }
120 Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
121 }
122 }
123
124 async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
125 unimplemented!("`put` is not supported in cluster client kv backend")
126 }
127
128 async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
129 unimplemented!("`batch_put` is not supported in cluster client kv backend")
130 }
131
132 async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
133 self.batch_get(req)
134 .await
135 .map_err(BoxedError::new)
136 .context(ExternalSnafu)
137 }
138
139 async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
140 unimplemented!("`delete_range` is not supported in cluster client kv backend")
141 }
142
143 async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
144 unimplemented!("`batch_delete` is not supported in cluster client kv backend")
145 }
146}
147
148#[derive(Debug)]
149struct Inner {
150 id: Id,
151 role: Role,
152 channel_manager: ChannelManager,
153 leader_provider: Option<LeaderProviderRef>,
154 max_retry: usize,
155}
156
157impl Inner {
158 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
159 ensure!(
160 !self.is_started(),
161 IllegalGrpcClientStateSnafu {
162 err_msg: "Cluster client already started",
163 }
164 );
165 self.leader_provider = Some(leader_provider);
166 Ok(())
167 }
168
169 fn start<U, A>(&mut self, urls: A) -> Result<()>
170 where
171 U: AsRef<str>,
172 A: AsRef<[U]>,
173 {
174 let peers = urls
175 .as_ref()
176 .iter()
177 .map(|url| url.as_ref().to_string())
178 .collect::<Vec<_>>();
179 let ask_leader = AskLeader::new(
180 self.id,
181 self.role,
182 peers,
183 self.channel_manager.clone(),
184 self.max_retry,
185 );
186 self.start_with(Arc::new(ask_leader))
187 }
188
189 fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
190 let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
191
192 Ok(ClusterClient::new(channel)
193 .accept_compressed(CompressionEncoding::Gzip)
194 .accept_compressed(CompressionEncoding::Zstd)
195 .send_compressed(CompressionEncoding::Zstd))
196 }
197
198 #[inline]
199 fn is_started(&self) -> bool {
200 self.leader_provider.is_some()
201 }
202
203 async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
204 where
205 R: Future<Output = std::result::Result<T, Status>>,
206 F: Fn(ClusterClient<Channel>) -> R,
207 H: Fn(&T) -> &Option<ResponseHeader>,
208 {
209 let Some(leader_provider) = self.leader_provider.as_ref() else {
210 return IllegalGrpcClientStateSnafu {
211 err_msg: "not started",
212 }
213 .fail();
214 };
215
216 let mut times = 0;
217 let mut last_error = None;
218
219 while times < self.max_retry {
220 if let Some(leader) = &leader_provider.leader() {
221 let client = self.make_client(leader)?;
222 match body_fn(client).await {
223 Ok(res) => {
224 if util::is_not_leader(get_header(&res)) {
225 last_error = Some(format!("{leader} is not a leader"));
226 warn!("Failed to {task} to {leader}, not a leader");
227 let leader = leader_provider.ask_leader().await?;
228 info!("Cluster client updated to new leader addr: {leader}");
229 times += 1;
230 continue;
231 }
232 return Ok(res);
233 }
234 Err(status) => {
235 if util::is_unreachable(&status) {
237 last_error = Some(status.to_string());
238 warn!("Failed to {task} to {leader}, source: {status}");
239 let leader = leader_provider.ask_leader().await?;
240 info!("Cluster client updated to new leader addr: {leader}");
241 times += 1;
242 continue;
243 } else {
244 error!("An error occurred in gRPC, status: {status}");
245 return Err(Error::from(status));
246 }
247 }
248 }
249 } else if let Err(err) = leader_provider.ask_leader().await {
250 return Err(err);
251 }
252 }
253
254 RetryTimesExceededSnafu {
255 msg: format!("Failed to {task}, last error: {:?}", last_error),
256 times: self.max_retry,
257 }
258 .fail()
259 }
260
261 async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
262 self.with_retry(
263 "range",
264 move |mut client| {
265 let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
266
267 async move { client.range(inner_req).await.map(|res| res.into_inner()) }
268 },
269 |res| &res.header,
270 )
271 .await?
272 .try_into()
273 .context(ConvertMetaResponseSnafu)
274 }
275
276 async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
277 self.with_retry(
278 "batch_get",
279 move |mut client| {
280 let inner_req =
281 tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
282
283 async move {
284 client
285 .batch_get(inner_req)
286 .await
287 .map(|res| res.into_inner())
288 }
289 },
290 |res| &res.header,
291 )
292 .await?
293 .try_into()
294 .context(ConvertMetaResponseSnafu)
295 }
296
297 async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
298 self.with_retry(
299 "get_metasrv_peers",
300 move |mut client| {
301 let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
302
303 async move {
304 client
305 .metasrv_peers(inner_req)
306 .await
307 .map(|res| res.into_inner())
308 }
309 },
310 |res| &res.header,
311 )
312 .await
313 .map(|res| (res.leader, res.followers))
314 }
315}
316
317#[derive(Clone, Debug)]
320pub struct ClusterKvBackend {
321 inner: Arc<Client>,
322}
323
324impl ClusterKvBackend {
325 pub fn new(client: Arc<Client>) -> Self {
326 Self { inner: client }
327 }
328
329 fn unimpl(&self) -> common_meta::error::Error {
330 let ret: common_meta::error::Result<()> = ReadOnlyKvBackendSnafu {
331 name: self.name().to_string(),
332 }
333 .fail()
334 .map_err(BoxedError::new)
335 .context(common_meta::error::ExternalSnafu);
336 ret.unwrap_err()
337 }
338}
339
340impl TxnService for ClusterKvBackend {
341 type Error = common_meta::error::Error;
342}
343
344#[async_trait::async_trait]
345impl KvBackend for ClusterKvBackend {
346 fn name(&self) -> &str {
347 "ClusterKvBackend"
348 }
349
350 fn as_any(&self) -> &dyn Any {
351 self
352 }
353
354 async fn range(&self, req: RangeRequest) -> common_meta::error::Result<RangeResponse> {
355 self.inner
356 .range(req)
357 .await
358 .map_err(BoxedError::new)
359 .context(common_meta::error::ExternalSnafu)
360 }
361
362 async fn batch_get(&self, _: BatchGetRequest) -> common_meta::error::Result<BatchGetResponse> {
363 Err(self.unimpl())
364 }
365
366 async fn put(&self, _: PutRequest) -> common_meta::error::Result<PutResponse> {
367 Err(self.unimpl())
368 }
369
370 async fn batch_put(&self, _: BatchPutRequest) -> common_meta::error::Result<BatchPutResponse> {
371 Err(self.unimpl())
372 }
373
374 async fn delete_range(
375 &self,
376 _: DeleteRangeRequest,
377 ) -> common_meta::error::Result<DeleteRangeResponse> {
378 Err(self.unimpl())
379 }
380
381 async fn batch_delete(
382 &self,
383 _: BatchDeleteRequest,
384 ) -> common_meta::error::Result<BatchDeleteResponse> {
385 Err(self.unimpl())
386 }
387}