meta_srv/handler/
remap_flow_peer_handler.rs1use api::v1::meta::{HeartbeatRequest, Peer, Role};
16use common_meta::instruction::CacheIdent;
17use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
18use common_meta::key::{MetadataKey, MetadataValue};
19use common_meta::rpc::store::PutRequest;
20use common_telemetry::{error, info, warn};
21use dashmap::DashMap;
22
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::metasrv::Context;
25use crate::Result;
26
27#[derive(Debug, Default)]
28pub struct RemapFlowPeerHandler {
29 epoch_cache: DashMap<u64, u64>,
31}
32
33#[async_trait::async_trait]
34impl HeartbeatHandler for RemapFlowPeerHandler {
35 fn is_acceptable(&self, role: Role) -> bool {
36 role == Role::Flownode
37 }
38
39 async fn handle(
40 &self,
41 req: &HeartbeatRequest,
42 ctx: &mut Context,
43 _acc: &mut HeartbeatAccumulator,
44 ) -> Result<HandleControl> {
45 let Some(peer) = req.peer.as_ref() else {
46 return Ok(HandleControl::Continue);
47 };
48
49 let current_epoch = req.node_epoch;
50 let flow_node_id = peer.id;
51
52 let refresh = if let Some(mut epoch) = self.epoch_cache.get_mut(&flow_node_id) {
53 if current_epoch > *epoch.value() {
54 *epoch.value_mut() = current_epoch;
55 true
56 } else {
57 false
58 }
59 } else {
60 self.epoch_cache.insert(flow_node_id, current_epoch);
61 true
62 };
63
64 if refresh {
65 rewrite_node_address(ctx, peer).await;
66 }
67
68 Ok(HandleControl::Continue)
69 }
70}
71
72async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
73 let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
74 if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
75 let put = PutRequest {
76 key,
77 value,
78 prev_kv: false,
79 };
80
81 match ctx.leader_cached_kv_backend.put(put).await {
82 Ok(_) => {
83 info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
84 let cache_idents = vec![CacheIdent::FlowNodeAddressChange(peer.id)];
86 info!(
87 "Invalidate flow node cache for new address with cache idents: {:?}",
88 cache_idents
89 );
90 if let Err(e) = ctx
91 .cache_invalidator
92 .invalidate(&Default::default(), &cache_idents)
93 .await
94 {
95 error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
96 }
97 }
98 Err(e) => {
99 error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);
100 }
101 }
102 } else {
103 warn!("Failed to serialize flow `NodeAddressValue`: {:?}", peer);
104 }
105}