meta_srv/handler/
collect_topic_stats_handler.rs1use api::v1::meta::{HeartbeatRequest, Role};
16
17use crate::error::Result;
18use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
19use crate::metasrv::Context;
20
21pub struct CollectTopicStatsHandler;
22
23#[async_trait::async_trait]
24impl HeartbeatHandler for CollectTopicStatsHandler {
25 fn is_acceptable(&self, role: Role) -> bool {
26 role == Role::Datanode
27 }
28
29 async fn handle(
30 &self,
31 _req: &HeartbeatRequest,
32 ctx: &mut Context,
33 acc: &mut HeartbeatAccumulator,
34 ) -> Result<HandleControl> {
35 let Some(current_stat) = acc.stat.as_ref() else {
36 return Ok(HandleControl::Continue);
37 };
38
39 ctx.topic_stats_registry.add_stats(
40 current_stat.id,
41 ¤t_stat.topic_stats,
42 current_stat.timestamp_millis,
43 );
44
45 Ok(HandleControl::Continue)
46 }
47}
48
49#[cfg(test)]
50mod tests {
51 use common_meta::datanode::{Stat, TopicStat};
52 use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
53 use common_time::util::current_time_millis;
54
55 use super::*;
56 use crate::handler::test_utils::TestEnv;
57
58 #[tokio::test]
59 async fn test_handle_collect_topic_stats() {
60 let env = TestEnv::new();
61 let ctx = env.ctx();
62
63 let handler = CollectTopicStatsHandler;
64 let timestamp_millis = current_time_millis();
65 let aligned_ts = timestamp_millis - timestamp_millis % 1000;
66 handle_request_many_times(ctx.clone(), &handler, 1, timestamp_millis, 10).await;
67 handle_request_many_times(ctx.clone(), &handler, 2, timestamp_millis, 10).await;
68
69 let next_timestamp_millis =
71 timestamp_millis + (TOPIC_STATS_REPORT_INTERVAL_SECS * 1000) as i64;
72 handle_request_many_times(ctx.clone(), &handler, 1, next_timestamp_millis, 10).await;
73
74 let latest_entry_id = ctx
75 .topic_stats_registry
76 .get_latest_entry_id("test")
77 .unwrap();
78 assert_eq!(latest_entry_id, (15, aligned_ts));
79 let latest_entry_id = ctx
80 .topic_stats_registry
81 .get_latest_entry_id("test2")
82 .unwrap();
83 assert_eq!(latest_entry_id, (10, aligned_ts));
84 assert!(
85 ctx.topic_stats_registry
86 .get_latest_entry_id("test3")
87 .is_none()
88 );
89 }
90
91 async fn handle_request_many_times(
92 mut ctx: Context,
93 handler: &CollectTopicStatsHandler,
94 datanode_id: u64,
95 timestamp_millis: i64,
96 loop_times: i32,
97 ) {
98 let req = HeartbeatRequest::default();
99 for i in 1..=loop_times {
100 let mut acc = HeartbeatAccumulator {
101 stat: Some(Stat {
102 id: datanode_id,
103 region_num: i as _,
104 timestamp_millis,
105 topic_stats: vec![
106 TopicStat {
107 topic: "test".to_string(),
108 latest_entry_id: 15,
109 record_size: 1024,
110 record_num: 2,
111 },
112 TopicStat {
113 topic: "test2".to_string(),
114 latest_entry_id: 10,
115 record_size: 1024,
116 record_num: 2,
117 },
118 ],
119 ..Default::default()
120 }),
121 ..Default::default()
122 };
123 handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
124 }
125 }
126}