meta_srv/cluster.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
    BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
    RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
use common_meta::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util;
use common_telemetry::warn;
use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error;
use crate::error::{match_for_io_error, Result};
use crate::metasrv::ElectionRef;

pub type MetaPeerClientRef = Arc<MetaPeerClient>;

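/// Read-only client for the leader metasrv's in-memory KV store. When the
/// current node is the leader (or no election is configured), reads are served
/// from the local `in_memory` backend; otherwise they are forwarded to the
/// leader over gRPC with a bounded retry.
///
/// A minimal construction sketch using the `derive_builder`-generated
/// `MetaPeerClientBuilder` (the `in_memory_backend` value below is a
/// placeholder, not real wiring). With `election` set to `None`, `is_leader()`
/// always returns true and all reads stay local:
///
/// ```ignore
/// let client = MetaPeerClientBuilder::default()
///     .election(None)
///     .in_memory(in_memory_backend)
///     .build()
///     .unwrap();
/// ```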
#[derive(Builder)]
pub struct MetaPeerClient {
    election: Option<ElectionRef>,
    in_memory: ResettableKvBackendRef,
    #[builder(default = "ChannelManager::default()")]
    channel_manager: ChannelManager,
    #[builder(default = "3")]
    max_retry_count: usize,
    #[builder(default = "1000")]
    retry_interval_ms: u64,
}

#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
    type Error = error::Error;
}

#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
    fn name(&self) -> &str {
        "MetaPeerClient"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        if self.is_leader() {
            return self
                .in_memory
                .range(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

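        // Not the leader: forward the range request to the leader over gRPC,
        // retrying up to `max_retry_count` times with a `retry_interval_ms`
        // pause when the failure looks retriable (see `need_retry`).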
        for _ in 0..max_retry_count {
            match self
                .remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
                .await
            {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered an error that needs to be retried");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "range",
            retry_num: max_retry_count,
        }
        .fail()
    }

    // MetaPeerClient does not support the mutable methods listed below.
    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
        error::UnsupportedSnafu {
            operation: "put".to_string(),
        }
        .fail()
    }

    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
        error::UnsupportedSnafu {
            operation: "batch put".to_string(),
        }
        .fail()
    }

    // Get KV information from the leader's in-memory KV store.
    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        if self.is_leader() {
            return self
                .in_memory
                .batch_get(req)
                .await
                .context(error::KvBackendSnafu);
        }

        let max_retry_count = self.max_retry_count;
        let retry_interval_ms = self.retry_interval_ms;

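        // Same retry strategy as `range`: forward to the leader and retry
        // retriable failures up to `max_retry_count` times.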
        for _ in 0..max_retry_count {
            match self.remote_batch_get(req.keys.clone()).await {
                Ok(res) => return Ok(res),
                Err(e) => {
                    if need_retry(&e) {
                        warn!(e; "Encountered an error that needs to be retried");
                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        error::ExceededRetryLimitSnafu {
            func_name: "batch_get",
            retry_num: max_retry_count,
        }
        .fail()
    }

    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        error::UnsupportedSnafu {
            operation: "delete range".to_string(),
        }
        .fail()
    }

    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        error::UnsupportedSnafu {
            operation: "batch delete".to_string(),
        }
        .fail()
    }

    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
        error::UnsupportedSnafu {
            operation: "compare and put".to_string(),
        }
        .fail()
    }

    async fn put_conditionally(
        &self,
        _key: Vec<u8>,
        _value: Vec<u8>,
        _if_not_exists: bool,
    ) -> Result<bool> {
        error::UnsupportedSnafu {
            operation: "put conditionally".to_string(),
        }
        .fail()
    }

    async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
        error::UnsupportedSnafu {
            operation: "delete".to_string(),
        }
        .fail()
    }
}

impl MetaPeerClient {
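    // Range-scan all `DatanodeStatKey`-prefixed entries; `keys_only` skips the
    // values when only the key set is needed (e.g. for counting nodes).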
    async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
        let key = DatanodeStatKey::prefix_key();
        let range_end = util::get_prefix_end_key(&key);
        let range_request = RangeRequest {
            key,
            range_end,
            keys_only,
            ..Default::default()
        };
        self.range(range_request).await.map(|res| res.kvs)
    }

    // Get all datanode stat KVs from the leader metasrv.
    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
        let kvs = self.get_dn_key_value(false).await?;
        to_stat_kv_map(kvs)
    }

    // Count the distinct datanodes that have reported stats.
    pub async fn get_node_cnt(&self) -> Result<i32> {
        let kvs = self.get_dn_key_value(true).await?;
        kvs.into_iter()
            .map(|kv| {
                kv.key
                    .try_into()
                    .context(error::InvalidDatanodeStatFormatSnafu {})
            })
            .collect::<Result<HashSet<DatanodeStatKey>>>()
            .map(|hash_set| hash_set.len() as i32)
    }

    // Get datanode stat KVs from the leader metasrv for the given keys.
    pub async fn get_dn_stat_kvs(
        &self,
        keys: Vec<DatanodeStatKey>,
    ) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
        let stat_keys = keys.into_iter().map(|key| key.into()).collect();
        let batch_get_req = BatchGetRequest { keys: stat_keys };

        let res = self.batch_get(batch_get_req).await?;

        to_stat_kv_map(res.kvs)
    }

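    // Issue a `Range` RPC against the leader's cluster service, resolving the
    // leader address through the election and reusing channels from
    // `channel_manager`.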
    async fn remote_range(
        &self,
        key: Vec<u8>,
        range_end: Vec<u8>,
        keys_only: bool,
    ) -> Result<RangeResponse> {
        // Safety: when self.is_leader() == false, the election must not be empty.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(PbRangeRequest {
            key,
            range_end,
            keys_only,
            ..Default::default()
        });

        let response: PbRangeResponse = ClusterClient::new(channel)
            .range(request)
            .await
            .context(error::RangeSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        Ok(RangeResponse {
            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
            more: response.more,
        })
    }

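    // Same leader-forwarding path as `remote_range`, but for a `BatchGet` RPC.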
    async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
        // Safety: when self.is_leader() == false, the election must not be empty.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(PbBatchGetRequest {
            keys,
            ..Default::default()
        });

        let response: PbBatchGetResponse = ClusterClient::new(channel)
            .batch_get(request)
            .await
            .context(error::BatchGetSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        Ok(BatchGetResponse {
            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
        })
    }

    // Check whether this meta node is the leader.
    // Note: when self.election is None, the meta node is also considered the leader.
    pub(crate) fn is_leader(&self) -> bool {
        self.election
            .as_ref()
            .map(|election| election.is_leader())
            .unwrap_or(true)
    }

    #[cfg(test)]
    pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
        self.in_memory.clone()
    }
}

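// Decode raw key/value pairs into typed datanode stat entries, failing on the
// first pair that does not parse.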
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
    let mut map = HashMap::with_capacity(kvs.len());
    for kv in kvs {
        let _ = map.insert(
            kv.key
                .try_into()
                .context(error::InvalidDatanodeStatFormatSnafu {})?,
            kv.value
                .try_into()
                .context(error::InvalidDatanodeStatFormatSnafu {})?,
        );
    }
    Ok(map)
}

struct Context<'a> {
    addr: &'a str,
}

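// Ensure the response carries a header and that the peer we talked to still
// believes it is the leader; otherwise return an error that `need_retry`
// treats as retriable.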
fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()> {
    let header = header
        .as_ref()
        .context(error::ResponseHeaderNotFoundSnafu)?;

    ensure!(
        !header.is_not_leader(),
        error::IsNotLeaderSnafu {
            node_addr: ctx.addr
        }
    );

    Ok(())
}

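// An error is retriable if the remote peer reported that it is not the leader,
// or if the underlying gRPC failure was caused by an I/O error.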
fn need_retry(error: &error::Error) -> bool {
    match error {
        error::Error::IsNotLeader { .. } => true,
        error::Error::Range { error, .. } | error::Error::BatchGet { error, .. } => {
            match_for_io_error(error).is_some()
        }
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use api::v1::meta::{Error, ErrorCode, ResponseHeader};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::rpc::KeyValue;

    use super::{check_resp_header, to_stat_kv_map, Context};
    use crate::error;

    #[test]
    fn test_to_stat_kv_map() {
        let stat_key = DatanodeStatKey { node_id: 100 };

        let stat = Stat {
            id: 100,
            addr: "127.0.0.1:3001".to_string(),
            ..Default::default()
        };
        let stat_val = DatanodeStatValue { stats: vec![stat] }.try_into().unwrap();

        let kv = KeyValue {
            key: stat_key.into(),
            value: stat_val,
        };

        let kv_map = to_stat_kv_map(vec![kv]).unwrap();
        assert_eq!(1, kv_map.len());
        let _ = kv_map.get(&stat_key).unwrap();

        let stat_val = kv_map.get(&stat_key).unwrap();
        let stat = stat_val.stats.first().unwrap();

        assert_eq!(100, stat.id);
        assert_eq!("127.0.0.1:3001", stat.addr);
    }

    #[test]
    fn test_check_resp_header() {
        let header = Some(ResponseHeader {
            error: None,
            ..Default::default()
        });
        check_resp_header(&header, mock_ctx()).unwrap();

        let result = check_resp_header(&None, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::ResponseHeaderNotFound { .. }
        ));

        let header = Some(ResponseHeader {
            error: Some(Error {
                code: ErrorCode::NotLeader as i32,
                err_msg: "The current meta is not leader".to_string(),
            }),
            ..Default::default()
        });
        let result = check_resp_header(&header, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::IsNotLeader { .. }
        ));
    }

    fn mock_ctx<'a>() -> Context<'a> {
        Context { addr: "addr" }
    }
}