meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};

use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::{
    RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the identifier of the datanode, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
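///
/// A heartbeat is typically derived from the datanode [`Stat`] carried by a heartbeat
/// request; a minimal sketch (not compiled, assumes `stat: &Stat`):
///
/// ```ignore
/// let heartbeat = DatanodeHeartbeat::from(stat);
/// ```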
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Consider collecting the memtable size of regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

/// `Event` represents the various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `RegisterFailureDetectors`: This event registers failure detectors for the given regions.
/// - `DeregisterFailureDetectors`: This event removes the failure detectors of the given regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv has received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The interval of tick.
    tick_interval: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
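    /// Creates a ticker that periodically sends [`Event::Tick`] to the given `sender`.
    ///
    /// A minimal usage sketch (not compiled; assumes a running Tokio runtime):
    ///
    /// ```ignore
    /// let (tx, rx) = RegionSupervisor::channel();
    /// let ticker = RegionSupervisorTicker::new(DEFAULT_TICK_INTERVAL, tx);
    /// ticker.start();
    /// // ... `rx` is consumed by the `RegionSupervisor` main loop ...
    /// ticker.stop();
    /// ```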
    pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let ticker_loop = tokio::spawn(async move {
                let mut interval = interval(tick_interval);
                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect region failures
/// and initiate region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of the [`SelectorRef`].
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The maintenance mode manager.
    maintenance_mode_manager: MaintenanceModeManagerRef,
    /// Peer lookup service.
    peer_lookup: PeerLookupServiceRef,
}

/// Controller for managing failure detectors for regions.
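///
/// A minimal sketch of registering a detector for a region (not compiled; assumes
/// `sender` was obtained from [`RegionSupervisor::channel`]):
///
/// ```ignore
/// let controller = RegionFailureDetectorControl::new(sender.clone());
/// // A `DetectingRegion` is a `(datanode id, region id)` pair.
/// controller
///     .register_failure_detectors(vec![(1, RegionId::new(1024, 1))])
///     .await;
/// ```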
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
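    ///
    /// A minimal sketch (not compiled; assumes `stat: &Stat` from a datanode heartbeat
    /// and `sender` from [`RegionSupervisor::channel`]):
    ///
    /// ```ignore
    /// let acceptor = HeartbeatAcceptor::new(sender.clone());
    /// acceptor.accept(DatanodeHeartbeat::from(stat)).await;
    /// ```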
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
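    ///
    /// A sketch of the typical wiring (not compiled; `rx` is handed to
    /// [`RegionSupervisor::new`], while clones of `tx` feed the ticker, the
    /// [`HeartbeatAcceptor`], and the [`RegionFailureDetectorControl`]):
    ///
    /// ```ignore
    /// let (tx, rx) = RegionSupervisor::channel();
    /// let ticker = RegionSupervisorTicker::new(DEFAULT_TICK_INTERVAL, tx.clone());
    /// let acceptor = HeartbeatAcceptor::new(tx.clone());
    /// let control = RegionFailureDetectorControl::new(tx.clone());
    /// ```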
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        maintenance_mode_manager: MaintenanceModeManagerRef,
        peer_lookup: PeerLookupServiceRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            maintenance_mode_manager,
            peer_lookup,
        }
    }

    /// Runs the main loop.
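    ///
    /// The loop exits once all [`Sender`]s of the channel are dropped. A minimal sketch
    /// of driving it (not compiled; assumes `supervisor` was built via [`RegionSupervisor::new`]):
    ///
    /// ```ignore
    /// tokio::spawn(async move { supervisor.run().await });
    /// ```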
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The datanode has `acceptable_heartbeat_pause_millis` to send a heartbeat for the corresponding region.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating, i.e., failover has already been triggered for them.
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already migrating or failing over, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
            // because there may be false positives in failure detection on the datanode.
            // So we only exclude the datanode on which the failure was detected.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.maintenance_mode_manager
            .maintenance_mode()
            .await
            .context(error::MaintenanceModeManagerSnafu)
    }

    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_lookup
            .datanode(from_peer_id)
            .await
            .context(error::LookupPeerSnafu {
                peer_id: from_peer_id,
            })?
            .context(error::PeerUnavailableSnafu {
                peer_id: from_peer_id,
            })?;
        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
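            // Scale the migration timeout linearly with the number of failover attempts
            // observed for this region (`DEFAULT_REGION_MIGRATION_TIMEOUT * count` below).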
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                // Returns Ok if another migration is already running, the table has been
                // dropped, or the region's leader peer has changed.
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

    /// Detects the failure of regions.
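    ///
    /// Roughly speaking, in the phi accrual model a region is reported as failed when
    /// `phi(now) = -log10(P(a heartbeat gap at least as long as now - last_heartbeat))`
    /// exceeds the detector's configured threshold, i.e. when `is_available(now)` returns false.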
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally call `current_time_millis()` inside the iteration rather than
                // once outside it. The failure determination should happen "just in time",
                // i.e., whether a region has failed must be compared against the most recent "now".
                // Besides, it might reduce false positives in failure detection, because
                // heartbeats keep arriving during the iteration and the `phi`s are still being updated.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

    /// Updates the state of corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::key::maintenance;
    use common_meta::peer::Peer;
    use common_meta::test_util::NoopPeerLookupService;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let maintenance_mode_manager =
            Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
        let peer_lookup = Arc::new(NoopPeerLookupService);
        let (tx, rx) = RegionSupervisor::channel();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                maintenance_mode_manager,
                peer_lookup,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then within two more seconds, phi rises above the threshold.
            // The same effect can be seen in the diagrams in Akka's documentation.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick | Event::Clear);
            }
        }
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers the failure detector again.
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}