Skip to main content

meta_srv/handler/
collect_cluster_info_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
18use common_meta::cluster::{
19    DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
20};
21use common_meta::datanode::EnvVars;
22use common_meta::heartbeat::utils::{get_flownode_workloads, get_frontend_workloads};
23use common_meta::peer::Peer;
24use common_meta::rpc::store::PutRequest;
25use common_telemetry::warn;
26use snafu::ResultExt;
27use store_api::region_engine::RegionRole;
28
29use crate::Result;
30use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
31use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
32use crate::metasrv::Context;
33
34/// The handler to collect cluster info from the heartbeat request of frontend.
35pub struct CollectFrontendClusterInfoHandler;
36
37#[async_trait::async_trait]
38impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
39    fn is_acceptable(&self, role: Role) -> bool {
40        role == Role::Frontend
41    }
42
43    async fn handle(
44        &self,
45        req: &HeartbeatRequest,
46        ctx: &mut Context,
47        _acc: &mut HeartbeatAccumulator,
48    ) -> Result<HandleControl> {
49        let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
50            return Ok(HandleControl::Continue);
51        };
52
53        let frontend_workloads = get_frontend_workloads(req.node_workloads.as_ref());
54
55        let value = NodeInfo {
56            peer,
57            last_activity_ts: common_time::util::current_time_millis(),
58            status: NodeStatus::Frontend(FrontendStatus {
59                workloads: frontend_workloads,
60            }),
61            version: info.version,
62            git_commit: info.git_commit,
63            start_time_ms: info.start_time_ms,
64            total_cpu_millicores: info.total_cpu_millicores,
65            total_memory_bytes: info.total_memory_bytes,
66            cpu_usage_millicores: info.cpu_usage_millicores,
67            memory_usage_bytes: info.memory_usage_bytes,
68            hostname: info.hostname,
69            env_vars,
70        };
71
72        put_into_memory_store(ctx, key, value).await?;
73
74        Ok(HandleControl::Continue)
75    }
76}
77
78/// The handler to collect cluster info from the heartbeat request of flownode.
79pub struct CollectFlownodeClusterInfoHandler;
80#[async_trait::async_trait]
81impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
82    fn is_acceptable(&self, role: Role) -> bool {
83        role == Role::Flownode
84    }
85
86    async fn handle(
87        &self,
88        req: &HeartbeatRequest,
89        ctx: &mut Context,
90        _acc: &mut HeartbeatAccumulator,
91    ) -> Result<HandleControl> {
92        let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
93            return Ok(HandleControl::Continue);
94        };
95        let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
96
97        let value = NodeInfo {
98            peer,
99            last_activity_ts: common_time::util::current_time_millis(),
100            status: NodeStatus::Flownode(FlownodeStatus {
101                workloads: flownode_workloads,
102            }),
103            version: info.version,
104            git_commit: info.git_commit,
105            start_time_ms: info.start_time_ms,
106            total_cpu_millicores: info.total_cpu_millicores,
107            total_memory_bytes: info.total_memory_bytes,
108            cpu_usage_millicores: info.cpu_usage_millicores,
109            memory_usage_bytes: info.memory_usage_bytes,
110            hostname: info.hostname,
111            env_vars,
112        };
113
114        put_into_memory_store(ctx, key, value).await?;
115
116        Ok(HandleControl::Continue)
117    }
118}
119
120/// The handler to collect cluster info from the heartbeat request of datanode.
121pub struct CollectDatanodeClusterInfoHandler;
122
123#[async_trait::async_trait]
124impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
125    fn is_acceptable(&self, role: Role) -> bool {
126        role == Role::Datanode
127    }
128
129    async fn handle(
130        &self,
131        req: &HeartbeatRequest,
132        ctx: &mut Context,
133        acc: &mut HeartbeatAccumulator,
134    ) -> Result<HandleControl> {
135        let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
136            return Ok(HandleControl::Continue);
137        };
138
139        let Some(stat) = &acc.stat else {
140            return Ok(HandleControl::Continue);
141        };
142
143        let leader_regions = stat
144            .region_stats
145            .iter()
146            .filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader))
147            .count();
148        let follower_regions = stat.region_stats.len() - leader_regions;
149
150        let value = NodeInfo {
151            peer,
152            last_activity_ts: stat.timestamp_millis,
153            status: NodeStatus::Datanode(DatanodeStatus {
154                rcus: stat.rcus,
155                wcus: stat.wcus,
156                leader_regions,
157                follower_regions,
158                workloads: stat.datanode_workloads.clone(),
159            }),
160            version: info.version,
161            git_commit: info.git_commit,
162            start_time_ms: info.start_time_ms,
163            total_cpu_millicores: info.total_cpu_millicores,
164            total_memory_bytes: info.total_memory_bytes,
165            cpu_usage_millicores: info.cpu_usage_millicores,
166            memory_usage_bytes: info.memory_usage_bytes,
167            hostname: info.hostname,
168            env_vars,
169        };
170
171        put_into_memory_store(ctx, key, value).await?;
172
173        Ok(HandleControl::Continue)
174    }
175}
176
177fn extract_base_info(
178    request: &HeartbeatRequest,
179) -> Option<(NodeInfoKey, Peer, PbNodeInfo, HashMap<String, String>)> {
180    let HeartbeatRequest { peer, info, .. } = request;
181    let key = NodeInfoKey::new(request)?;
182    let Some(peer) = &peer else {
183        return None;
184    };
185    let Some(info) = &info else {
186        return None;
187    };
188
189    let env_vars = EnvVars::from_extensions(&request.extensions)
190        .inspect_err(|e| {
191            warn!(e;
192                "Failed to deserialize __env_vars from heartbeat extensions, peer: {}", peer
193            );
194        })
195        .unwrap_or_default()
196        .map(|e| e.vars)
197        .unwrap_or_default();
198
199    Some((key, peer.clone(), info.clone(), env_vars))
200}
201
202async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
203    let key = (&key).into();
204    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
205    let put_req = PutRequest {
206        key,
207        value,
208        ..Default::default()
209    };
210
211    ctx.in_memory
212        .put(put_req)
213        .await
214        .context(SaveClusterInfoSnafu)?;
215
216    Ok(())
217}