meta_srv/service/admin/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_meta::datanode::DatanodeStatValue;
18use serde::{Deserialize, Serialize};
19use snafu::ResultExt;
20use tonic::codegen::http;
21
22use crate::cluster::MetaPeerClientRef;
23use crate::error::{self, Result};
24use crate::service::admin::{util, HttpHandler};
25
26#[derive(Clone)]
27pub struct HeartBeatHandler {
28    pub meta_peer_client: MetaPeerClientRef,
29}
30
31#[async_trait::async_trait]
32impl HttpHandler for HeartBeatHandler {
33    async fn handle(
34        &self,
35        path: &str,
36        _: http::Method,
37        params: &HashMap<String, String>,
38    ) -> Result<http::Response<String>> {
39        if path.ends_with("/help") {
40            return util::to_text_response(
41                r#"
42            - GET /heartbeat
43            - GET /heartbeat?addr=127.0.0.1:3001
44            "#,
45            );
46        }
47
48        let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
49        let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
50
51        if let Some(addr) = params.get("addr") {
52            stat_vals = filter_by_addr(stat_vals, addr);
53        }
54        let result = StatValues { stat_vals }.try_into()?;
55
56        http::Response::builder()
57            .status(http::StatusCode::OK)
58            .body(result)
59            .context(error::InvalidHttpBodySnafu)
60    }
61}
62
63#[derive(Debug, Serialize, Deserialize)]
64#[serde(transparent)]
65pub struct StatValues {
66    pub stat_vals: Vec<DatanodeStatValue>,
67}
68
69impl TryFrom<StatValues> for String {
70    type Error = error::Error;
71
72    fn try_from(vals: StatValues) -> Result<Self> {
73        serde_json::to_string(&vals).context(error::SerializeToJsonSnafu {
74            input: format!("{vals:?}"),
75        })
76    }
77}
78
79fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<DatanodeStatValue> {
80    stat_vals
81        .into_iter()
82        .filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr))
83        .collect()
84}
85
#[cfg(test)]
mod tests {
    use common_meta::datanode::{DatanodeStatValue, Stat};

    use crate::service::admin::heartbeat::filter_by_addr;

    /// Filtering by address must keep only the stat value whose stats report that
    /// address, leaving its stats untouched and in order.
    #[tokio::test]
    async fn test_filter_by_addr() {
        // Builds a single stat with the given address and timestamp; every other
        // field takes its default value.
        let stat = |addr: &str, ts| Stat {
            addr: addr.to_string(),
            timestamp_millis: ts,
            ..Default::default()
        };

        let on_3001 = DatanodeStatValue {
            stats: vec![stat("127.0.0.1:3001", 1), stat("127.0.0.1:3001", 2)],
        };
        let on_3002 = DatanodeStatValue {
            stats: vec![
                stat("127.0.0.1:3002", 3),
                stat("127.0.0.1:3002", 4),
                stat("127.0.0.1:3002", 5),
            ],
        };

        let filtered = filter_by_addr(vec![on_3001, on_3002], "127.0.0.1:3002");
        assert_eq!(filtered.len(), 1);

        let survivor = filtered.first().unwrap();
        assert_eq!(survivor.stats.len(), 3);
        assert_eq!(survivor.stats.first().unwrap().timestamp_millis, 3);
    }
}