meta_srv/handler/
failure_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, Role};
16use async_trait::async_trait;
17use common_telemetry::info;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};
23
24pub struct RegionFailureHandler {
25    heartbeat_acceptor: HeartbeatAcceptor,
26}
27
28impl RegionFailureHandler {
29    pub(crate) fn new(
30        mut region_supervisor: RegionSupervisor,
31        heartbeat_acceptor: HeartbeatAcceptor,
32    ) -> Self {
33        info!("Starting region supervisor");
34        common_runtime::spawn_global(async move { region_supervisor.run().await });
35        Self { heartbeat_acceptor }
36    }
37}
38
39#[async_trait]
40impl HeartbeatHandler for RegionFailureHandler {
41    fn is_acceptable(&self, role: Role) -> bool {
42        role == Role::Datanode
43    }
44
45    async fn handle(
46        &self,
47        _: &HeartbeatRequest,
48        _ctx: &mut Context,
49        acc: &mut HeartbeatAccumulator,
50    ) -> Result<HandleControl> {
51        let Some(stat) = acc.stat.as_ref() else {
52            return Ok(HandleControl::Continue);
53        };
54
55        self.heartbeat_acceptor
56            .accept(DatanodeHeartbeat::from(stat))
57            .await;
58
59        Ok(HandleControl::Continue)
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    use api::v1::meta::HeartbeatRequest;
66    use common_catalog::consts::default_engine;
67    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
68    use store_api::region_engine::RegionRole;
69    use store_api::storage::RegionId;
70    use tokio::sync::oneshot;
71
72    use crate::handler::failure_handler::RegionFailureHandler;
73    use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
74    use crate::metasrv::builder::MetasrvBuilder;
75    use crate::region::supervisor::tests::new_test_supervisor;
76    use crate::region::supervisor::{Event, HeartbeatAcceptor};
77
78    #[tokio::test]
79    async fn test_handle_heartbeat() {
80        let (supervisor, sender) = new_test_supervisor();
81        let heartbeat_acceptor = HeartbeatAcceptor::new(sender.clone());
82        let handler = RegionFailureHandler::new(supervisor, heartbeat_acceptor);
83        let req = &HeartbeatRequest::default();
84        let builder = MetasrvBuilder::new();
85        let metasrv = builder.build().await.unwrap();
86        let mut ctx = metasrv.new_ctx();
87        let acc = &mut HeartbeatAccumulator::default();
88        fn new_region_stat(region_id: u64) -> RegionStat {
89            RegionStat {
90                id: RegionId::from_u64(region_id),
91                rcus: 0,
92                wcus: 0,
93                approximate_bytes: 0,
94                engine: default_engine().to_string(),
95                role: RegionRole::Follower,
96                num_rows: 0,
97                memtable_size: 0,
98                manifest_size: 0,
99                sst_size: 0,
100                index_size: 0,
101                region_manifest: RegionManifestInfo::Mito {
102                    manifest_version: 0,
103                    flushed_entry_id: 0,
104                },
105                data_topic_latest_entry_id: 0,
106                metadata_topic_latest_entry_id: 0,
107            }
108        }
109        acc.stat = Some(Stat {
110            id: 42,
111            region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
112            timestamp_millis: 1000,
113            ..Default::default()
114        });
115
116        handler.handle(req, &mut ctx, acc).await.unwrap();
117        let (tx, rx) = oneshot::channel();
118        sender.send(Event::Dump(tx)).await.unwrap();
119        let detector = rx.await.unwrap();
120        assert_eq!(detector.iter().collect::<Vec<_>>().len(), 3);
121    }
122}