1use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::heartbeat::utils::get_flownode_workloads;
20use common_meta::peer::Peer;
21use common_meta::rpc::store::PutRequest;
22use snafu::ResultExt;
23use store_api::region_engine::RegionRole;
24
25use crate::Result;
26use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29
30pub struct CollectFrontendClusterInfoHandler;
32
33#[async_trait::async_trait]
34impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
35 fn is_acceptable(&self, role: Role) -> bool {
36 role == Role::Frontend
37 }
38
39 async fn handle(
40 &self,
41 req: &HeartbeatRequest,
42 ctx: &mut Context,
43 _acc: &mut HeartbeatAccumulator,
44 ) -> Result<HandleControl> {
45 let Some((key, peer, info)) = extract_base_info(req) else {
46 return Ok(HandleControl::Continue);
47 };
48
49 let value = NodeInfo {
50 peer,
51 last_activity_ts: common_time::util::current_time_millis(),
52 status: NodeStatus::Frontend(FrontendStatus {}),
53 version: info.version,
54 git_commit: info.git_commit,
55 start_time_ms: info.start_time_ms,
56 total_cpu_millicores: info.total_cpu_millicores,
57 total_memory_bytes: info.total_memory_bytes,
58 cpu_usage_millicores: info.cpu_usage_millicores,
59 memory_usage_bytes: info.memory_usage_bytes,
60 hostname: info.hostname,
61 };
62
63 put_into_memory_store(ctx, key, value).await?;
64
65 Ok(HandleControl::Continue)
66 }
67}
68
69pub struct CollectFlownodeClusterInfoHandler;
71#[async_trait::async_trait]
72impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
73 fn is_acceptable(&self, role: Role) -> bool {
74 role == Role::Flownode
75 }
76
77 async fn handle(
78 &self,
79 req: &HeartbeatRequest,
80 ctx: &mut Context,
81 _acc: &mut HeartbeatAccumulator,
82 ) -> Result<HandleControl> {
83 let Some((key, peer, info)) = extract_base_info(req) else {
84 return Ok(HandleControl::Continue);
85 };
86 let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
87
88 let value = NodeInfo {
89 peer,
90 last_activity_ts: common_time::util::current_time_millis(),
91 status: NodeStatus::Flownode(FlownodeStatus {
92 workloads: flownode_workloads,
93 }),
94 version: info.version,
95 git_commit: info.git_commit,
96 start_time_ms: info.start_time_ms,
97 total_cpu_millicores: info.total_cpu_millicores,
98 total_memory_bytes: info.total_memory_bytes,
99 cpu_usage_millicores: info.cpu_usage_millicores,
100 memory_usage_bytes: info.memory_usage_bytes,
101 hostname: info.hostname,
102 };
103
104 put_into_memory_store(ctx, key, value).await?;
105
106 Ok(HandleControl::Continue)
107 }
108}
109
110pub struct CollectDatanodeClusterInfoHandler;
112
113#[async_trait::async_trait]
114impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
115 fn is_acceptable(&self, role: Role) -> bool {
116 role == Role::Datanode
117 }
118
119 async fn handle(
120 &self,
121 req: &HeartbeatRequest,
122 ctx: &mut Context,
123 acc: &mut HeartbeatAccumulator,
124 ) -> Result<HandleControl> {
125 let Some((key, peer, info)) = extract_base_info(req) else {
126 return Ok(HandleControl::Continue);
127 };
128
129 let Some(stat) = &acc.stat else {
130 return Ok(HandleControl::Continue);
131 };
132
133 let leader_regions = stat
134 .region_stats
135 .iter()
136 .filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader))
137 .count();
138 let follower_regions = stat.region_stats.len() - leader_regions;
139
140 let value = NodeInfo {
141 peer,
142 last_activity_ts: stat.timestamp_millis,
143 status: NodeStatus::Datanode(DatanodeStatus {
144 rcus: stat.rcus,
145 wcus: stat.wcus,
146 leader_regions,
147 follower_regions,
148 workloads: stat.datanode_workloads.clone(),
149 }),
150 version: info.version,
151 git_commit: info.git_commit,
152 start_time_ms: info.start_time_ms,
153 total_cpu_millicores: info.total_cpu_millicores,
154 total_memory_bytes: info.total_memory_bytes,
155 cpu_usage_millicores: info.cpu_usage_millicores,
156 memory_usage_bytes: info.memory_usage_bytes,
157 hostname: info.hostname,
158 };
159
160 put_into_memory_store(ctx, key, value).await?;
161
162 Ok(HandleControl::Continue)
163 }
164}
165
166fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
167 let HeartbeatRequest { peer, info, .. } = request;
168 let key = NodeInfoKey::new(request)?;
169 let Some(peer) = &peer else {
170 return None;
171 };
172 let Some(info) = &info else {
173 return None;
174 };
175
176 Some((key, peer.clone(), info.clone()))
177}
178
179async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
180 let key = (&key).into();
181 let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
182 let put_req = PutRequest {
183 key,
184 value,
185 ..Default::default()
186 };
187
188 ctx.in_memory
189 .put(put_req)
190 .await
191 .context(SaveClusterInfoSnafu)?;
192
193 Ok(())
194}