meta_srv/handler/
collect_cluster_info_handler.rs1use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
25use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
26use crate::metasrv::Context;
27use crate::Result;
28
29pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34 fn is_acceptable(&self, role: Role) -> bool {
35 role == Role::Frontend
36 }
37
38 async fn handle(
39 &self,
40 req: &HeartbeatRequest,
41 ctx: &mut Context,
42 _acc: &mut HeartbeatAccumulator,
43 ) -> Result<HandleControl> {
44 let Some((key, peer, info)) = extract_base_info(req) else {
45 return Ok(HandleControl::Continue);
46 };
47
48 let value = NodeInfo {
49 peer,
50 last_activity_ts: common_time::util::current_time_millis(),
51 status: NodeStatus::Frontend(FrontendStatus {}),
52 version: info.version,
53 git_commit: info.git_commit,
54 start_time_ms: info.start_time_ms,
55 };
56
57 put_into_memory_store(ctx, key, value).await?;
58
59 Ok(HandleControl::Continue)
60 }
61}
62
63pub struct CollectFlownodeClusterInfoHandler;
65#[async_trait::async_trait]
66impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
67 fn is_acceptable(&self, role: Role) -> bool {
68 role == Role::Flownode
69 }
70
71 async fn handle(
72 &self,
73 req: &HeartbeatRequest,
74 ctx: &mut Context,
75 _acc: &mut HeartbeatAccumulator,
76 ) -> Result<HandleControl> {
77 let Some((key, peer, info)) = extract_base_info(req) else {
78 return Ok(HandleControl::Continue);
79 };
80
81 let value = NodeInfo {
82 peer,
83 last_activity_ts: common_time::util::current_time_millis(),
84 status: NodeStatus::Flownode(FlownodeStatus {}),
85 version: info.version,
86 git_commit: info.git_commit,
87 start_time_ms: info.start_time_ms,
88 };
89
90 put_into_memory_store(ctx, key, value).await?;
91
92 Ok(HandleControl::Continue)
93 }
94}
95
96pub struct CollectDatanodeClusterInfoHandler;
98
99#[async_trait::async_trait]
100impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
101 fn is_acceptable(&self, role: Role) -> bool {
102 role == Role::Datanode
103 }
104
105 async fn handle(
106 &self,
107 req: &HeartbeatRequest,
108 ctx: &mut Context,
109 acc: &mut HeartbeatAccumulator,
110 ) -> Result<HandleControl> {
111 let Some((key, peer, info)) = extract_base_info(req) else {
112 return Ok(HandleControl::Continue);
113 };
114
115 let Some(stat) = &acc.stat else {
116 return Ok(HandleControl::Continue);
117 };
118
119 let leader_regions = stat
120 .region_stats
121 .iter()
122 .filter(|s| s.role == RegionRole::Leader)
123 .count();
124 let follower_regions = stat.region_stats.len() - leader_regions;
125
126 let value = NodeInfo {
127 peer,
128 last_activity_ts: stat.timestamp_millis,
129 status: NodeStatus::Datanode(DatanodeStatus {
130 rcus: stat.rcus,
131 wcus: stat.wcus,
132 leader_regions,
133 follower_regions,
134 workloads: stat.datanode_workloads.clone(),
135 }),
136 version: info.version,
137 git_commit: info.git_commit,
138 start_time_ms: info.start_time_ms,
139 };
140
141 put_into_memory_store(ctx, key, value).await?;
142
143 Ok(HandleControl::Continue)
144 }
145}
146
147fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
148 let HeartbeatRequest { peer, info, .. } = request;
149 let key = NodeInfoKey::new(request)?;
150 let Some(peer) = &peer else {
151 return None;
152 };
153 let Some(info) = &info else {
154 return None;
155 };
156
157 Some((key, peer.clone(), info.clone()))
158}
159
160async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
161 let key = (&key).into();
162 let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
163 let put_req = PutRequest {
164 key,
165 value,
166 ..Default::default()
167 };
168
169 ctx.in_memory
170 .put(put_req)
171 .await
172 .context(SaveClusterInfoSnafu)?;
173
174 Ok(())
175}