Skip to main content

meta_srv/handler/
collect_cluster_info_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17    DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::heartbeat::utils::get_flownode_workloads;
20use common_meta::peer::Peer;
21use common_meta::rpc::store::PutRequest;
22use snafu::ResultExt;
23use store_api::region_engine::RegionRole;
24
25use crate::Result;
26use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29
30/// The handler to collect cluster info from the heartbeat request of frontend.
31pub struct CollectFrontendClusterInfoHandler;
32
33#[async_trait::async_trait]
34impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
35    fn is_acceptable(&self, role: Role) -> bool {
36        role == Role::Frontend
37    }
38
39    async fn handle(
40        &self,
41        req: &HeartbeatRequest,
42        ctx: &mut Context,
43        _acc: &mut HeartbeatAccumulator,
44    ) -> Result<HandleControl> {
45        let Some((key, peer, info)) = extract_base_info(req) else {
46            return Ok(HandleControl::Continue);
47        };
48
49        let value = NodeInfo {
50            peer,
51            last_activity_ts: common_time::util::current_time_millis(),
52            status: NodeStatus::Frontend(FrontendStatus {}),
53            version: info.version,
54            git_commit: info.git_commit,
55            start_time_ms: info.start_time_ms,
56            total_cpu_millicores: info.total_cpu_millicores,
57            total_memory_bytes: info.total_memory_bytes,
58            cpu_usage_millicores: info.cpu_usage_millicores,
59            memory_usage_bytes: info.memory_usage_bytes,
60            hostname: info.hostname,
61        };
62
63        put_into_memory_store(ctx, key, value).await?;
64
65        Ok(HandleControl::Continue)
66    }
67}
68
69/// The handler to collect cluster info from the heartbeat request of flownode.
70pub struct CollectFlownodeClusterInfoHandler;
71#[async_trait::async_trait]
72impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
73    fn is_acceptable(&self, role: Role) -> bool {
74        role == Role::Flownode
75    }
76
77    async fn handle(
78        &self,
79        req: &HeartbeatRequest,
80        ctx: &mut Context,
81        _acc: &mut HeartbeatAccumulator,
82    ) -> Result<HandleControl> {
83        let Some((key, peer, info)) = extract_base_info(req) else {
84            return Ok(HandleControl::Continue);
85        };
86        let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
87
88        let value = NodeInfo {
89            peer,
90            last_activity_ts: common_time::util::current_time_millis(),
91            status: NodeStatus::Flownode(FlownodeStatus {
92                workloads: flownode_workloads,
93            }),
94            version: info.version,
95            git_commit: info.git_commit,
96            start_time_ms: info.start_time_ms,
97            total_cpu_millicores: info.total_cpu_millicores,
98            total_memory_bytes: info.total_memory_bytes,
99            cpu_usage_millicores: info.cpu_usage_millicores,
100            memory_usage_bytes: info.memory_usage_bytes,
101            hostname: info.hostname,
102        };
103
104        put_into_memory_store(ctx, key, value).await?;
105
106        Ok(HandleControl::Continue)
107    }
108}
109
110/// The handler to collect cluster info from the heartbeat request of datanode.
111pub struct CollectDatanodeClusterInfoHandler;
112
113#[async_trait::async_trait]
114impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
115    fn is_acceptable(&self, role: Role) -> bool {
116        role == Role::Datanode
117    }
118
119    async fn handle(
120        &self,
121        req: &HeartbeatRequest,
122        ctx: &mut Context,
123        acc: &mut HeartbeatAccumulator,
124    ) -> Result<HandleControl> {
125        let Some((key, peer, info)) = extract_base_info(req) else {
126            return Ok(HandleControl::Continue);
127        };
128
129        let Some(stat) = &acc.stat else {
130            return Ok(HandleControl::Continue);
131        };
132
133        let leader_regions = stat
134            .region_stats
135            .iter()
136            .filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader))
137            .count();
138        let follower_regions = stat.region_stats.len() - leader_regions;
139
140        let value = NodeInfo {
141            peer,
142            last_activity_ts: stat.timestamp_millis,
143            status: NodeStatus::Datanode(DatanodeStatus {
144                rcus: stat.rcus,
145                wcus: stat.wcus,
146                leader_regions,
147                follower_regions,
148                workloads: stat.datanode_workloads.clone(),
149            }),
150            version: info.version,
151            git_commit: info.git_commit,
152            start_time_ms: info.start_time_ms,
153            total_cpu_millicores: info.total_cpu_millicores,
154            total_memory_bytes: info.total_memory_bytes,
155            cpu_usage_millicores: info.cpu_usage_millicores,
156            memory_usage_bytes: info.memory_usage_bytes,
157            hostname: info.hostname,
158        };
159
160        put_into_memory_store(ctx, key, value).await?;
161
162        Ok(HandleControl::Continue)
163    }
164}
165
166fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
167    let HeartbeatRequest { peer, info, .. } = request;
168    let key = NodeInfoKey::new(request)?;
169    let Some(peer) = &peer else {
170        return None;
171    };
172    let Some(info) = &info else {
173        return None;
174    };
175
176    Some((key, peer.clone(), info.clone()))
177}
178
179async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
180    let key = (&key).into();
181    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
182    let put_req = PutRequest {
183        key,
184        value,
185        ..Default::default()
186    };
187
188    ctx.in_memory
189        .put(put_req)
190        .await
191        .context(SaveClusterInfoSnafu)?;
192
193    Ok(())
194}