1use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::Result;
25use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28
29pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34 fn is_acceptable(&self, role: Role) -> bool {
35 role == Role::Frontend
36 }
37
38 async fn handle(
39 &self,
40 req: &HeartbeatRequest,
41 ctx: &mut Context,
42 _acc: &mut HeartbeatAccumulator,
43 ) -> Result<HandleControl> {
44 let Some((key, peer, info)) = extract_base_info(req) else {
45 return Ok(HandleControl::Continue);
46 };
47
48 let value = NodeInfo {
49 peer,
50 last_activity_ts: common_time::util::current_time_millis(),
51 status: NodeStatus::Frontend(FrontendStatus {}),
52 version: info.version,
53 git_commit: info.git_commit,
54 start_time_ms: info.start_time_ms,
55 total_cpu_millicores: info.total_cpu_millicores,
56 total_memory_bytes: info.total_memory_bytes,
57 cpu_usage_millicores: info.cpu_usage_millicores,
58 memory_usage_bytes: info.memory_usage_bytes,
59 hostname: info.hostname,
60 };
61
62 put_into_memory_store(ctx, key, value).await?;
63
64 Ok(HandleControl::Continue)
65 }
66}
67
68pub struct CollectFlownodeClusterInfoHandler;
70#[async_trait::async_trait]
71impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
72 fn is_acceptable(&self, role: Role) -> bool {
73 role == Role::Flownode
74 }
75
76 async fn handle(
77 &self,
78 req: &HeartbeatRequest,
79 ctx: &mut Context,
80 _acc: &mut HeartbeatAccumulator,
81 ) -> Result<HandleControl> {
82 let Some((key, peer, info)) = extract_base_info(req) else {
83 return Ok(HandleControl::Continue);
84 };
85
86 let value = NodeInfo {
87 peer,
88 last_activity_ts: common_time::util::current_time_millis(),
89 status: NodeStatus::Flownode(FlownodeStatus {}),
90 version: info.version,
91 git_commit: info.git_commit,
92 start_time_ms: info.start_time_ms,
93 total_cpu_millicores: info.total_cpu_millicores,
94 total_memory_bytes: info.total_memory_bytes,
95 cpu_usage_millicores: info.cpu_usage_millicores,
96 memory_usage_bytes: info.memory_usage_bytes,
97 hostname: info.hostname,
98 };
99
100 put_into_memory_store(ctx, key, value).await?;
101
102 Ok(HandleControl::Continue)
103 }
104}
105
106pub struct CollectDatanodeClusterInfoHandler;
108
109#[async_trait::async_trait]
110impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
111 fn is_acceptable(&self, role: Role) -> bool {
112 role == Role::Datanode
113 }
114
115 async fn handle(
116 &self,
117 req: &HeartbeatRequest,
118 ctx: &mut Context,
119 acc: &mut HeartbeatAccumulator,
120 ) -> Result<HandleControl> {
121 let Some((key, peer, info)) = extract_base_info(req) else {
122 return Ok(HandleControl::Continue);
123 };
124
125 let Some(stat) = &acc.stat else {
126 return Ok(HandleControl::Continue);
127 };
128
129 let leader_regions = stat
130 .region_stats
131 .iter()
132 .filter(|s| s.role == RegionRole::Leader)
133 .count();
134 let follower_regions = stat.region_stats.len() - leader_regions;
135
136 let value = NodeInfo {
137 peer,
138 last_activity_ts: stat.timestamp_millis,
139 status: NodeStatus::Datanode(DatanodeStatus {
140 rcus: stat.rcus,
141 wcus: stat.wcus,
142 leader_regions,
143 follower_regions,
144 workloads: stat.datanode_workloads.clone(),
145 }),
146 version: info.version,
147 git_commit: info.git_commit,
148 start_time_ms: info.start_time_ms,
149 total_cpu_millicores: info.total_cpu_millicores,
150 total_memory_bytes: info.total_memory_bytes,
151 cpu_usage_millicores: info.cpu_usage_millicores,
152 memory_usage_bytes: info.memory_usage_bytes,
153 hostname: info.hostname,
154 };
155
156 put_into_memory_store(ctx, key, value).await?;
157
158 Ok(HandleControl::Continue)
159 }
160}
161
162fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
163 let HeartbeatRequest { peer, info, .. } = request;
164 let key = NodeInfoKey::new(request)?;
165 let Some(peer) = &peer else {
166 return None;
167 };
168 let Some(info) = &info else {
169 return None;
170 };
171
172 Some((key, peer.clone(), info.clone()))
173}
174
175async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
176 let key = (&key).into();
177 let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
178 let put_req = PutRequest {
179 key,
180 value,
181 ..Default::default()
182 };
183
184 ctx.in_memory
185 .put(put_req)
186 .await
187 .context(SaveClusterInfoSnafu)?;
188
189 Ok(())
190}