meta_srv/handler/
collect_topic_stats_handler.rs1use api::v1::meta::{HeartbeatRequest, Role};
16
17use crate::error::Result;
18use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
19use crate::metasrv::Context;
20
21pub struct CollectTopicStatsHandler;
22
23#[async_trait::async_trait]
24impl HeartbeatHandler for CollectTopicStatsHandler {
25 fn is_acceptable(&self, role: Role) -> bool {
26 role == Role::Datanode
27 }
28
29 async fn handle(
30 &self,
31 _req: &HeartbeatRequest,
32 ctx: &mut Context,
33 acc: &mut HeartbeatAccumulator,
34 ) -> Result<HandleControl> {
35 let Some(current_stat) = acc.stat.as_ref() else {
36 return Ok(HandleControl::Continue);
37 };
38
39 ctx.topic_stats_registry.add_stats(
40 current_stat.id,
41 ¤t_stat.topic_stats,
42 current_stat.timestamp_millis,
43 );
44
45 Ok(HandleControl::Continue)
46 }
47}
48
49#[cfg(test)]
50mod tests {
51 use common_meta::datanode::{Stat, TopicStat};
52 use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
53 use common_time::util::current_time_millis;
54
55 use super::*;
56 use crate::handler::test_utils::TestEnv;
57
58 #[tokio::test]
59 async fn test_handle_collect_topic_stats() {
60 let env = TestEnv::new();
61 let ctx = env.ctx();
62
63 let handler = CollectTopicStatsHandler;
64 let timestamp_millis = current_time_millis();
65 let aligned_ts = timestamp_millis - timestamp_millis % 1000;
66 handle_request_many_times(ctx.clone(), &handler, 1, timestamp_millis, 10).await;
67 handle_request_many_times(ctx.clone(), &handler, 2, timestamp_millis, 10).await;
68
69 let next_timestamp_millis =
71 timestamp_millis + (TOPIC_STATS_REPORT_INTERVAL_SECS * 1000) as i64;
72 handle_request_many_times(ctx.clone(), &handler, 1, next_timestamp_millis, 10).await;
73
74 let latest_entry_id = ctx
75 .topic_stats_registry
76 .get_latest_entry_id("test")
77 .unwrap();
78 assert_eq!(latest_entry_id, (15, aligned_ts));
79 let latest_entry_id = ctx
80 .topic_stats_registry
81 .get_latest_entry_id("test2")
82 .unwrap();
83 assert_eq!(latest_entry_id, (10, aligned_ts));
84 assert!(ctx
85 .topic_stats_registry
86 .get_latest_entry_id("test3")
87 .is_none());
88 }
89
90 async fn handle_request_many_times(
91 mut ctx: Context,
92 handler: &CollectTopicStatsHandler,
93 datanode_id: u64,
94 timestamp_millis: i64,
95 loop_times: i32,
96 ) {
97 let req = HeartbeatRequest::default();
98 for i in 1..=loop_times {
99 let mut acc = HeartbeatAccumulator {
100 stat: Some(Stat {
101 id: datanode_id,
102 region_num: i as _,
103 timestamp_millis,
104 topic_stats: vec![
105 TopicStat {
106 topic: "test".to_string(),
107 latest_entry_id: 15,
108 record_size: 1024,
109 record_num: 2,
110 },
111 TopicStat {
112 topic: "test2".to_string(),
113 latest_entry_id: 10,
114 record_size: 1024,
115 record_num: 2,
116 },
117 ],
118 ..Default::default()
119 }),
120 ..Default::default()
121 };
122 handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
123 }
124 }
125}