meta_srv/handler/
collect_topic_stats_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, Role};
16
17use crate::error::Result;
18use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
19use crate::metasrv::Context;
20
21pub struct CollectTopicStatsHandler;
22
23#[async_trait::async_trait]
24impl HeartbeatHandler for CollectTopicStatsHandler {
25    fn is_acceptable(&self, role: Role) -> bool {
26        role == Role::Datanode
27    }
28
29    async fn handle(
30        &self,
31        _req: &HeartbeatRequest,
32        ctx: &mut Context,
33        acc: &mut HeartbeatAccumulator,
34    ) -> Result<HandleControl> {
35        let Some(current_stat) = acc.stat.as_ref() else {
36            return Ok(HandleControl::Continue);
37        };
38
39        ctx.topic_stats_registry.add_stats(
40            current_stat.id,
41            &current_stat.topic_stats,
42            current_stat.timestamp_millis,
43        );
44
45        Ok(HandleControl::Continue)
46    }
47}
48
49#[cfg(test)]
50mod tests {
51    use common_meta::datanode::{Stat, TopicStat};
52    use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
53    use common_time::util::current_time_millis;
54
55    use super::*;
56    use crate::handler::test_utils::TestEnv;
57
58    #[tokio::test]
59    async fn test_handle_collect_topic_stats() {
60        let env = TestEnv::new();
61        let ctx = env.ctx();
62
63        let handler = CollectTopicStatsHandler;
64        let timestamp_millis = current_time_millis();
65        let aligned_ts = timestamp_millis - timestamp_millis % 1000;
66        handle_request_many_times(ctx.clone(), &handler, 1, timestamp_millis, 10).await;
67        handle_request_many_times(ctx.clone(), &handler, 2, timestamp_millis, 10).await;
68
69        // trigger the next window
70        let next_timestamp_millis =
71            timestamp_millis + (TOPIC_STATS_REPORT_INTERVAL_SECS * 1000) as i64;
72        handle_request_many_times(ctx.clone(), &handler, 1, next_timestamp_millis, 10).await;
73
74        let latest_entry_id = ctx
75            .topic_stats_registry
76            .get_latest_entry_id("test")
77            .unwrap();
78        assert_eq!(latest_entry_id, (15, aligned_ts));
79        let latest_entry_id = ctx
80            .topic_stats_registry
81            .get_latest_entry_id("test2")
82            .unwrap();
83        assert_eq!(latest_entry_id, (10, aligned_ts));
84        assert!(ctx
85            .topic_stats_registry
86            .get_latest_entry_id("test3")
87            .is_none());
88    }
89
90    async fn handle_request_many_times(
91        mut ctx: Context,
92        handler: &CollectTopicStatsHandler,
93        datanode_id: u64,
94        timestamp_millis: i64,
95        loop_times: i32,
96    ) {
97        let req = HeartbeatRequest::default();
98        for i in 1..=loop_times {
99            let mut acc = HeartbeatAccumulator {
100                stat: Some(Stat {
101                    id: datanode_id,
102                    region_num: i as _,
103                    timestamp_millis,
104                    topic_stats: vec![
105                        TopicStat {
106                            topic: "test".to_string(),
107                            latest_entry_id: 15,
108                            record_size: 1024,
109                            record_num: 2,
110                        },
111                        TopicStat {
112                            topic: "test2".to_string(),
113                            latest_entry_id: 10,
114                            record_size: 1024,
115                            record_num: 2,
116                        },
117                    ],
118                    ..Default::default()
119                }),
120                ..Default::default()
121            };
122            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
123        }
124    }
125}