1use std::collections::HashMap;
16
17use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
18use common_meta::cluster::{
19 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
20};
21use common_meta::datanode::EnvVars;
22use common_meta::heartbeat::utils::{get_flownode_workloads, get_frontend_workloads};
23use common_meta::peer::Peer;
24use common_meta::rpc::store::PutRequest;
25use common_telemetry::warn;
26use snafu::ResultExt;
27use store_api::region_engine::RegionRole;
28
29use crate::Result;
30use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
31use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
32use crate::metasrv::Context;
33
34pub struct CollectFrontendClusterInfoHandler;
36
37#[async_trait::async_trait]
38impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
39 fn is_acceptable(&self, role: Role) -> bool {
40 role == Role::Frontend
41 }
42
43 async fn handle(
44 &self,
45 req: &HeartbeatRequest,
46 ctx: &mut Context,
47 _acc: &mut HeartbeatAccumulator,
48 ) -> Result<HandleControl> {
49 let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
50 return Ok(HandleControl::Continue);
51 };
52
53 let frontend_workloads = get_frontend_workloads(req.node_workloads.as_ref());
54
55 let value = NodeInfo {
56 peer,
57 last_activity_ts: common_time::util::current_time_millis(),
58 status: NodeStatus::Frontend(FrontendStatus {
59 workloads: frontend_workloads,
60 }),
61 version: info.version,
62 git_commit: info.git_commit,
63 start_time_ms: info.start_time_ms,
64 total_cpu_millicores: info.total_cpu_millicores,
65 total_memory_bytes: info.total_memory_bytes,
66 cpu_usage_millicores: info.cpu_usage_millicores,
67 memory_usage_bytes: info.memory_usage_bytes,
68 hostname: info.hostname,
69 env_vars,
70 };
71
72 put_into_memory_store(ctx, key, value).await?;
73
74 Ok(HandleControl::Continue)
75 }
76}
77
78pub struct CollectFlownodeClusterInfoHandler;
80#[async_trait::async_trait]
81impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
82 fn is_acceptable(&self, role: Role) -> bool {
83 role == Role::Flownode
84 }
85
86 async fn handle(
87 &self,
88 req: &HeartbeatRequest,
89 ctx: &mut Context,
90 _acc: &mut HeartbeatAccumulator,
91 ) -> Result<HandleControl> {
92 let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
93 return Ok(HandleControl::Continue);
94 };
95 let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
96
97 let value = NodeInfo {
98 peer,
99 last_activity_ts: common_time::util::current_time_millis(),
100 status: NodeStatus::Flownode(FlownodeStatus {
101 workloads: flownode_workloads,
102 }),
103 version: info.version,
104 git_commit: info.git_commit,
105 start_time_ms: info.start_time_ms,
106 total_cpu_millicores: info.total_cpu_millicores,
107 total_memory_bytes: info.total_memory_bytes,
108 cpu_usage_millicores: info.cpu_usage_millicores,
109 memory_usage_bytes: info.memory_usage_bytes,
110 hostname: info.hostname,
111 env_vars,
112 };
113
114 put_into_memory_store(ctx, key, value).await?;
115
116 Ok(HandleControl::Continue)
117 }
118}
119
120pub struct CollectDatanodeClusterInfoHandler;
122
123#[async_trait::async_trait]
124impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
125 fn is_acceptable(&self, role: Role) -> bool {
126 role == Role::Datanode
127 }
128
129 async fn handle(
130 &self,
131 req: &HeartbeatRequest,
132 ctx: &mut Context,
133 acc: &mut HeartbeatAccumulator,
134 ) -> Result<HandleControl> {
135 let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
136 return Ok(HandleControl::Continue);
137 };
138
139 let Some(stat) = &acc.stat else {
140 return Ok(HandleControl::Continue);
141 };
142
143 let leader_regions = stat
144 .region_stats
145 .iter()
146 .filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader))
147 .count();
148 let follower_regions = stat.region_stats.len() - leader_regions;
149
150 let value = NodeInfo {
151 peer,
152 last_activity_ts: stat.timestamp_millis,
153 status: NodeStatus::Datanode(DatanodeStatus {
154 rcus: stat.rcus,
155 wcus: stat.wcus,
156 leader_regions,
157 follower_regions,
158 workloads: stat.datanode_workloads.clone(),
159 }),
160 version: info.version,
161 git_commit: info.git_commit,
162 start_time_ms: info.start_time_ms,
163 total_cpu_millicores: info.total_cpu_millicores,
164 total_memory_bytes: info.total_memory_bytes,
165 cpu_usage_millicores: info.cpu_usage_millicores,
166 memory_usage_bytes: info.memory_usage_bytes,
167 hostname: info.hostname,
168 env_vars,
169 };
170
171 put_into_memory_store(ctx, key, value).await?;
172
173 Ok(HandleControl::Continue)
174 }
175}
176
177fn extract_base_info(
178 request: &HeartbeatRequest,
179) -> Option<(NodeInfoKey, Peer, PbNodeInfo, HashMap<String, String>)> {
180 let HeartbeatRequest { peer, info, .. } = request;
181 let key = NodeInfoKey::new(request)?;
182 let Some(peer) = &peer else {
183 return None;
184 };
185 let Some(info) = &info else {
186 return None;
187 };
188
189 let env_vars = EnvVars::from_extensions(&request.extensions)
190 .inspect_err(|e| {
191 warn!(e;
192 "Failed to deserialize __env_vars from heartbeat extensions, peer: {}", peer
193 );
194 })
195 .unwrap_or_default()
196 .map(|e| e.vars)
197 .unwrap_or_default();
198
199 Some((key, peer.clone(), info.clone(), env_vars))
200}
201
202async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
203 let key = (&key).into();
204 let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
205 let put_req = PutRequest {
206 key,
207 value,
208 ..Default::default()
209 };
210
211 ctx.in_memory
212 .put(put_req)
213 .await
214 .context(SaveClusterInfoSnafu)?;
215
216 Ok(())
217}