meta_srv/service/admin/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use axum::extract::{Query, State};
18use axum::response::{IntoResponse, Response};
19use axum::Json;
20use common_meta::datanode::DatanodeStatValue;
21use serde::{Deserialize, Serialize};
22use snafu::ResultExt;
23use tonic::codegen::http;
24
25use crate::cluster::MetaPeerClientRef;
26use crate::error::{self, Result};
27use crate::service::admin::util::ErrorHandler;
28use crate::service::admin::{util, HttpHandler};
29
30#[derive(Clone)]
31pub struct HeartBeatHandler {
32    pub meta_peer_client: MetaPeerClientRef,
33}
34
35impl HeartBeatHandler {
36    async fn get_heartbeat(&self, filter: Option<&str>) -> Result<StatValues> {
37        let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
38        let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
39
40        if let Some(addr) = filter {
41            stat_vals = filter_by_addr(stat_vals, addr);
42        }
43        Ok(StatValues { stat_vals })
44    }
45}
46
/// Looks up the optional `addr` query parameter used to filter heartbeat stats.
///
/// Returns `None` when the parameter is absent; an empty value is returned as `Some("")`.
fn find_addr_filter_from_params(params: &HashMap<String, String>) -> Option<&str> {
    let addr = params.get("addr")?;
    Some(addr.as_str())
}
50
51/// Get the heartbeat handler.
52#[axum_macros::debug_handler]
53pub(crate) async fn get(
54    State(handler): State<HeartBeatHandler>,
55    Query(params): Query<HashMap<String, String>>,
56) -> Response {
57    let filter = find_addr_filter_from_params(&params);
58    handler
59        .get_heartbeat(filter)
60        .await
61        .map_err(ErrorHandler::new)
62        .map(Json)
63        .into_response()
64}
65
66/// Get the heartbeat help handler.
67#[axum_macros::debug_handler]
68pub(crate) async fn help() -> Response {
69    r#"
70    - GET /heartbeat
71    - GET /heartbeat?addr=127.0.0.1:3001
72    "#
73    .into_response()
74}
75
76#[async_trait::async_trait]
77impl HttpHandler for HeartBeatHandler {
78    async fn handle(
79        &self,
80        path: &str,
81        _: http::Method,
82        params: &HashMap<String, String>,
83    ) -> Result<http::Response<String>> {
84        if path.ends_with("/help") {
85            return util::to_text_response(
86                r#"
87            - GET /heartbeat
88            - GET /heartbeat?addr=127.0.0.1:3001
89            "#,
90            );
91        }
92
93        let filter = find_addr_filter_from_params(params);
94        let result = self.get_heartbeat(filter).await?.try_into()?;
95
96        http::Response::builder()
97            .status(http::StatusCode::OK)
98            .body(result)
99            .context(error::InvalidHttpBodySnafu)
100    }
101}
102
103#[derive(Debug, Serialize, Deserialize)]
104#[serde(transparent)]
105pub struct StatValues {
106    pub stat_vals: Vec<DatanodeStatValue>,
107}
108
109impl TryFrom<StatValues> for String {
110    type Error = error::Error;
111
112    fn try_from(vals: StatValues) -> Result<Self> {
113        serde_json::to_string(&vals).context(error::SerializeToJsonSnafu {
114            input: format!("{vals:?}"),
115        })
116    }
117}
118
119fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<DatanodeStatValue> {
120    stat_vals
121        .into_iter()
122        .filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr))
123        .collect()
124}
125
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use axum::body::{to_bytes, Body};
    use axum::http::{self, Request};
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::rpc::store::PutRequest;
    use tower::ServiceExt;

    use crate::cluster::MetaPeerClientBuilder;
    use crate::service::admin::heartbeat::{self, filter_by_addr, HeartBeatHandler};

    /// `filter_by_addr` keeps only the stat values that mention the requested address.
    #[tokio::test]
    async fn test_filter_by_addr() {
        // Builds a stat with the given address and timestamp; other fields are defaulted.
        let stat_at = |addr: &str, timestamp_millis| Stat {
            addr: addr.to_string(),
            timestamp_millis,
            ..Default::default()
        };

        let on_3001 = DatanodeStatValue {
            stats: vec![stat_at("127.0.0.1:3001", 1), stat_at("127.0.0.1:3001", 2)],
        };
        let on_3002 = DatanodeStatValue {
            stats: vec![
                stat_at("127.0.0.1:3002", 3),
                stat_at("127.0.0.1:3002", 4),
                stat_at("127.0.0.1:3002", 5),
            ],
        };

        let kept = filter_by_addr(vec![on_3001, on_3002], "127.0.0.1:3002");
        assert_eq!(kept.len(), 1);

        let only = kept.first().unwrap();
        assert_eq!(only.stats.len(), 3);
        assert_eq!(only.stats.first().unwrap().timestamp_millis, 3);
    }

    /// Drains the response body and decodes it as (lossy) UTF-8.
    async fn get_body_string(resp: axum::response::Response) -> String {
        let bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// Stores a single-stat heartbeat value for `node_id`, reported from `addr`.
    async fn put_stat_value(kv_backend: &KvBackendRef, node_id: u64, addr: &str) {
        let stat = Stat {
            addr: addr.to_string(),
            timestamp_millis: 3,
            ..Default::default()
        };
        let encoded: Vec<u8> = DatanodeStatValue { stats: vec![stat] }
            .try_into()
            .unwrap();
        let request = PutRequest::new()
            .with_key(DatanodeStatKey { node_id })
            .with_value(encoded);
        kv_backend.put(request).await.unwrap();
    }

    /// The `addr` query parameter narrows the JSON response of the `get` handler.
    #[tokio::test]
    async fn test_get_heartbeat_with_filter() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(kv_backend.clone())
            .build()
            .map(Arc::new)
            .unwrap();
        let kv_backend = kv_backend as _;
        let seeded_nodes = [
            (0, "127.0.0.1:3000"),
            (1, "127.0.0.1:3001"),
            (2, "127.0.0.1:3002"),
        ];
        for (node_id, addr) in seeded_nodes {
            put_stat_value(&kv_backend, node_id, addr).await;
        }

        let app = axum::Router::new()
            .route("/", axum::routing::get(heartbeat::get))
            .with_state(HeartBeatHandler {
                meta_peer_client: meta_peer_client.clone(),
            });
        let request_for = |uri: &str| Request::builder().uri(uri).body(Body::empty()).unwrap();

        // No datanode reported from this address, so the filtered list is empty.
        let resp = app
            .clone()
            .oneshot(request_for("/?addr=127.0.0.1:3003"))
            .await
            .unwrap();
        assert_eq!(resp.status(), http::StatusCode::OK);
        assert_eq!(get_body_string(resp).await, r#"[]"#);

        // Exactly one datanode reported from this address.
        let resp = app
            .oneshot(request_for("/?addr=127.0.0.1:3001"))
            .await
            .unwrap();
        assert_eq!(resp.status(), http::StatusCode::OK);
        assert_eq!(
            get_body_string(resp).await,
            "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]"
        );
    }
}