meta_srv/service/admin/
heartbeat.rs1use std::collections::HashMap;
16
17use axum::extract::{Query, State};
18use axum::response::{IntoResponse, Response};
19use axum::Json;
20use common_meta::datanode::DatanodeStatValue;
21use serde::{Deserialize, Serialize};
22use snafu::ResultExt;
23use tonic::codegen::http;
24
25use crate::cluster::MetaPeerClientRef;
26use crate::error::{self, Result};
27use crate::service::admin::util::ErrorHandler;
28use crate::service::admin::{util, HttpHandler};
29
30#[derive(Clone)]
31pub struct HeartBeatHandler {
32 pub meta_peer_client: MetaPeerClientRef,
33}
34
35impl HeartBeatHandler {
36 async fn get_heartbeat(&self, filter: Option<&str>) -> Result<StatValues> {
37 let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
38 let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
39
40 if let Some(addr) = filter {
41 stat_vals = filter_by_addr(stat_vals, addr);
42 }
43 Ok(StatValues { stat_vals })
44 }
45}
46
/// Returns the value of the `addr` query parameter, if one was supplied.
fn find_addr_filter_from_params(params: &HashMap<String, String>) -> Option<&str> {
    params.get("addr").map(String::as_str)
}
50
51#[axum_macros::debug_handler]
53pub(crate) async fn get(
54 State(handler): State<HeartBeatHandler>,
55 Query(params): Query<HashMap<String, String>>,
56) -> Response {
57 let filter = find_addr_filter_from_params(¶ms);
58 handler
59 .get_heartbeat(filter)
60 .await
61 .map_err(ErrorHandler::new)
62 .map(Json)
63 .into_response()
64}
65
66#[axum_macros::debug_handler]
68pub(crate) async fn help() -> Response {
69 r#"
70 - GET /heartbeat
71 - GET /heartbeat?addr=127.0.0.1:3001
72 "#
73 .into_response()
74}
75
76#[async_trait::async_trait]
77impl HttpHandler for HeartBeatHandler {
78 async fn handle(
79 &self,
80 path: &str,
81 _: http::Method,
82 params: &HashMap<String, String>,
83 ) -> Result<http::Response<String>> {
84 if path.ends_with("/help") {
85 return util::to_text_response(
86 r#"
87 - GET /heartbeat
88 - GET /heartbeat?addr=127.0.0.1:3001
89 "#,
90 );
91 }
92
93 let filter = find_addr_filter_from_params(params);
94 let result = self.get_heartbeat(filter).await?.try_into()?;
95
96 http::Response::builder()
97 .status(http::StatusCode::OK)
98 .body(result)
99 .context(error::InvalidHttpBodySnafu)
100 }
101}
102
103#[derive(Debug, Serialize, Deserialize)]
104#[serde(transparent)]
105pub struct StatValues {
106 pub stat_vals: Vec<DatanodeStatValue>,
107}
108
109impl TryFrom<StatValues> for String {
110 type Error = error::Error;
111
112 fn try_from(vals: StatValues) -> Result<Self> {
113 serde_json::to_string(&vals).context(error::SerializeToJsonSnafu {
114 input: format!("{vals:?}"),
115 })
116 }
117}
118
119fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<DatanodeStatValue> {
120 stat_vals
121 .into_iter()
122 .filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr))
123 .collect()
124}
125
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use axum::body::{to_bytes, Body};
    use axum::http::{self, Request};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::rpc::store::PutRequest;
    use tower::ServiceExt;

    use crate::cluster::MetaPeerClientBuilder;
    use crate::service::admin::heartbeat::{self, filter_by_addr, HeartBeatHandler};

    /// `filter_by_addr` keeps whole stat values whose stats match the address.
    #[tokio::test]
    async fn test_filter_by_addr() {
        // Two stats from :3001 with timestamps 1 and 2.
        let stat_value1 = DatanodeStatValue {
            stats: (1..=2)
                .map(|ts| Stat {
                    addr: "127.0.0.1:3001".to_string(),
                    timestamp_millis: ts,
                    ..Default::default()
                })
                .collect(),
        };

        // Three stats from :3002 with timestamps 3, 4 and 5.
        let stat_value2 = DatanodeStatValue {
            stats: (3..=5)
                .map(|ts| Stat {
                    addr: "127.0.0.1:3002".to_string(),
                    timestamp_millis: ts,
                    ..Default::default()
                })
                .collect(),
        };

        let remaining = filter_by_addr(vec![stat_value1, stat_value2], "127.0.0.1:3002");
        assert_eq!(remaining.len(), 1);

        let only = remaining.first().unwrap();
        assert_eq!(only.stats.len(), 3);
        assert_eq!(only.stats.first().unwrap().timestamp_millis, 3);
    }

    /// Drains the response body and returns it as a (lossily decoded) string.
    async fn get_body_string(resp: axum::response::Response) -> String {
        let bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// Stores a single-stat `DatanodeStatValue` for `node_id` in the backend.
    async fn put_stat_value(kv_backend: &KvBackendRef, node_id: u64, addr: &str) {
        let stat_value = DatanodeStatValue {
            stats: vec![Stat {
                addr: addr.to_string(),
                timestamp_millis: 3,
                ..Default::default()
            }],
        };
        let encoded: Vec<u8> = stat_value.try_into().unwrap();

        let request = PutRequest::new()
            .with_key(DatanodeStatKey { node_id })
            .with_value(encoded);
        kv_backend.put(request).await.unwrap();
    }

    /// The axum `get` handler honours the `addr` query parameter.
    #[tokio::test]
    async fn test_get_heartbeat_with_filter() {
        common_telemetry::init_default_ut_logging();

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(kv_backend.clone())
            .build()
            .map(Arc::new)
            .unwrap();

        // Coerce to the type `put_stat_value` expects.
        let kv_backend = kv_backend as _;
        let seeded = [
            (0, "127.0.0.1:3000"),
            (1, "127.0.0.1:3001"),
            (2, "127.0.0.1:3002"),
        ];
        for (node_id, addr) in seeded {
            put_stat_value(&kv_backend, node_id, addr).await;
        }

        let handler = HeartBeatHandler {
            meta_peer_client: meta_peer_client.clone(),
        };
        let app = axum::Router::new()
            .route("/", axum::routing::get(heartbeat::get))
            .with_state(handler);

        // (request URI, expected JSON body): an unknown address first, then a known one.
        let cases = [
            ("/?addr=127.0.0.1:3003", r#"[]"#),
            (
                "/?addr=127.0.0.1:3001",
                "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]",
            ),
        ];

        for (uri, expected) in cases {
            let req = Request::builder().uri(uri).body(Body::empty()).unwrap();
            let resp = app.clone().oneshot(req).await.unwrap();
            let status = resp.status();
            let body = get_body_string(resp).await;
            assert_eq!(status, http::StatusCode::OK);
            assert_eq!(body, expected);
        }
    }
}