1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{
22 BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
23 RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
27use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
28use common_meta::rpc::KeyValue;
29use common_meta::rpc::store::{
30 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
32 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
33};
34use common_meta::util;
35use common_telemetry::warn;
36use derive_builder::Builder;
37use snafu::{OptionExt, ResultExt, ensure};
38
39use crate::error::{self, Result, match_for_io_error};
40use crate::metasrv::ElectionRef;
41
/// Shared, reference-counted handle to a [`MetaPeerClient`].
pub type MetaPeerClientRef = Arc<MetaPeerClient>;

/// Read-only client for metasrv cluster state.
///
/// When this node is the leader (or no election is configured), reads are
/// served from the local `in_memory` backend; otherwise they are forwarded
/// to the current leader over gRPC with bounded retries.
#[derive(Builder)]
pub struct MetaPeerClient {
    // `None` means no election is configured (standalone); `is_leader()`
    // then reports `true` and all reads stay local.
    election: Option<ElectionRef>,
    // Local resettable KV store consulted on the leader path.
    in_memory: ResettableKvBackendRef,
    // gRPC channel cache used to reach the leader.
    #[builder(default = "ChannelManager::default()")]
    channel_manager: ChannelManager,
    // Maximum attempts for a forwarded read before giving up.
    #[builder(default = "3")]
    max_retry_count: usize,
    // Sleep between retry attempts, in milliseconds.
    #[builder(default = "1000")]
    retry_interval_ms: u64,
}
55
#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
    // Only the associated error type is supplied; the trait's default method
    // implementations are used unchanged. NOTE(review): presumably this means
    // transactions are not supported by this read-only client — confirm
    // against `TxnService`'s default bodies.
    type Error = error::Error;
}
60
#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
    fn name(&self) -> &str {
        "MetaPeerClient"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Performs a range read.
    ///
    /// On the leader (or when no election is configured) this reads straight
    /// from the local in-memory backend. On a follower it forwards the request
    /// to the leader via `remote_range`, retrying up to `max_retry_count`
    /// times with a `retry_interval_ms` sleep between attempts for errors that
    /// [`need_retry`] classifies as transient; all other errors return
    /// immediately. Exhausting the retries yields `ExceededRetryLimit`.
    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        if self.is_leader() {
            return self
                .in_memory
                .range(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

        for _ in 0..max_retry_count {
            // `req` is reused across attempts, so its byte buffers are cloned
            // per attempt.
            match self
                .remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
                .await
            {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered an error that need to retry");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "range",
            retry_num: max_retry_count,
        }
        .fail()
    }

    // This client is read-only: all mutating operations below are rejected.
    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
        error::UnsupportedSnafu {
            operation: "put".to_string(),
        }
        .fail()
    }

    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
        error::UnsupportedSnafu {
            operation: "batch put".to_string(),
        }
        .fail()
    }

    /// Performs a batch-get read.
    ///
    /// Same leader/forwarding and retry policy as [`Self::range`], but via
    /// `remote_batch_get`.
    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        if self.is_leader() {
            return self
                .in_memory
                .batch_get(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

        for _ in 0..max_retry_count {
            match self.remote_batch_get(req.keys.clone()).await {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered an error that need to retry");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "batch_get",
            retry_num: max_retry_count,
        }
        .fail()
    }

    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        error::UnsupportedSnafu {
            operation: "delete range".to_string(),
        }
        .fail()
    }

    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        error::UnsupportedSnafu {
            operation: "batch delete".to_string(),
        }
        .fail()
    }

    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
        error::UnsupportedSnafu {
            operation: "compare and put".to_string(),
        }
        .fail()
    }

    async fn put_conditionally(
        &self,
        _key: Vec<u8>,
        _value: Vec<u8>,
        _if_not_exists: bool,
    ) -> Result<bool> {
        error::UnsupportedSnafu {
            operation: "put conditionally".to_string(),
        }
        .fail()
    }

    async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
        error::UnsupportedSnafu {
            operation: "delete".to_string(),
        }
        .fail()
    }
}
196
197impl MetaPeerClient {
198 async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
199 let key = DatanodeStatKey::prefix_key();
200 let range_end = util::get_prefix_end_key(&key);
201 let range_request = RangeRequest {
202 key,
203 range_end,
204 keys_only,
205 ..Default::default()
206 };
207 self.range(range_request).await.map(|res| res.kvs)
208 }
209
210 pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
212 let kvs = self.get_dn_key_value(false).await?;
213 to_stat_kv_map(kvs)
214 }
215
216 pub async fn get_node_cnt(&self) -> Result<i32> {
217 let kvs = self.get_dn_key_value(true).await?;
218 kvs.into_iter()
219 .map(|kv| {
220 kv.key
221 .try_into()
222 .context(error::InvalidDatanodeStatFormatSnafu {})
223 })
224 .collect::<Result<HashSet<DatanodeStatKey>>>()
225 .map(|hash_set| hash_set.len() as i32)
226 }
227
228 pub async fn get_dn_stat_kvs(
230 &self,
231 keys: Vec<DatanodeStatKey>,
232 ) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
233 let stat_keys = keys.into_iter().map(|key| key.into()).collect();
234 let batch_get_req = BatchGetRequest { keys: stat_keys };
235
236 let res = self.batch_get(batch_get_req).await?;
237
238 to_stat_kv_map(res.kvs)
239 }
240
241 async fn remote_range(
242 &self,
243 key: Vec<u8>,
244 range_end: Vec<u8>,
245 keys_only: bool,
246 ) -> Result<RangeResponse> {
247 let election = self.election.as_ref().unwrap();
249
250 let leader_addr = election.leader().await?.0;
251
252 let channel = self
253 .channel_manager
254 .get(&leader_addr)
255 .context(error::CreateChannelSnafu)?;
256
257 let request = tonic::Request::new(PbRangeRequest {
258 key,
259 range_end,
260 keys_only,
261 ..Default::default()
262 });
263
264 let response: PbRangeResponse = ClusterClient::new(channel)
265 .range(request)
266 .await
267 .context(error::RangeSnafu)?
268 .into_inner();
269
270 check_resp_header(&response.header, Context { addr: &leader_addr })?;
271
272 Ok(RangeResponse {
273 kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
274 more: response.more,
275 })
276 }
277
278 async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
279 let election = self.election.as_ref().unwrap();
281
282 let leader_addr = election.leader().await?.0;
283
284 let channel = self
285 .channel_manager
286 .get(&leader_addr)
287 .context(error::CreateChannelSnafu)?;
288
289 let request = tonic::Request::new(PbBatchGetRequest {
290 keys,
291 ..Default::default()
292 });
293
294 let response: PbBatchGetResponse = ClusterClient::new(channel)
295 .batch_get(request)
296 .await
297 .context(error::BatchGetSnafu)?
298 .into_inner();
299
300 check_resp_header(&response.header, Context { addr: &leader_addr })?;
301
302 Ok(BatchGetResponse {
303 kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
304 })
305 }
306
307 pub(crate) fn is_leader(&self) -> bool {
310 self.election
311 .as_ref()
312 .map(|election| election.is_leader())
313 .unwrap_or(true)
314 }
315
316 #[cfg(test)]
317 pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
318 self.in_memory.clone()
319 }
320}
321
322fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
323 let mut map = HashMap::with_capacity(kvs.len());
324 for kv in kvs {
325 let _ = map.insert(
326 kv.key
327 .try_into()
328 .context(error::InvalidDatanodeStatFormatSnafu {})?,
329 kv.value
330 .try_into()
331 .context(error::InvalidDatanodeStatFormatSnafu {})?,
332 );
333 }
334 Ok(map)
335}
336
/// Borrowed context for [`check_resp_header`]: the address of the remote node
/// that produced the response, used to enrich error reports.
struct Context<'a> {
    addr: &'a str,
}
340
341fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()> {
342 let header = header
343 .as_ref()
344 .context(error::ResponseHeaderNotFoundSnafu)?;
345
346 ensure!(
347 !header.is_not_leader(),
348 error::IsNotLeaderSnafu {
349 node_addr: ctx.addr
350 }
351 );
352
353 Ok(())
354}
355
356fn need_retry(error: &error::Error) -> bool {
357 match error {
358 error::Error::IsNotLeader { .. } => true,
359 error::Error::Range { error, .. } | error::Error::BatchGet { error, .. } => {
360 match_for_io_error(error).is_some()
361 }
362 _ => false,
363 }
364}
365
#[cfg(test)]
mod tests {
    use api::v1::meta::{Error, ErrorCode, ResponseHeader};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::rpc::KeyValue;

    use super::{Context, check_resp_header, to_stat_kv_map};
    use crate::error;

    #[test]
    fn test_to_stat_kv_map() {
        let stat_key = DatanodeStatKey { node_id: 100 };

        let stat = Stat {
            id: 100,
            addr: "127.0.0.1:3001".to_string(),
            ..Default::default()
        };
        let encoded_value = DatanodeStatValue { stats: vec![stat] }.try_into().unwrap();

        let kv = KeyValue {
            key: stat_key.into(),
            value: encoded_value,
        };

        // A single entry round-trips into a single-element map.
        let kv_map = to_stat_kv_map(vec![kv]).unwrap();
        assert_eq!(kv_map.len(), 1);

        let decoded = kv_map.get(&stat_key).expect("stat key must be present");
        let first_stat = decoded.stats.first().expect("one stat entry expected");

        assert_eq!(first_stat.id, 100);
        assert_eq!(first_stat.addr, "127.0.0.1:3001");
    }

    #[test]
    fn test_check_resp_header() {
        // A header without an error passes.
        let ok_header = Some(ResponseHeader {
            error: None,
            ..Default::default()
        });
        check_resp_header(&ok_header, mock_ctx()).unwrap();

        // A missing header is reported as ResponseHeaderNotFound.
        let err = check_resp_header(&None, mock_ctx()).unwrap_err();
        assert!(matches!(err, error::Error::ResponseHeaderNotFound { .. }));

        // A NotLeader error code maps to IsNotLeader.
        let not_leader_header = Some(ResponseHeader {
            error: Some(Error {
                code: ErrorCode::NotLeader as i32,
                err_msg: "The current meta is not leader".to_string(),
            }),
            ..Default::default()
        });
        let err = check_resp_header(&not_leader_header, mock_ctx()).unwrap_err();
        assert!(matches!(err, error::Error::IsNotLeader { .. }));
    }

    fn mock_ctx<'a>() -> Context<'a> {
        Context { addr: "addr" }
    }
}