// common_meta/key/flow/flownode_addr_helper.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use crate::error::Result;
use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
use crate::key::{MetadataKey, MetadataValue};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;

/// Fetches the stored addresses for a set of flownodes in one batch lookup.
///
/// Each [`NodeAddressKey`] in `keys` is serialized to bytes and sent to
/// `kv_backend` in a single [`BatchGetRequest`]. Every key/value pair in the
/// response is decoded back into a [`NodeAddressKey`] and a
/// [`NodeAddressValue`].
///
/// Returns a map of `node_id -> NodeAddressValue` containing only the entries
/// that appear in the backend's response. When `keys` is empty, an empty map
/// is returned without contacting the backend.
///
/// # Errors
///
/// Returns an error if the batch get fails, or as soon as any returned key or
/// value fails to decode.
pub(crate) async fn get_flownode_addresses(
    kv_backend: &KvBackendRef,
    keys: Vec<NodeAddressKey>,
) -> Result<HashMap<u64, NodeAddressValue>> {
    // Nothing to look up: skip the round trip to the backend.
    if keys.is_empty() {
        return Ok(HashMap::new());
    }

    let raw_keys = keys.into_iter().map(|key| key.to_bytes()).collect();
    let response = kv_backend
        .batch_get(BatchGetRequest { keys: raw_keys })
        .await?;

    let mut addresses = HashMap::with_capacity(response.kvs.len());
    for kv in response.kvs {
        // Decode the key before the value so the first failure reported
        // matches the order in which the pair is parsed.
        let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
        let address = NodeAddressValue::try_from_raw_value(&kv.value)?;
        addresses.insert(node_id, address);
    }

    Ok(addresses)
}