meta_srv/handler/
collect_cluster_info_handler.rs1use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::Result;
25use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28
29pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34 fn is_acceptable(&self, role: Role) -> bool {
35 role == Role::Frontend
36 }
37
38 async fn handle(
39 &self,
40 req: &HeartbeatRequest,
41 ctx: &mut Context,
42 _acc: &mut HeartbeatAccumulator,
43 ) -> Result<HandleControl> {
44 let Some((key, peer, info)) = extract_base_info(req) else {
45 return Ok(HandleControl::Continue);
46 };
47
48 let value = NodeInfo {
49 peer,
50 last_activity_ts: common_time::util::current_time_millis(),
51 status: NodeStatus::Frontend(FrontendStatus {}),
52 version: info.version,
53 git_commit: info.git_commit,
54 start_time_ms: info.start_time_ms,
55 cpus: info.cpus,
56 memory_bytes: info.memory_bytes,
57 };
58
59 put_into_memory_store(ctx, key, value).await?;
60
61 Ok(HandleControl::Continue)
62 }
63}
64
65pub struct CollectFlownodeClusterInfoHandler;
67#[async_trait::async_trait]
68impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
69 fn is_acceptable(&self, role: Role) -> bool {
70 role == Role::Flownode
71 }
72
73 async fn handle(
74 &self,
75 req: &HeartbeatRequest,
76 ctx: &mut Context,
77 _acc: &mut HeartbeatAccumulator,
78 ) -> Result<HandleControl> {
79 let Some((key, peer, info)) = extract_base_info(req) else {
80 return Ok(HandleControl::Continue);
81 };
82
83 let value = NodeInfo {
84 peer,
85 last_activity_ts: common_time::util::current_time_millis(),
86 status: NodeStatus::Flownode(FlownodeStatus {}),
87 version: info.version,
88 git_commit: info.git_commit,
89 start_time_ms: info.start_time_ms,
90 cpus: info.cpus,
91 memory_bytes: info.memory_bytes,
92 };
93
94 put_into_memory_store(ctx, key, value).await?;
95
96 Ok(HandleControl::Continue)
97 }
98}
99
100pub struct CollectDatanodeClusterInfoHandler;
102
103#[async_trait::async_trait]
104impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
105 fn is_acceptable(&self, role: Role) -> bool {
106 role == Role::Datanode
107 }
108
109 async fn handle(
110 &self,
111 req: &HeartbeatRequest,
112 ctx: &mut Context,
113 acc: &mut HeartbeatAccumulator,
114 ) -> Result<HandleControl> {
115 let Some((key, peer, info)) = extract_base_info(req) else {
116 return Ok(HandleControl::Continue);
117 };
118
119 let Some(stat) = &acc.stat else {
120 return Ok(HandleControl::Continue);
121 };
122
123 let leader_regions = stat
124 .region_stats
125 .iter()
126 .filter(|s| s.role == RegionRole::Leader)
127 .count();
128 let follower_regions = stat.region_stats.len() - leader_regions;
129
130 let value = NodeInfo {
131 peer,
132 last_activity_ts: stat.timestamp_millis,
133 status: NodeStatus::Datanode(DatanodeStatus {
134 rcus: stat.rcus,
135 wcus: stat.wcus,
136 leader_regions,
137 follower_regions,
138 workloads: stat.datanode_workloads.clone(),
139 }),
140 version: info.version,
141 git_commit: info.git_commit,
142 start_time_ms: info.start_time_ms,
143 cpus: info.cpus,
144 memory_bytes: info.memory_bytes,
145 };
146
147 put_into_memory_store(ctx, key, value).await?;
148
149 Ok(HandleControl::Continue)
150 }
151}
152
153fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
154 let HeartbeatRequest { peer, info, .. } = request;
155 let key = NodeInfoKey::new(request)?;
156 let Some(peer) = &peer else {
157 return None;
158 };
159 let Some(info) = &info else {
160 return None;
161 };
162
163 Some((key, peer.clone(), info.clone()))
164}
165
166async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
167 let key = (&key).into();
168 let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
169 let put_req = PutRequest {
170 key,
171 value,
172 ..Default::default()
173 };
174
175 ctx.in_memory
176 .put(put_req)
177 .await
178 .context(SaveClusterInfoSnafu)?;
179
180 Ok(())
181}