meta_srv/handler/
collect_cluster_info_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17    DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::Result;
25use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28
29/// The handler to collect cluster info from the heartbeat request of frontend.
30pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34    fn is_acceptable(&self, role: Role) -> bool {
35        role == Role::Frontend
36    }
37
38    async fn handle(
39        &self,
40        req: &HeartbeatRequest,
41        ctx: &mut Context,
42        _acc: &mut HeartbeatAccumulator,
43    ) -> Result<HandleControl> {
44        let Some((key, peer, info)) = extract_base_info(req) else {
45            return Ok(HandleControl::Continue);
46        };
47
48        let value = NodeInfo {
49            peer,
50            last_activity_ts: common_time::util::current_time_millis(),
51            status: NodeStatus::Frontend(FrontendStatus {}),
52            version: info.version,
53            git_commit: info.git_commit,
54            start_time_ms: info.start_time_ms,
55            total_cpu_millicores: info.total_cpu_millicores,
56            total_memory_bytes: info.total_memory_bytes,
57            cpu_usage_millicores: info.cpu_usage_millicores,
58            memory_usage_bytes: info.memory_usage_bytes,
59            hostname: info.hostname,
60        };
61
62        put_into_memory_store(ctx, key, value).await?;
63
64        Ok(HandleControl::Continue)
65    }
66}
67
68/// The handler to collect cluster info from the heartbeat request of flownode.
69pub struct CollectFlownodeClusterInfoHandler;
70#[async_trait::async_trait]
71impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
72    fn is_acceptable(&self, role: Role) -> bool {
73        role == Role::Flownode
74    }
75
76    async fn handle(
77        &self,
78        req: &HeartbeatRequest,
79        ctx: &mut Context,
80        _acc: &mut HeartbeatAccumulator,
81    ) -> Result<HandleControl> {
82        let Some((key, peer, info)) = extract_base_info(req) else {
83            return Ok(HandleControl::Continue);
84        };
85
86        let value = NodeInfo {
87            peer,
88            last_activity_ts: common_time::util::current_time_millis(),
89            status: NodeStatus::Flownode(FlownodeStatus {}),
90            version: info.version,
91            git_commit: info.git_commit,
92            start_time_ms: info.start_time_ms,
93            total_cpu_millicores: info.total_cpu_millicores,
94            total_memory_bytes: info.total_memory_bytes,
95            cpu_usage_millicores: info.cpu_usage_millicores,
96            memory_usage_bytes: info.memory_usage_bytes,
97            hostname: info.hostname,
98        };
99
100        put_into_memory_store(ctx, key, value).await?;
101
102        Ok(HandleControl::Continue)
103    }
104}
105
106/// The handler to collect cluster info from the heartbeat request of datanode.
107pub struct CollectDatanodeClusterInfoHandler;
108
109#[async_trait::async_trait]
110impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
111    fn is_acceptable(&self, role: Role) -> bool {
112        role == Role::Datanode
113    }
114
115    async fn handle(
116        &self,
117        req: &HeartbeatRequest,
118        ctx: &mut Context,
119        acc: &mut HeartbeatAccumulator,
120    ) -> Result<HandleControl> {
121        let Some((key, peer, info)) = extract_base_info(req) else {
122            return Ok(HandleControl::Continue);
123        };
124
125        let Some(stat) = &acc.stat else {
126            return Ok(HandleControl::Continue);
127        };
128
129        let leader_regions = stat
130            .region_stats
131            .iter()
132            .filter(|s| s.role == RegionRole::Leader)
133            .count();
134        let follower_regions = stat.region_stats.len() - leader_regions;
135
136        let value = NodeInfo {
137            peer,
138            last_activity_ts: stat.timestamp_millis,
139            status: NodeStatus::Datanode(DatanodeStatus {
140                rcus: stat.rcus,
141                wcus: stat.wcus,
142                leader_regions,
143                follower_regions,
144                workloads: stat.datanode_workloads.clone(),
145            }),
146            version: info.version,
147            git_commit: info.git_commit,
148            start_time_ms: info.start_time_ms,
149            total_cpu_millicores: info.total_cpu_millicores,
150            total_memory_bytes: info.total_memory_bytes,
151            cpu_usage_millicores: info.cpu_usage_millicores,
152            memory_usage_bytes: info.memory_usage_bytes,
153            hostname: info.hostname,
154        };
155
156        put_into_memory_store(ctx, key, value).await?;
157
158        Ok(HandleControl::Continue)
159    }
160}
161
162fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
163    let HeartbeatRequest { peer, info, .. } = request;
164    let key = NodeInfoKey::new(request)?;
165    let Some(peer) = &peer else {
166        return None;
167    };
168    let Some(info) = &info else {
169        return None;
170    };
171
172    Some((key, peer.clone(), info.clone()))
173}
174
175async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
176    let key = (&key).into();
177    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
178    let put_req = PutRequest {
179        key,
180        value,
181        ..Default::default()
182    };
183
184    ctx.in_memory
185        .put(put_req)
186        .await
187        .context(SaveClusterInfoSnafu)?;
188
189    Ok(())
190}