meta_srv/handler/
failure_handler.rs1use api::v1::meta::{HeartbeatRequest, Role};
16use async_trait::async_trait;
17use common_telemetry::info;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};
23
/// Heartbeat handler that passes datanode heartbeats on to a [`HeartbeatAcceptor`].
pub struct RegionFailureHandler {
    /// Receives a [`DatanodeHeartbeat`] built from the accumulator's stat each time a
    /// heartbeat carrying a stat is handled.
    heartbeat_acceptor: HeartbeatAcceptor,
}
27
28impl RegionFailureHandler {
29 pub(crate) fn new(
30 mut region_supervisor: RegionSupervisor,
31 heartbeat_acceptor: HeartbeatAcceptor,
32 ) -> Self {
33 info!("Starting region supervisor");
34 common_runtime::spawn_global(async move { region_supervisor.run().await });
35 Self { heartbeat_acceptor }
36 }
37}
38
39#[async_trait]
40impl HeartbeatHandler for RegionFailureHandler {
41 fn is_acceptable(&self, role: Role) -> bool {
42 role == Role::Datanode
43 }
44
45 async fn handle(
46 &self,
47 _: &HeartbeatRequest,
48 _ctx: &mut Context,
49 acc: &mut HeartbeatAccumulator,
50 ) -> Result<HandleControl> {
51 let Some(stat) = acc.stat.as_ref() else {
52 return Ok(HandleControl::Continue);
53 };
54
55 self.heartbeat_acceptor
56 .accept(DatanodeHeartbeat::from(stat))
57 .await;
58
59 Ok(HandleControl::Continue)
60 }
61}
62
#[cfg(test)]
mod tests {
    use api::v1::meta::HeartbeatRequest;
    use common_catalog::consts::default_engine;
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::sync::oneshot;

    use crate::handler::failure_handler::RegionFailureHandler;
    use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
    use crate::metasrv::builder::MetasrvBuilder;
    use crate::region::supervisor::tests::new_test_supervisor;
    use crate::region::supervisor::{Event, HeartbeatAcceptor};

    /// Builds a follower [`RegionStat`] for `region_id` with every metric zeroed.
    fn new_region_stat(region_id: u64) -> RegionStat {
        RegionStat {
            id: RegionId::from_u64(region_id),
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            engine: default_engine().to_string(),
            role: RegionRole::Follower,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
            },
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
        }
    }

    /// Handling a heartbeat whose stat carries three regions must leave three entries
    /// in the value returned for an `Event::Dump` request.
    #[tokio::test]
    async fn test_handle_heartbeat() {
        let (supervisor, sender) = new_test_supervisor();
        let heartbeat_acceptor = HeartbeatAcceptor::new(sender.clone());
        let handler = RegionFailureHandler::new(supervisor, heartbeat_acceptor);

        let req = &HeartbeatRequest::default();
        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let mut ctx = metasrv.new_ctx();

        let acc = &mut HeartbeatAccumulator::default();
        acc.stat = Some(Stat {
            id: 42,
            region_stats: (1..=3).map(new_region_stat).collect(),
            timestamp_millis: 1000,
            ..Default::default()
        });

        handler.handle(req, &mut ctx, acc).await.unwrap();

        // Ask for a dump through the same channel the acceptor writes to.
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();

        // Count the entries directly rather than collecting them into a temporary `Vec`.
        assert_eq!(detector.iter().count(), 3);
    }
}