meta_srv/handler/
collect_cluster_info_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17    DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::Result;
25use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28
29/// The handler to collect cluster info from the heartbeat request of frontend.
30pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34    fn is_acceptable(&self, role: Role) -> bool {
35        role == Role::Frontend
36    }
37
38    async fn handle(
39        &self,
40        req: &HeartbeatRequest,
41        ctx: &mut Context,
42        _acc: &mut HeartbeatAccumulator,
43    ) -> Result<HandleControl> {
44        let Some((key, peer, info)) = extract_base_info(req) else {
45            return Ok(HandleControl::Continue);
46        };
47
48        let value = NodeInfo {
49            peer,
50            last_activity_ts: common_time::util::current_time_millis(),
51            status: NodeStatus::Frontend(FrontendStatus {}),
52            version: info.version,
53            git_commit: info.git_commit,
54            start_time_ms: info.start_time_ms,
55            cpus: info.cpus,
56            memory_bytes: info.memory_bytes,
57        };
58
59        put_into_memory_store(ctx, key, value).await?;
60
61        Ok(HandleControl::Continue)
62    }
63}
64
65/// The handler to collect cluster info from the heartbeat request of flownode.
66pub struct CollectFlownodeClusterInfoHandler;
67#[async_trait::async_trait]
68impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
69    fn is_acceptable(&self, role: Role) -> bool {
70        role == Role::Flownode
71    }
72
73    async fn handle(
74        &self,
75        req: &HeartbeatRequest,
76        ctx: &mut Context,
77        _acc: &mut HeartbeatAccumulator,
78    ) -> Result<HandleControl> {
79        let Some((key, peer, info)) = extract_base_info(req) else {
80            return Ok(HandleControl::Continue);
81        };
82
83        let value = NodeInfo {
84            peer,
85            last_activity_ts: common_time::util::current_time_millis(),
86            status: NodeStatus::Flownode(FlownodeStatus {}),
87            version: info.version,
88            git_commit: info.git_commit,
89            start_time_ms: info.start_time_ms,
90            cpus: info.cpus,
91            memory_bytes: info.memory_bytes,
92        };
93
94        put_into_memory_store(ctx, key, value).await?;
95
96        Ok(HandleControl::Continue)
97    }
98}
99
100/// The handler to collect cluster info from the heartbeat request of datanode.
101pub struct CollectDatanodeClusterInfoHandler;
102
103#[async_trait::async_trait]
104impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
105    fn is_acceptable(&self, role: Role) -> bool {
106        role == Role::Datanode
107    }
108
109    async fn handle(
110        &self,
111        req: &HeartbeatRequest,
112        ctx: &mut Context,
113        acc: &mut HeartbeatAccumulator,
114    ) -> Result<HandleControl> {
115        let Some((key, peer, info)) = extract_base_info(req) else {
116            return Ok(HandleControl::Continue);
117        };
118
119        let Some(stat) = &acc.stat else {
120            return Ok(HandleControl::Continue);
121        };
122
123        let leader_regions = stat
124            .region_stats
125            .iter()
126            .filter(|s| s.role == RegionRole::Leader)
127            .count();
128        let follower_regions = stat.region_stats.len() - leader_regions;
129
130        let value = NodeInfo {
131            peer,
132            last_activity_ts: stat.timestamp_millis,
133            status: NodeStatus::Datanode(DatanodeStatus {
134                rcus: stat.rcus,
135                wcus: stat.wcus,
136                leader_regions,
137                follower_regions,
138                workloads: stat.datanode_workloads.clone(),
139            }),
140            version: info.version,
141            git_commit: info.git_commit,
142            start_time_ms: info.start_time_ms,
143            cpus: info.cpus,
144            memory_bytes: info.memory_bytes,
145        };
146
147        put_into_memory_store(ctx, key, value).await?;
148
149        Ok(HandleControl::Continue)
150    }
151}
152
153fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
154    let HeartbeatRequest { peer, info, .. } = request;
155    let key = NodeInfoKey::new(request)?;
156    let Some(peer) = &peer else {
157        return None;
158    };
159    let Some(info) = &info else {
160        return None;
161    };
162
163    Some((key, peer.clone(), info.clone()))
164}
165
166async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
167    let key = (&key).into();
168    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
169    let put_req = PutRequest {
170        key,
171        value,
172        ..Default::default()
173    };
174
175    ctx.in_memory
176        .put(put_req)
177        .await
178        .context(SaveClusterInfoSnafu)?;
179
180    Ok(())
181}