meta_srv/handler/
collect_cluster_info_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
16use common_meta::cluster::{
17    DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
18};
19use common_meta::peer::Peer;
20use common_meta::rpc::store::PutRequest;
21use snafu::ResultExt;
22use store_api::region_engine::RegionRole;
23
24use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
25use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
26use crate::metasrv::Context;
27use crate::Result;
28
29/// The handler to collect cluster info from the heartbeat request of frontend.
30pub struct CollectFrontendClusterInfoHandler;
31
32#[async_trait::async_trait]
33impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
34    fn is_acceptable(&self, role: Role) -> bool {
35        role == Role::Frontend
36    }
37
38    async fn handle(
39        &self,
40        req: &HeartbeatRequest,
41        ctx: &mut Context,
42        _acc: &mut HeartbeatAccumulator,
43    ) -> Result<HandleControl> {
44        let Some((key, peer, info)) = extract_base_info(req) else {
45            return Ok(HandleControl::Continue);
46        };
47
48        let value = NodeInfo {
49            peer,
50            last_activity_ts: common_time::util::current_time_millis(),
51            status: NodeStatus::Frontend(FrontendStatus {}),
52            version: info.version,
53            git_commit: info.git_commit,
54            start_time_ms: info.start_time_ms,
55        };
56
57        put_into_memory_store(ctx, key, value).await?;
58
59        Ok(HandleControl::Continue)
60    }
61}
62
63/// The handler to collect cluster info from the heartbeat request of flownode.
64pub struct CollectFlownodeClusterInfoHandler;
65#[async_trait::async_trait]
66impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
67    fn is_acceptable(&self, role: Role) -> bool {
68        role == Role::Flownode
69    }
70
71    async fn handle(
72        &self,
73        req: &HeartbeatRequest,
74        ctx: &mut Context,
75        _acc: &mut HeartbeatAccumulator,
76    ) -> Result<HandleControl> {
77        let Some((key, peer, info)) = extract_base_info(req) else {
78            return Ok(HandleControl::Continue);
79        };
80
81        let value = NodeInfo {
82            peer,
83            last_activity_ts: common_time::util::current_time_millis(),
84            status: NodeStatus::Flownode(FlownodeStatus {}),
85            version: info.version,
86            git_commit: info.git_commit,
87            start_time_ms: info.start_time_ms,
88        };
89
90        put_into_memory_store(ctx, key, value).await?;
91
92        Ok(HandleControl::Continue)
93    }
94}
95
96/// The handler to collect cluster info from the heartbeat request of datanode.
97pub struct CollectDatanodeClusterInfoHandler;
98
99#[async_trait::async_trait]
100impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
101    fn is_acceptable(&self, role: Role) -> bool {
102        role == Role::Datanode
103    }
104
105    async fn handle(
106        &self,
107        req: &HeartbeatRequest,
108        ctx: &mut Context,
109        acc: &mut HeartbeatAccumulator,
110    ) -> Result<HandleControl> {
111        let Some((key, peer, info)) = extract_base_info(req) else {
112            return Ok(HandleControl::Continue);
113        };
114
115        let Some(stat) = &acc.stat else {
116            return Ok(HandleControl::Continue);
117        };
118
119        let leader_regions = stat
120            .region_stats
121            .iter()
122            .filter(|s| s.role == RegionRole::Leader)
123            .count();
124        let follower_regions = stat.region_stats.len() - leader_regions;
125
126        let value = NodeInfo {
127            peer,
128            last_activity_ts: stat.timestamp_millis,
129            status: NodeStatus::Datanode(DatanodeStatus {
130                rcus: stat.rcus,
131                wcus: stat.wcus,
132                leader_regions,
133                follower_regions,
134                workloads: stat.datanode_workloads.clone(),
135            }),
136            version: info.version,
137            git_commit: info.git_commit,
138            start_time_ms: info.start_time_ms,
139        };
140
141        put_into_memory_store(ctx, key, value).await?;
142
143        Ok(HandleControl::Continue)
144    }
145}
146
147fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
148    let HeartbeatRequest { peer, info, .. } = request;
149    let key = NodeInfoKey::new(request)?;
150    let Some(peer) = &peer else {
151        return None;
152    };
153    let Some(info) = &info else {
154        return None;
155    };
156
157    Some((key, peer.clone(), info.clone()))
158}
159
160async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
161    let key = (&key).into();
162    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
163    let put_req = PutRequest {
164        key,
165        value,
166        ..Default::default()
167    };
168
169    ctx.in_memory
170        .put(put_req)
171        .await
172        .context(SaveClusterInfoSnafu)?;
173
174    Ok(())
175}