meta_srv/service/admin/
heartbeat.rs1use std::collections::HashMap;
16
17use common_meta::datanode::DatanodeStatValue;
18use serde::{Deserialize, Serialize};
19use snafu::ResultExt;
20use tonic::codegen::http;
21
22use crate::cluster::MetaPeerClientRef;
23use crate::error::{self, Result};
24use crate::service::admin::{util, HttpHandler};
25
26#[derive(Clone)]
27pub struct HeartBeatHandler {
28 pub meta_peer_client: MetaPeerClientRef,
29}
30
31#[async_trait::async_trait]
32impl HttpHandler for HeartBeatHandler {
33 async fn handle(
34 &self,
35 path: &str,
36 _: http::Method,
37 params: &HashMap<String, String>,
38 ) -> Result<http::Response<String>> {
39 if path.ends_with("/help") {
40 return util::to_text_response(
41 r#"
42 - GET /heartbeat
43 - GET /heartbeat?addr=127.0.0.1:3001
44 "#,
45 );
46 }
47
48 let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
49 let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
50
51 if let Some(addr) = params.get("addr") {
52 stat_vals = filter_by_addr(stat_vals, addr);
53 }
54 let result = StatValues { stat_vals }.try_into()?;
55
56 http::Response::builder()
57 .status(http::StatusCode::OK)
58 .body(result)
59 .context(error::InvalidHttpBodySnafu)
60 }
61}
62
63#[derive(Debug, Serialize, Deserialize)]
64#[serde(transparent)]
65pub struct StatValues {
66 pub stat_vals: Vec<DatanodeStatValue>,
67}
68
69impl TryFrom<StatValues> for String {
70 type Error = error::Error;
71
72 fn try_from(vals: StatValues) -> Result<Self> {
73 serde_json::to_string(&vals).context(error::SerializeToJsonSnafu {
74 input: format!("{vals:?}"),
75 })
76 }
77}
78
79fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<DatanodeStatValue> {
80 stat_vals
81 .into_iter()
82 .filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr))
83 .collect()
84}
85
#[cfg(test)]
mod tests {
    use common_meta::datanode::{DatanodeStatValue, Stat};

    use crate::service::admin::heartbeat::filter_by_addr;

    /// `filter_by_addr` is synchronous, so a plain `#[test]` is enough; no async
    /// runtime is needed.
    #[test]
    fn test_filter_by_addr() {
        let stat_value1 = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "127.0.0.1:3001".to_string(),
                    timestamp_millis: 1,
                    ..Default::default()
                },
                Stat {
                    addr: "127.0.0.1:3001".to_string(),
                    timestamp_millis: 2,
                    ..Default::default()
                },
            ],
        };

        let stat_value2 = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "127.0.0.1:3002".to_string(),
                    timestamp_millis: 3,
                    ..Default::default()
                },
                Stat {
                    addr: "127.0.0.1:3002".to_string(),
                    timestamp_millis: 4,
                    ..Default::default()
                },
                Stat {
                    addr: "127.0.0.1:3002".to_string(),
                    timestamp_millis: 5,
                    ..Default::default()
                },
            ],
        };

        // Only the second value reports from 127.0.0.1:3002.
        let stat_vals = filter_by_addr(vec![stat_value1, stat_value2], "127.0.0.1:3002");
        assert_eq!(stat_vals.len(), 1);

        let kept = stat_vals.first().unwrap();
        assert_eq!(kept.stats.len(), 3);
        assert_eq!(kept.stats.first().unwrap().timestamp_millis, 3);

        // An address that no stat reports from filters everything out.
        let stat_vals = filter_by_addr(stat_vals, "127.0.0.1:3003");
        assert!(stat_vals.is_empty());
    }
}