meta_srv/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::cluster_client::ClusterClient;
21use api::v1::meta::{
22    BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
23    RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
24};
25use common_grpc::channel_manager::ChannelManager;
26use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
27use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
28use common_meta::rpc::KeyValue;
29use common_meta::rpc::store::{
30    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
32    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
33};
34use common_meta::util;
35use common_telemetry::warn;
36use derive_builder::Builder;
37use snafu::{OptionExt, ResultExt, ensure};
38
39use crate::error::{self, Result, match_for_io_error};
40use crate::metasrv::ElectionRef;
41
42pub type MetaPeerClientRef = Arc<MetaPeerClient>;
43
44#[derive(Builder)]
45pub struct MetaPeerClient {
46    election: Option<ElectionRef>,
47    in_memory: ResettableKvBackendRef,
48    #[builder(default = "ChannelManager::default()")]
49    channel_manager: ChannelManager,
50    #[builder(default = "3")]
51    max_retry_count: usize,
52    #[builder(default = "1000")]
53    retry_interval_ms: u64,
54}
55
56#[async_trait::async_trait]
57impl TxnService for MetaPeerClient {
58    type Error = error::Error;
59}
60
61#[async_trait::async_trait]
62impl KvBackend for MetaPeerClient {
63    fn name(&self) -> &str {
64        "MetaPeerClient"
65    }
66
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
72        if self.is_leader() {
73            return self
74                .in_memory
75                .range(req)
76                .await
77                .context(error::KvBackendSnafu);
78        }
79
80        let max_retry_count = self.max_retry_count;
81        let retry_interval_ms = self.retry_interval_ms;
82
83        for _ in 0..max_retry_count {
84            match self
85                .remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
86                .await
87            {
88                Ok(res) => return Ok(res),
89                Err(e) => {
90                    if need_retry(&e) {
91                        warn!(e; "Encountered an error that need to retry");
92                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
93                    } else {
94                        return Err(e);
95                    }
96                }
97            }
98        }
99
100        error::ExceededRetryLimitSnafu {
101            func_name: "range",
102            retry_num: max_retry_count,
103        }
104        .fail()
105    }
106
107    // MetaPeerClient does not support mutable methods listed below.
108    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
109        error::UnsupportedSnafu {
110            operation: "put".to_string(),
111        }
112        .fail()
113    }
114
115    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
116        error::UnsupportedSnafu {
117            operation: "batch put".to_string(),
118        }
119        .fail()
120    }
121
122    // Get kv information from the leader's in_mem kv store
123    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
124        if self.is_leader() {
125            return self
126                .in_memory
127                .batch_get(req)
128                .await
129                .context(error::KvBackendSnafu);
130        }
131
132        let max_retry_count = self.max_retry_count;
133        let retry_interval_ms = self.retry_interval_ms;
134
135        for _ in 0..max_retry_count {
136            match self.remote_batch_get(req.keys.clone()).await {
137                Ok(res) => return Ok(res),
138                Err(e) => {
139                    if need_retry(&e) {
140                        warn!(e; "Encountered an error that need to retry");
141                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
142                    } else {
143                        return Err(e);
144                    }
145                }
146            }
147        }
148
149        error::ExceededRetryLimitSnafu {
150            func_name: "batch_get",
151            retry_num: max_retry_count,
152        }
153        .fail()
154    }
155
156    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
157        error::UnsupportedSnafu {
158            operation: "delete range".to_string(),
159        }
160        .fail()
161    }
162
163    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
164        error::UnsupportedSnafu {
165            operation: "batch delete".to_string(),
166        }
167        .fail()
168    }
169
170    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
171        error::UnsupportedSnafu {
172            operation: "compare and put".to_string(),
173        }
174        .fail()
175    }
176
177    async fn put_conditionally(
178        &self,
179        _key: Vec<u8>,
180        _value: Vec<u8>,
181        _if_not_exists: bool,
182    ) -> Result<bool> {
183        error::UnsupportedSnafu {
184            operation: "put conditionally".to_string(),
185        }
186        .fail()
187    }
188
189    async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
190        error::UnsupportedSnafu {
191            operation: "delete".to_string(),
192        }
193        .fail()
194    }
195}
196
197impl MetaPeerClient {
198    async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
199        let key = DatanodeStatKey::prefix_key();
200        let range_end = util::get_prefix_end_key(&key);
201        let range_request = RangeRequest {
202            key,
203            range_end,
204            keys_only,
205            ..Default::default()
206        };
207        self.range(range_request).await.map(|res| res.kvs)
208    }
209
210    // Get all datanode stat kvs from leader meta.
211    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
212        let kvs = self.get_dn_key_value(false).await?;
213        to_stat_kv_map(kvs)
214    }
215
216    pub async fn get_node_cnt(&self) -> Result<i32> {
217        let kvs = self.get_dn_key_value(true).await?;
218        kvs.into_iter()
219            .map(|kv| {
220                kv.key
221                    .try_into()
222                    .context(error::InvalidDatanodeStatFormatSnafu {})
223            })
224            .collect::<Result<HashSet<DatanodeStatKey>>>()
225            .map(|hash_set| hash_set.len() as i32)
226    }
227
228    // Get datanode stat kvs from leader meta by input keys.
229    pub async fn get_dn_stat_kvs(
230        &self,
231        keys: Vec<DatanodeStatKey>,
232    ) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
233        let stat_keys = keys.into_iter().map(|key| key.into()).collect();
234        let batch_get_req = BatchGetRequest { keys: stat_keys };
235
236        let res = self.batch_get(batch_get_req).await?;
237
238        to_stat_kv_map(res.kvs)
239    }
240
241    async fn remote_range(
242        &self,
243        key: Vec<u8>,
244        range_end: Vec<u8>,
245        keys_only: bool,
246    ) -> Result<RangeResponse> {
247        // Safety: when self.is_leader() == false, election must not empty.
248        let election = self.election.as_ref().unwrap();
249
250        let leader_addr = election.leader().await?.0;
251
252        let channel = self
253            .channel_manager
254            .get(&leader_addr)
255            .context(error::CreateChannelSnafu)?;
256
257        let request = tonic::Request::new(PbRangeRequest {
258            key,
259            range_end,
260            keys_only,
261            ..Default::default()
262        });
263
264        let response: PbRangeResponse = ClusterClient::new(channel)
265            .range(request)
266            .await
267            .context(error::RangeSnafu)?
268            .into_inner();
269
270        check_resp_header(&response.header, Context { addr: &leader_addr })?;
271
272        Ok(RangeResponse {
273            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
274            more: response.more,
275        })
276    }
277
278    async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
279        // Safety: when self.is_leader() == false, election must not empty.
280        let election = self.election.as_ref().unwrap();
281
282        let leader_addr = election.leader().await?.0;
283
284        let channel = self
285            .channel_manager
286            .get(&leader_addr)
287            .context(error::CreateChannelSnafu)?;
288
289        let request = tonic::Request::new(PbBatchGetRequest {
290            keys,
291            ..Default::default()
292        });
293
294        let response: PbBatchGetResponse = ClusterClient::new(channel)
295            .batch_get(request)
296            .await
297            .context(error::BatchGetSnafu)?
298            .into_inner();
299
300        check_resp_header(&response.header, Context { addr: &leader_addr })?;
301
302        Ok(BatchGetResponse {
303            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
304        })
305    }
306
307    // Check if the meta node is a leader node.
308    // Note: when self.election is None, we also consider the meta node is leader
309    pub(crate) fn is_leader(&self) -> bool {
310        self.election
311            .as_ref()
312            .map(|election| election.is_leader())
313            .unwrap_or(true)
314    }
315
316    #[cfg(test)]
317    pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
318        self.in_memory.clone()
319    }
320}
321
322fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
323    let mut map = HashMap::with_capacity(kvs.len());
324    for kv in kvs {
325        let _ = map.insert(
326            kv.key
327                .try_into()
328                .context(error::InvalidDatanodeStatFormatSnafu {})?,
329            kv.value
330                .try_into()
331                .context(error::InvalidDatanodeStatFormatSnafu {})?,
332        );
333    }
334    Ok(map)
335}
336
337struct Context<'a> {
338    addr: &'a str,
339}
340
341fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()> {
342    let header = header
343        .as_ref()
344        .context(error::ResponseHeaderNotFoundSnafu)?;
345
346    ensure!(
347        !header.is_not_leader(),
348        error::IsNotLeaderSnafu {
349            node_addr: ctx.addr
350        }
351    );
352
353    Ok(())
354}
355
356fn need_retry(error: &error::Error) -> bool {
357    match error {
358        error::Error::IsNotLeader { .. } => true,
359        error::Error::Range { error, .. } | error::Error::BatchGet { error, .. } => {
360            match_for_io_error(error).is_some()
361        }
362        _ => false,
363    }
364}
365
#[cfg(test)]
mod tests {
    use api::v1::meta::{Error, ErrorCode, ResponseHeader};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::rpc::KeyValue;

    use super::{Context, check_resp_header, to_stat_kv_map};
    use crate::error;

    /// Builds a validation context that points at a dummy address.
    fn mock_ctx<'a>() -> Context<'a> {
        Context { addr: "addr" }
    }

    #[test]
    fn test_to_stat_kv_map() {
        let stat_key = DatanodeStatKey { node_id: 100 };

        // Encode a single stat, then make sure it round-trips through the decoder.
        let encoded = KeyValue {
            key: stat_key.into(),
            value: DatanodeStatValue {
                stats: vec![Stat {
                    id: 100,
                    addr: "127.0.0.1:3001".to_string(),
                    ..Default::default()
                }],
            }
            .try_into()
            .unwrap(),
        };

        let kv_map = to_stat_kv_map(vec![encoded]).unwrap();
        assert_eq!(1, kv_map.len());

        let decoded = kv_map.get(&stat_key).unwrap();
        let first_stat = decoded.stats.first().unwrap();

        assert_eq!(100, first_stat.id);
        assert_eq!("127.0.0.1:3001", first_stat.addr);
    }

    #[test]
    fn test_check_resp_header() {
        // A header that carries no error is accepted.
        let healthy = Some(ResponseHeader {
            error: None,
            ..Default::default()
        });
        check_resp_header(&healthy, mock_ctx()).unwrap();

        // A missing header is rejected.
        let missing_err = check_resp_header(&None, mock_ctx()).unwrap_err();
        assert!(matches!(
            missing_err,
            error::Error::ResponseHeaderNotFound { .. }
        ));

        // A not-leader header is rejected with `IsNotLeader`.
        let not_leader = Some(ResponseHeader {
            error: Some(Error {
                code: ErrorCode::NotLeader as i32,
                err_msg: "The current meta is not leader".to_string(),
            }),
            ..Default::default()
        });
        let not_leader_err = check_resp_header(&not_leader, mock_ctx()).unwrap_err();
        assert!(matches!(not_leader_err, error::Error::IsNotLeader { .. }));
    }
}