common_meta/key/flow/
flownode_addr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use crate::error::Result;
18use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
19use crate::key::{MetadataKey, MetadataValue};
20use crate::kv_backend::KvBackendRef;
21use crate::rpc::store::BatchGetRequest;
22
23/// Get the addresses of the flownodes.
24/// The result is a map: node_id -> NodeAddressValue
25pub(crate) async fn get_flownode_addresses(
26    kv_backend: &KvBackendRef,
27    keys: Vec<NodeAddressKey>,
28) -> Result<HashMap<u64, NodeAddressValue>> {
29    if keys.is_empty() {
30        return Ok(HashMap::default());
31    }
32
33    let req = BatchGetRequest {
34        keys: keys.into_iter().map(|k| k.to_bytes()).collect(),
35    };
36    kv_backend
37        .batch_get(req)
38        .await?
39        .kvs
40        .into_iter()
41        .map(|kv| {
42            let key = NodeAddressKey::from_bytes(&kv.key)?;
43            let value = NodeAddressValue::try_from_raw_value(&kv.value)?;
44            Ok((key.node_id, value))
45        })
46        .collect()
47}