use std::collections::HashMap;
use std::hash::Hash;
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackend;
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
/// Builds a predicate that reports whether a lease is still within `lease_secs`.
///
/// The returned closure computes the age of a [`LeaseValue`] as the current time in
/// milliseconds minus `timestamp_millis`, and compares it against `lease_secs`
/// converted to milliseconds. The conversion saturates at `u64::MAX`, so a very large
/// `lease_secs` (for example `u64::MAX`) never overflows.
///
/// A timestamp that lies ahead of the local clock is treated as having age zero.
/// Casting the negative difference with `as u64` would wrap it to a huge value and
/// report a freshly written lease as expired.
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
    // The threshold does not depend on the lease being checked, so compute it once.
    let lease_millis = lease_secs.saturating_mul(1000);
    move |v: &LeaseValue| {
        // Saturating subtraction guards against overflow on extreme timestamps.
        let elapsed_millis = time_util::current_time_millis().saturating_sub(v.timestamp_millis);
        // The conversion only fails for a negative age (timestamp in the future);
        // clamp that case to zero rather than letting it wrap around.
        u64::try_from(elapsed_millis).unwrap_or(0) < lease_millis
    }
}
/// Looks up the datanode identified by `datanode_id` and returns it as a [`Peer`].
///
/// Returns `Ok(None)` when no lease entry is stored under the datanode's lease key, or
/// when the stored lease does not pass the filter built from `lease_secs`.
///
/// # Errors
///
/// Fails if the lease key cannot be encoded, the `get` call on `meta_peer_client`
/// fails, or the stored value cannot be decoded into a [`LeaseValue`].
pub async fn lookup_datanode_peer(
    datanode_id: DatanodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let is_alive = build_lease_filter(lease_secs);
    let key = DatanodeLeaseKey {
        node_id: datanode_id,
    };
    // The conversion consumes its input, so encode a clone and keep `key` around.
    let raw_key: Vec<u8> = key.clone().try_into()?;

    let lease: LeaseValue = match meta_peer_client.get(&raw_key).await? {
        Some(kv) => kv.value.try_into()?,
        None => return Ok(None),
    };

    Ok(is_alive(&lease).then(|| Peer {
        id: key.node_id,
        addr: lease.node_addr,
    }))
}
/// Collects the datanode leases that pass the filter built from `lease_secs`.
///
/// Delegates to [`filter`], starting the scan at `DatanodeLeaseKey::prefix_key()`.
///
/// # Errors
///
/// Propagates any error returned by [`filter`].
pub async fn alive_datanodes(
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<HashMap<DatanodeLeaseKey, LeaseValue>> {
    let within_lease = build_lease_filter(lease_secs);
    let scan_prefix = DatanodeLeaseKey::prefix_key();
    filter(scan_prefix, meta_peer_client, within_lease).await
}
/// Looks up the flownode identified by `flownode_id` and returns it as a [`Peer`].
///
/// Returns `Ok(None)` when no lease entry is stored under the flownode's lease key, or
/// when the stored lease does not pass the filter built from `lease_secs`.
///
/// # Errors
///
/// Fails if the lease key cannot be encoded, the `get` call on `meta_peer_client`
/// fails, or the stored value cannot be decoded into a [`LeaseValue`].
pub async fn lookup_flownode_peer(
    flownode_id: FlownodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let is_alive = build_lease_filter(lease_secs);
    let key = FlownodeLeaseKey {
        node_id: flownode_id,
    };
    // The conversion consumes its input, so encode a clone and keep `key` around.
    let raw_key: Vec<u8> = key.clone().try_into()?;

    let lease: LeaseValue = match meta_peer_client.get(&raw_key).await? {
        Some(kv) => kv.value.try_into()?,
        None => return Ok(None),
    };

    Ok(is_alive(&lease).then(|| Peer {
        id: key.node_id,
        addr: lease.node_addr,
    }))
}
/// Collects the flownode leases that pass the filter built from `lease_secs`.
///
/// Delegates to [`filter`], starting the scan at
/// `FlownodeLeaseKey::prefix_key_by_cluster()`.
///
/// # Errors
///
/// Propagates any error returned by [`filter`].
pub async fn alive_flownodes(
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<HashMap<FlownodeLeaseKey, LeaseValue>> {
    let within_lease = build_lease_filter(lease_secs);
    let scan_prefix = FlownodeLeaseKey::prefix_key_by_cluster();
    filter(scan_prefix, meta_peer_client, within_lease).await
}
/// Scans lease entries and keeps those whose value satisfies `predicate`.
///
/// Issues a range request from `key` up to `util::get_prefix_end_key(&key)`, decodes
/// every returned pair into a key of type `K` and a [`LeaseValue`], and gathers the
/// pairs accepted by `predicate` into a map. If the same decoded key occurs more than
/// once, the later entry replaces the earlier one.
///
/// # Errors
///
/// Fails if the range request fails, or on the first key or value that cannot be
/// decoded; entries after that one are not examined.
pub async fn filter<P, K>(
    key: Vec<u8>,
    meta_peer_client: &MetaPeerClientRef,
    predicate: P,
) -> Result<HashMap<K, LeaseValue>>
where
    P: Fn(&LeaseValue) -> bool,
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
{
    let request = common_meta::rpc::store::RangeRequest {
        // Field initialisers run in source order, so `key` is borrowed here before it
        // is moved into the request on the next line.
        range_end: util::get_prefix_end_key(&key),
        key,
        keys_only: false,
        ..Default::default()
    };
    let kvs = meta_peer_client.range(request).await?.kvs;

    kvs.into_iter()
        .map(|kv| -> Result<(K, LeaseValue)> {
            let decoded_key: K = kv.key.try_into()?;
            let decoded_value: LeaseValue = kv.value.try_into()?;
            Ok((decoded_key, decoded_value))
        })
        // Let decode errors through untouched so `collect` can stop on the first one.
        .filter(|entry| match entry {
            Ok((_, lease)) => predicate(lease),
            Err(_) => true,
        })
        .collect()
}
/// A peer lookup service that resolves peers through a [`MetaPeerClientRef`].
#[derive(Clone)]
pub struct MetaPeerLookupService {
/// Client used to read lease entries when resolving a peer.
pub meta_peer_client: MetaPeerClientRef,
}
impl MetaPeerLookupService {
/// Creates a lookup service that wraps the given `meta_peer_client`.
pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
Self { meta_peer_client }
}
}
#[async_trait::async_trait]
impl PeerLookupService for MetaPeerLookupService {
    /// Resolves the datanode with the given `id` via [`lookup_datanode_peer`].
    ///
    /// `u64::MAX` is passed as the lease duration, which effectively disables the
    /// lease-age check for this lookup. Any failure is boxed and reported as a
    /// `common_meta` external error.
    async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
        let looked_up = lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX).await;
        looked_up
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }

    /// Resolves the flownode with the given `id` via [`lookup_flownode_peer`].
    ///
    /// `u64::MAX` is passed as the lease duration, which effectively disables the
    /// lease-age check for this lookup. Any failure is boxed and reported as a
    /// `common_meta` external error.
    async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
        let looked_up = lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX).await;
        looked_up
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }
}