use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
    BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
    RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
use common_meta::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util;
use common_telemetry::warn;
use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error;
use crate::error::{match_for_io_error, Result};
use crate::metasrv::ElectionRef;

pub type MetaPeerClientRef = Arc<MetaPeerClient>;

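/// A read-only client for cluster metadata held in the metasrv leader's
/// in-memory store.
///
/// When the local node is the leader, reads are served from the local
/// in-memory backend; otherwise they are forwarded to the leader over gRPC,
/// retrying retryable errors up to `max_retry_count` times.
///
/// A minimal construction sketch (the `election` and `in_memory` handles are
/// assumed to come from the surrounding metasrv setup):
///
/// ```ignore
/// let client = MetaPeerClientBuilder::default()
///     .election(Some(election))
///     .in_memory(in_memory)
///     .build()
///     .unwrap();
/// ```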
#[derive(Builder)]
pub struct MetaPeerClient {
    election: Option<ElectionRef>,
    in_memory: ResettableKvBackendRef,
    #[builder(default = "ChannelManager::default()")]
    channel_manager: ChannelManager,
    /// Number of attempts for a forwarded read before giving up.
    #[builder(default = "3")]
    max_retry_count: usize,
    /// Delay between retry attempts, in milliseconds.
    #[builder(default = "1000")]
    retry_interval_ms: u64,
}

#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
    type Error = error::Error;
}

#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
    fn name(&self) -> &str {
        "MetaPeerClient"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        // The leader serves range reads directly from its local in-memory backend.
        if self.is_leader() {
            return self
                .in_memory
                .range(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

        // Followers forward the request to the leader, retrying retryable errors.
        for _ in 0..max_retry_count {
            match self
                .remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
                .await
            {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered a retryable error");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "range",
            retry_num: max_retry_count,
        }
        .fail()
    }

    // MetaPeerClient is a read-only view of the leader's store, so all
    // mutating operations are rejected.
    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
        error::UnsupportedSnafu {
            operation: "put".to_string(),
        }
        .fail()
    }

    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
        error::UnsupportedSnafu {
            operation: "batch put".to_string(),
        }
        .fail()
    }

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        // Same strategy as `range`: serve locally on the leader, otherwise
        // forward to the leader with bounded retries.
        if self.is_leader() {
            return self
                .in_memory
                .batch_get(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

        for _ in 0..max_retry_count {
            match self.remote_batch_get(req.keys.clone()).await {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered a retryable error");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "batch_get",
            retry_num: max_retry_count,
        }
        .fail()
    }

    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        error::UnsupportedSnafu {
            operation: "delete range".to_string(),
        }
        .fail()
    }

    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        error::UnsupportedSnafu {
            operation: "batch delete".to_string(),
        }
        .fail()
    }

    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
        error::UnsupportedSnafu {
            operation: "compare and put".to_string(),
        }
        .fail()
    }

    async fn put_conditionally(
        &self,
        _key: Vec<u8>,
        _value: Vec<u8>,
        _if_not_exists: bool,
    ) -> Result<bool> {
        error::UnsupportedSnafu {
            operation: "put conditionally".to_string(),
        }
        .fail()
    }

    async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
        error::UnsupportedSnafu {
            operation: "delete".to_string(),
        }
        .fail()
    }
}

impl MetaPeerClient {
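    /// Fetches key-values under the datanode stat prefix, returning only keys
    /// when `keys_only` is set.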
    async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
        let key = DatanodeStatKey::prefix_key();
        let range_end = util::get_prefix_end_key(&key);
        let range_request = RangeRequest {
            key,
            range_end,
            keys_only,
            ..Default::default()
        };
        self.range(range_request).await.map(|res| res.kvs)
    }

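    /// Returns all datanode stat key-values currently recorded.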
    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
        let kvs = self.get_dn_key_value(false).await?;
        to_stat_kv_map(kvs)
    }

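    /// Returns the number of distinct datanodes that have reported stats.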
    pub async fn get_node_cnt(&self) -> Result<i32> {
        let kvs = self.get_dn_key_value(true).await?;
        kvs.into_iter()
            .map(|kv| {
                kv.key
                    .try_into()
                    .context(error::InvalidDatanodeStatFormatSnafu {})
            })
            .collect::<Result<HashSet<DatanodeStatKey>>>()
            .map(|hash_set| hash_set.len() as i32)
    }

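    /// Fetches the stat values for the given datanode stat keys in one batch.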
    pub async fn get_dn_stat_kvs(
        &self,
        keys: Vec<DatanodeStatKey>,
    ) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
        let stat_keys = keys.into_iter().map(|key| key.into()).collect();
        let batch_get_req = BatchGetRequest { keys: stat_keys };

        let res = self.batch_get(batch_get_req).await?;

        to_stat_kv_map(res.kvs)
    }

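    /// Forwards a range request to the current leader over gRPC.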
    async fn remote_range(
        &self,
        key: Vec<u8>,
        range_end: Vec<u8>,
        keys_only: bool,
    ) -> Result<RangeResponse> {
        // Safety: this method is only reached when `is_leader` returned false,
        // which implies an election handle is present.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(PbRangeRequest {
            key,
            range_end,
            keys_only,
            ..Default::default()
        });

        let response: PbRangeResponse = ClusterClient::new(channel)
            .range(request)
            .await
            .context(error::RangeSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        Ok(RangeResponse {
            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
            more: response.more,
        })
    }

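    /// Forwards a batch-get request to the current leader over gRPC.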
    async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
        // Safety: this method is only reached when `is_leader` returned false,
        // which implies an election handle is present.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(PbBatchGetRequest {
            keys,
            ..Default::default()
        });

        let response: PbBatchGetResponse = ClusterClient::new(channel)
            .batch_get(request)
            .await
            .context(error::BatchGetSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        Ok(BatchGetResponse {
            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
        })
    }

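    /// Returns whether this node is the leader. A client without an election
    /// handle is treated as the leader.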
    pub(crate) fn is_leader(&self) -> bool {
        self.election
            .as_ref()
            .map(|election| election.is_leader())
            .unwrap_or(true)
    }

    #[cfg(test)]
    pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
        self.in_memory.clone()
    }
}

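/// Parses raw key-values into a map of datanode stat keys to stat values,
/// failing on any entry that does not parse.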
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
    let mut map = HashMap::with_capacity(kvs.len());
    for kv in kvs {
        let _ = map.insert(
            kv.key
                .try_into()
                .context(error::InvalidDatanodeStatFormatSnafu {})?,
            kv.value
                .try_into()
                .context(error::InvalidDatanodeStatFormatSnafu {})?,
        );
    }
    Ok(map)
}

struct Context<'a> {
    addr: &'a str,
}

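/// Ensures a response carries a header and that the responding node at
/// `ctx.addr` is still the leader.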
fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()> {
    let header = header
        .as_ref()
        .context(error::ResponseHeaderNotFoundSnafu)?;

    ensure!(
        !header.is_not_leader(),
        error::IsNotLeaderSnafu {
            node_addr: ctx.addr
        }
    );

    Ok(())
}

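/// An error is retryable when the target node is no longer the leader, or
/// when the underlying gRPC failure maps to an I/O error.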
fn need_retry(error: &error::Error) -> bool {
    match error {
        error::Error::IsNotLeader { .. } => true,
        error::Error::Range { error, .. } | error::Error::BatchGet { error, .. } => {
            match_for_io_error(error).is_some()
        }
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use api::v1::meta::{Error, ErrorCode, ResponseHeader};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::rpc::KeyValue;

    use super::{check_resp_header, to_stat_kv_map, Context};
    use crate::error;

    #[test]
    fn test_to_stat_kv_map() {
        let stat_key = DatanodeStatKey { node_id: 100 };

        let stat = Stat {
            id: 100,
            addr: "127.0.0.1:3001".to_string(),
            ..Default::default()
        };
        let stat_val = DatanodeStatValue { stats: vec![stat] }.try_into().unwrap();

        let kv = KeyValue {
            key: stat_key.into(),
            value: stat_val,
        };

        let kv_map = to_stat_kv_map(vec![kv]).unwrap();
        assert_eq!(1, kv_map.len());

        let stat_val = kv_map.get(&stat_key).unwrap();
        let stat = stat_val.stats.first().unwrap();

        assert_eq!(100, stat.id);
        assert_eq!("127.0.0.1:3001", stat.addr);
    }

    #[test]
    fn test_check_resp_header() {
        let header = Some(ResponseHeader {
            error: None,
            ..Default::default()
        });
        check_resp_header(&header, mock_ctx()).unwrap();

        let result = check_resp_header(&None, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::ResponseHeaderNotFound { .. }
        ));

        let header = Some(ResponseHeader {
            error: Some(Error {
                code: ErrorCode::NotLeader as i32,
                err_msg: "The current meta is not leader".to_string(),
            }),
            ..Default::default()
        });
        let result = check_resp_header(&header, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::IsNotLeader { .. }
        ));
    }

    fn mock_ctx<'a>() -> Context<'a> {
        Context { addr: "addr" }
    }
}