Skip to main content

meta_srv/region/
supervisor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use common_meta::DatanodeId;
22use common_meta::datanode::Stat;
23use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
24use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
25use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
26use common_meta::key::{MetadataKey, MetadataValue};
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::leadership_notifier::LeadershipChangeListener;
29use common_meta::peer::{Peer, PeerResolverRef};
30use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
31use common_meta::rpc::store::RangeRequest;
32use common_runtime::JoinHandle;
33use common_telemetry::{debug, error, info, warn};
34use common_time::util::current_time_millis;
35use futures::{StreamExt, TryStreamExt};
36use snafu::{ResultExt, ensure};
37use store_api::storage::RegionId;
38use tokio::sync::mpsc::{Receiver, Sender};
39use tokio::sync::oneshot;
40use tokio::time::{MissedTickBehavior, interval, interval_at};
41
42use crate::discovery::utils::accept_ingest_workload;
43use crate::error::{self, Result};
44use crate::failure_detector::PhiAccrualFailureDetectorOptions;
45use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
46use crate::procedure::region_migration::manager::{
47    RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
48};
49use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
50use crate::procedure::region_migration::{
51    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
52};
53use crate::region::failure_detector::RegionFailureDetector;
54use crate::selector::SelectorOptions;
55use crate::state::StateRef;
56
57/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
58/// It includes identifiers for the cluster and datanode, a list of regions being monitored,
59/// and a timestamp indicating when the heartbeat was sent.
60#[derive(Debug)]
61pub(crate) struct DatanodeHeartbeat {
62    datanode_id: DatanodeId,
63    // TODO(weny): Considers collecting the memtable size in regions.
64    regions: Vec<RegionId>,
65    timestamp: i64,
66}
67
68impl From<&Stat> for DatanodeHeartbeat {
69    fn from(value: &Stat) -> Self {
70        DatanodeHeartbeat {
71            datanode_id: value.id,
72            regions: value.region_stats.iter().map(|x| x.id).collect(),
73            timestamp: value.timestamp_millis,
74        }
75    }
76}
77
78/// `Event` represents various types of events that can be processed by the region supervisor.
79/// These events are crucial for managing state transitions and handling specific scenarios
80/// in the region lifecycle.
81///
82/// Variants:
83/// - `Tick`: This event is used to trigger region failure detection periodically.
84/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
85/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
86/// - `ResetFailureDetectors`: This event is used to reset failure detectors for regions.
87/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
88/// - `HeartbeatArrived`: This event presents the metasrv received [`DatanodeHeartbeat`] from the datanodes.
89/// - `Clear`: This event is used to reset the state of the supervisor, typically used
90///   when a system-wide reset or reinitialization is needed.
91/// - `Dump`: (Available only in test) This event triggers a dump of the
92///   current state for debugging purposes. It allows developers to inspect the internal state
93///   of the supervisor during tests.
94pub(crate) enum Event {
95    Tick,
96    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
97    RegisterFailureDetectors(Vec<DetectingRegion>),
98    DeregisterFailureDetectors(Vec<DetectingRegion>),
99    ResetFailureDetectors(Vec<DetectingRegion>),
100    HeartbeatArrived(DatanodeHeartbeat),
101    Clear,
102    #[cfg(test)]
103    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
104}
105
106impl Debug for Event {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        match self {
109            Self::Tick => write!(f, "Tick"),
110            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
111            Self::Clear => write!(f, "Clear"),
112            Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
113            Self::RegisterFailureDetectors(arg0) => f
114                .debug_tuple("RegisterFailureDetectors")
115                .field(arg0)
116                .finish(),
117            Self::ResetFailureDetectors(arg0) => {
118                f.debug_tuple("ResetFailureDetectors").field(arg0).finish()
119            }
120            Self::DeregisterFailureDetectors(arg0) => f
121                .debug_tuple("DeregisterFailureDetectors")
122                .field(arg0)
123                .finish(),
124            #[cfg(test)]
125            Self::Dump(_) => f.debug_struct("Dump").finish(),
126        }
127    }
128}
129
130pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;
131
132/// A background job to generate [`Event::Tick`] type events.
133#[derive(Debug)]
134pub struct RegionSupervisorTicker {
135    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
136    tick_handle: Mutex<Option<JoinHandle<()>>>,
137
138    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
139    initialization_handle: Mutex<Option<JoinHandle<()>>>,
140
141    /// The interval of tick.
142    tick_interval: Duration,
143
144    /// The delay before initializing all region failure detectors.
145    initialization_delay: Duration,
146
147    /// The retry period for initializing all region failure detectors.
148    initialization_retry_period: Duration,
149
150    /// Sends [Event]s.
151    sender: Sender<Event>,
152}
153
154#[async_trait]
155impl LeadershipChangeListener for RegionSupervisorTicker {
156    fn name(&self) -> &'static str {
157        "RegionSupervisorTicker"
158    }
159
160    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
161        self.start();
162        Ok(())
163    }
164
165    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
166        self.stop();
167        Ok(())
168    }
169}
170
171impl RegionSupervisorTicker {
172    pub(crate) fn new(
173        tick_interval: Duration,
174        initialization_delay: Duration,
175        initialization_retry_period: Duration,
176        sender: Sender<Event>,
177    ) -> Self {
178        info!(
179            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
180            tick_interval, initialization_delay, initialization_retry_period
181        );
182        Self {
183            tick_handle: Mutex::new(None),
184            initialization_handle: Mutex::new(None),
185            tick_interval,
186            initialization_delay,
187            initialization_retry_period,
188            sender,
189        }
190    }
191
192    /// Starts the ticker.
193    pub fn start(&self) {
194        let mut handle = self.tick_handle.lock().unwrap();
195        if handle.is_none() {
196            let sender = self.sender.clone();
197            let tick_interval = self.tick_interval;
198            let initialization_delay = self.initialization_delay;
199
200            let mut initialization_interval = interval_at(
201                tokio::time::Instant::now() + initialization_delay,
202                self.initialization_retry_period,
203            );
204            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
205            let initialization_handler = common_runtime::spawn_global(async move {
206                loop {
207                    initialization_interval.tick().await;
208                    let (tx, rx) = oneshot::channel();
209                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
210                        info!(
211                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
212                        );
213                        break;
214                    }
215                    if rx.await.is_ok() {
216                        info!("All region failure detectors are initialized.");
217                        break;
218                    }
219                }
220            });
221            *self.initialization_handle.lock().unwrap() = Some(initialization_handler);
222
223            let sender = self.sender.clone();
224            let ticker_loop = tokio::spawn(async move {
225                let mut tick_interval = interval(tick_interval);
226                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
227
228                if let Err(err) = sender.send(Event::Clear).await {
229                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
230                    return;
231                }
232                loop {
233                    tick_interval.tick().await;
234                    if sender.send(Event::Tick).await.is_err() {
235                        info!("EventReceiver is dropped, tick loop is stopped");
236                        break;
237                    }
238                }
239            });
240            *handle = Some(ticker_loop);
241        }
242    }
243
244    /// Stops the ticker.
245    pub fn stop(&self) {
246        let handle = self.tick_handle.lock().unwrap().take();
247        if let Some(handle) = handle {
248            handle.abort();
249            info!("The tick loop is stopped.");
250        }
251        let initialization_handler = self.initialization_handle.lock().unwrap().take();
252        if let Some(initialization_handler) = initialization_handler {
253            initialization_handler.abort();
254            info!("The initialization loop is stopped.");
255        }
256    }
257}
258
259impl Drop for RegionSupervisorTicker {
260    fn drop(&mut self) {
261        self.stop();
262    }
263}
264
265pub type RegionSupervisorRef = Arc<RegionSupervisor>;
266
/// Default period between two region-failure-detection ticks (one second).
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(1_000);
/// Default wait between two attempts to initialize all region failure detectors (one minute).
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);
271
272/// Selector for region supervisor.
273pub enum RegionSupervisorSelector {
274    NaiveSelector(SelectorRef),
275    RegionStatAwareSelector(RegionStatAwareSelectorRef),
276}
277
278/// The [`RegionSupervisor`] is used to detect Region failures
279/// and initiate Region failover upon detection, ensuring uninterrupted region service.
280pub struct RegionSupervisor {
281    /// Used to detect the failure of regions.
282    failure_detector: RegionFailureDetector,
283    /// Tracks the number of failovers for each region.
284    failover_counts: HashMap<DetectingRegion, u32>,
285    /// Receives [Event]s.
286    receiver: Receiver<Event>,
287    /// The context of [`SelectorRef`]
288    selector_context: SelectorContext,
289    /// Candidate node selector.
290    selector: RegionSupervisorSelector,
291    /// Region migration manager.
292    region_migration_manager: RegionMigrationManagerRef,
293    /// The maintenance mode manager.
294    runtime_switch_manager: RuntimeSwitchManagerRef,
295    /// Peer resolver
296    peer_resolver: PeerResolverRef,
297    /// The kv backend.
298    kv_backend: KvBackendRef,
299    /// The meta state, used to check if the current metasrv is the leader.
300    state: Option<StateRef>,
301}
302
303/// Controller for managing failure detectors for regions.
304#[derive(Debug, Clone)]
305pub struct RegionFailureDetectorControl {
306    sender: Sender<Event>,
307}
308
309impl RegionFailureDetectorControl {
310    pub(crate) fn new(sender: Sender<Event>) -> Self {
311        Self { sender }
312    }
313}
314
315#[async_trait::async_trait]
316impl RegionFailureDetectorController for RegionFailureDetectorControl {
317    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
318        if let Err(err) = self
319            .sender
320            .send(Event::RegisterFailureDetectors(detecting_regions))
321            .await
322        {
323            error!(err; "RegionSupervisor has stop receiving heartbeat.");
324        }
325    }
326
327    async fn reset_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
328        if let Err(err) = self
329            .sender
330            .send(Event::ResetFailureDetectors(detecting_regions))
331            .await
332        {
333            error!(err; "RegionSupervisor has stop receiving heartbeat.");
334        }
335    }
336
337    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
338        if let Err(err) = self
339            .sender
340            .send(Event::DeregisterFailureDetectors(detecting_regions))
341            .await
342        {
343            error!(err; "RegionSupervisor has stop receiving heartbeat.");
344        }
345    }
346}
347
348/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
349#[derive(Clone)]
350pub(crate) struct HeartbeatAcceptor {
351    sender: Sender<Event>,
352}
353
354impl HeartbeatAcceptor {
355    pub(crate) fn new(sender: Sender<Event>) -> Self {
356        Self { sender }
357    }
358
359    /// Accepts heartbeats from datanodes.
360    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
361        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
362            error!(err; "RegionSupervisor has stop receiving heartbeat.");
363        }
364    }
365}
366
367impl RegionSupervisor {
368    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
369    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
370        tokio::sync::mpsc::channel(1024)
371    }
372
373    #[allow(clippy::too_many_arguments)]
374    pub(crate) fn new(
375        event_receiver: Receiver<Event>,
376        options: PhiAccrualFailureDetectorOptions,
377        selector_context: SelectorContext,
378        selector: RegionSupervisorSelector,
379        region_migration_manager: RegionMigrationManagerRef,
380        runtime_switch_manager: RuntimeSwitchManagerRef,
381        peer_resolver: PeerResolverRef,
382        kv_backend: KvBackendRef,
383    ) -> Self {
384        Self {
385            failure_detector: RegionFailureDetector::new(options),
386            failover_counts: HashMap::new(),
387            receiver: event_receiver,
388            selector_context,
389            selector,
390            region_migration_manager,
391            runtime_switch_manager,
392            peer_resolver,
393            kv_backend,
394            state: None,
395        }
396    }
397
398    /// Sets the meta state.
399    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
400        self.state = Some(state);
401        self
402    }
403
404    /// Runs the main loop.
405    pub(crate) async fn run(&mut self) {
406        while let Some(event) = self.receiver.recv().await {
407            if let Some(state) = self.state.as_ref()
408                && !state.read().unwrap().is_leader()
409            {
410                warn!(
411                    "The current metasrv is not the leader, ignore {:?} event",
412                    event
413                );
414                continue;
415            }
416
417            match event {
418                Event::InitializeAllRegions(sender) => {
419                    match self.is_maintenance_mode_enabled().await {
420                        Ok(false) => {}
421                        Ok(true) => {
422                            warn!(
423                                "Skipping initialize all regions since maintenance mode is enabled."
424                            );
425                            continue;
426                        }
427                        Err(err) => {
428                            error!(err; "Failed to check maintenance mode during initialize all regions.");
429                            continue;
430                        }
431                    }
432
433                    if let Err(err) = self.initialize_all().await {
434                        error!(err; "Failed to initialize all regions.");
435                    } else {
436                        // Ignore the error.
437                        let _ = sender.send(());
438                    }
439                }
440                Event::Tick => {
441                    let regions = self.detect_region_failure();
442                    self.handle_region_failures(regions).await;
443                }
444                Event::RegisterFailureDetectors(detecting_regions) => {
445                    self.register_failure_detectors(detecting_regions).await
446                }
447                Event::ResetFailureDetectors(detecting_regions) => {
448                    self.reset_failure_detectors(detecting_regions).await
449                }
450                Event::DeregisterFailureDetectors(detecting_regions) => {
451                    self.deregister_failure_detectors(detecting_regions).await
452                }
453                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
454                Event::Clear => {
455                    self.clear();
456                    info!("Region supervisor is initialized.");
457                }
458                #[cfg(test)]
459                Event::Dump(sender) => {
460                    let _ = sender.send(self.failure_detector.dump());
461                }
462            }
463        }
464        info!("RegionSupervisor is stopped!");
465    }
466
467    async fn initialize_all(&self) -> Result<()> {
468        let now = Instant::now();
469        let regions = self.regions();
470        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
471        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
472            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
473        })
474        .into_stream();
475
476        let mut stream = stream
477            .map_ok(|(_, value)| {
478                TableRouteValue::try_from_raw_value(&value)
479                    .context(error::TableMetadataManagerSnafu)
480            })
481            .boxed();
482        let mut detecting_regions = Vec::new();
483        while let Some(route) = stream
484            .try_next()
485            .await
486            .context(error::TableMetadataManagerSnafu)?
487        {
488            let route = route?;
489            if !route.is_physical() {
490                continue;
491            }
492
493            let physical_table_route = route.into_physical_table_route();
494            physical_table_route
495                .region_routes
496                .iter()
497                .for_each(|region_route| {
498                    if !regions.contains(&region_route.region.id)
499                        && let Some(leader_peer) = &region_route.leader_peer
500                    {
501                        detecting_regions.push((leader_peer.id, region_route.region.id));
502                    }
503                });
504        }
505
506        let num_detecting_regions = detecting_regions.len();
507        if !detecting_regions.is_empty() {
508            self.register_failure_detectors(detecting_regions).await;
509        }
510
511        info!(
512            "Initialize {} region failure detectors, elapsed: {:?}",
513            num_detecting_regions,
514            now.elapsed()
515        );
516
517        Ok(())
518    }
519
520    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
521        let ts_millis = current_time_millis();
522        for region in detecting_regions {
523            // The corresponding region has `acceptable_heartbeat_pause_millis` to send heartbeat from datanode.
524            self.failure_detector
525                .maybe_init_region_failure_detector(region, ts_millis);
526        }
527    }
528
529    async fn reset_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
530        let ts_millis = current_time_millis();
531        for region in detecting_regions {
532            self.failure_detector
533                .reset_region_failure_detector(region, ts_millis);
534        }
535    }
536
537    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
538        for region in detecting_regions {
539            self.failure_detector.remove(&region);
540            self.failover_counts.remove(&region);
541        }
542    }
543
544    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
545        if regions.is_empty() {
546            return;
547        }
548        match self.is_maintenance_mode_enabled().await {
549            Ok(false) => {}
550            Ok(true) => {
551                warn!(
552                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
553                    regions
554                );
555                return;
556            }
557            Err(err) => {
558                error!(err; "Failed to check maintenance mode");
559                return;
560            }
561        }
562
563        // Extracts regions that are migrating(failover), which means they are already being triggered failover.
564        let migrating_regions = regions
565            .extract_if(.., |(_, region_id)| {
566                self.region_migration_manager.tracker().contains(*region_id)
567            })
568            .collect::<Vec<_>>();
569
570        for (datanode_id, region_id) in migrating_regions {
571            debug!(
572                "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
573            );
574        }
575
576        if regions.is_empty() {
577            // If all detected regions are failover or migrating, just return.
578            return;
579        }
580
581        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
582            HashMap::with_capacity(regions.len());
583        for (datanode_id, region_id) in regions {
584            grouped_regions
585                .entry(datanode_id)
586                .or_default()
587                .push(region_id);
588        }
589
590        for (datanode_id, regions) in grouped_regions {
591            warn!(
592                "Detects region failures on datanode: {}, regions: {:?}",
593                datanode_id, regions
594            );
595            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
596            // because there may be false positives in failure detection on the datanode.
597            // So we only consider the datanode that reports the failure.
598            let failed_datanodes = [datanode_id];
599            match self
600                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
601                .await
602            {
603                Ok(tasks) => {
604                    let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
605                    for (task, count) in tasks {
606                        grouped_tasks
607                            .entry((task.from_peer.id, task.to_peer.id))
608                            .or_default()
609                            .push((task, count));
610                    }
611
612                    for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
613                        if tasks.is_empty() {
614                            continue;
615                        }
616                        let task = RegionMigrationTaskBatch::from_tasks(tasks);
617                        let region_ids = task.region_ids.clone();
618                        if let Err(err) = self.do_failover_tasks(task).await {
619                            error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
620                        }
621                    }
622                }
623                Err(err) => error!(err; "Failed to generate failover tasks"),
624            }
625        }
626    }
627
628    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
629        self.runtime_switch_manager
630            .maintenance_mode()
631            .await
632            .context(error::RuntimeSwitchManagerSnafu)
633    }
634
635    async fn select_peers(
636        &self,
637        from_peer_id: DatanodeId,
638        regions: &[RegionId],
639        failure_datanodes: &[DatanodeId],
640    ) -> Result<Vec<(RegionId, Peer)>> {
641        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
642        match &self.selector {
643            RegionSupervisorSelector::NaiveSelector(selector) => {
644                let opt = SelectorOptions {
645                    min_required_items: regions.len(),
646                    allow_duplication: true,
647                    exclude_peer_ids,
648                    workload_filter: Some(accept_ingest_workload),
649                    extensions: Default::default(),
650                };
651                let peers = selector.select(&self.selector_context, opt).await?;
652                ensure!(
653                    peers.len() == regions.len(),
654                    error::NoEnoughAvailableNodeSnafu {
655                        required: regions.len(),
656                        available: peers.len(),
657                        select_target: SelectTarget::Datanode,
658                    }
659                );
660                let region_peers = regions
661                    .iter()
662                    .zip(peers)
663                    .map(|(region_id, peer)| (*region_id, peer))
664                    .collect::<Vec<_>>();
665
666                Ok(region_peers)
667            }
668            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
669                let peers = selector
670                    .select(
671                        &self.selector_context,
672                        from_peer_id,
673                        regions,
674                        exclude_peer_ids,
675                    )
676                    .await?;
677                ensure!(
678                    peers.len() == regions.len(),
679                    error::NoEnoughAvailableNodeSnafu {
680                        required: regions.len(),
681                        available: peers.len(),
682                        select_target: SelectTarget::Datanode,
683                    }
684                );
685
686                Ok(peers)
687            }
688        }
689    }
690
691    async fn generate_failover_tasks(
692        &mut self,
693        from_peer_id: DatanodeId,
694        regions: &[RegionId],
695        failed_datanodes: &[DatanodeId],
696    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
697        let mut tasks = Vec::with_capacity(regions.len());
698        let from_peer = self
699            .peer_resolver
700            .datanode(from_peer_id)
701            .await
702            .ok()
703            .flatten()
704            .unwrap_or_else(|| Peer::empty(from_peer_id));
705
706        let region_peers = self
707            .select_peers(from_peer_id, regions, failed_datanodes)
708            .await?;
709
710        for (region_id, peer) in region_peers {
711            let count = *self
712                .failover_counts
713                .entry((from_peer_id, region_id))
714                .and_modify(|count| *count += 1)
715                .or_insert(1);
716            let task = RegionMigrationProcedureTask {
717                region_id,
718                from_peer: from_peer.clone(),
719                to_peer: peer,
720                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
721                trigger_reason: RegionMigrationTriggerReason::Failover,
722            };
723            tasks.push((task, count));
724        }
725
726        Ok(tasks)
727    }
728
729    async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
730        let from_peer_id = task.from_peer.id;
731        let to_peer_id = task.to_peer.id;
732        let timeout = task.timeout;
733        let trigger_reason = task.trigger_reason;
734        let result = self
735            .region_migration_manager
736            .submit_region_migration_task(task)
737            .await?;
738        self.handle_submit_region_migration_task_result(
739            from_peer_id,
740            to_peer_id,
741            timeout,
742            trigger_reason,
743            result,
744        )
745        .await
746    }
747
748    async fn handle_submit_region_migration_task_result(
749        &mut self,
750        from_peer_id: DatanodeId,
751        to_peer_id: DatanodeId,
752        timeout: Duration,
753        trigger_reason: RegionMigrationTriggerReason,
754        result: SubmitRegionMigrationTaskResult,
755    ) -> Result<()> {
756        if !result.migrated.is_empty() {
757            let detecting_regions = result
758                .migrated
759                .iter()
760                .map(|region_id| (from_peer_id, *region_id))
761                .collect::<Vec<_>>();
762            self.deregister_failure_detectors(detecting_regions).await;
763            info!(
764                "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
765                to_peer_id, result.migrated,
766            )
767        }
768        if !result.migrating.is_empty() {
769            info!(
770                "Region is still migrating, skipping failover for regions: {:?}",
771                result.migrating
772            );
773        }
774        if !result.region_not_found.is_empty() {
775            let detecting_regions = result
776                .region_not_found
777                .iter()
778                .map(|region_id| (from_peer_id, *region_id))
779                .collect::<Vec<_>>();
780            self.deregister_failure_detectors(detecting_regions).await;
781            info!(
782                "Region route not found, removed failover detectors for regions: {:?}",
783                result.region_not_found
784            );
785        }
786        if !result.table_not_found.is_empty() {
787            let detecting_regions = result
788                .table_not_found
789                .iter()
790                .map(|region_id| (from_peer_id, *region_id))
791                .collect::<Vec<_>>();
792            self.deregister_failure_detectors(detecting_regions).await;
793            info!(
794                "Table is not found, removed failover detectors for regions: {:?}",
795                result.table_not_found
796            );
797        }
798        if !result.leader_changed.is_empty() {
799            let detecting_regions = result
800                .leader_changed
801                .iter()
802                .map(|region_id| (from_peer_id, *region_id))
803                .collect::<Vec<_>>();
804            self.deregister_failure_detectors(detecting_regions).await;
805            info!(
806                "Region's leader peer changed, removed failover detectors for regions: {:?}",
807                result.leader_changed
808            );
809        }
810        if !result.peer_conflict.is_empty() {
811            info!(
812                "Region has peer conflict, ignore failover for regions: {:?}",
813                result.peer_conflict
814            );
815        }
816        if !result.submitted.is_empty() {
817            info!(
818                "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
819                result.submitted,
820                from_peer_id,
821                to_peer_id,
822                result.procedure_id,
823                timeout,
824                trigger_reason,
825            );
826        }
827
828        Ok(())
829    }
830
831    /// Detects the failure of regions.
832    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
833        self.failure_detector
834            .iter()
835            .filter_map(|e| {
836                // Intentionally not place `current_time_millis()` out of the iteration.
837                // The failure detection determination should be happened "just in time",
838                // i.e., failed or not has to be compared with the most recent "now".
839                // Besides, it might reduce the false positive of failure detection,
840                // because during the iteration, heartbeats are coming in as usual,
841                // and the `phi`s are still updating.
842                if !e.failure_detector().is_available(current_time_millis()) {
843                    Some(*e.region_ident())
844                } else {
845                    None
846                }
847            })
848            .collect::<Vec<_>>()
849    }
850
851    /// Returns all regions that registered in the failure detector.
852    fn regions(&self) -> HashSet<RegionId> {
853        self.failure_detector
854            .iter()
855            .map(|e| e.region_ident().1)
856            .collect::<HashSet<_>>()
857    }
858
859    /// Updates the state of corresponding failure detectors.
860    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
861        for region_id in heartbeat.regions {
862            let detecting_region = (heartbeat.datanode_id, region_id);
863            let mut detector = self
864                .failure_detector
865                .region_failure_detector(detecting_region);
866            detector.heartbeat(heartbeat.timestamp);
867        }
868    }
869
870    fn clear(&self) {
871        self.failure_detector.clear();
872    }
873}
874
#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::manager::{
        RegionMigrationManager, SubmitRegionMigrationTaskResult,
    };
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    /// Builds a [`RegionSupervisor`] on top of a fresh [`TestingEnv`], using a
    /// random selector that only knows peer 1. Returns the supervisor together
    /// with the sender side of its event channel.
    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's document.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
            assert!(ticker.initialization_handle.lock().unwrap().is_none());
            assert!(ticker.tick_handle.lock().unwrap().is_none());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata that routes to the physical table above.
        // The binding is not named after the `test_create_logical_table_task`
        // helper, so the helper function is not shadowed.
        let logical_table_id = 1025;
        let mut create_logical_table_task = test_create_logical_table_task("my_logical_table");
        create_logical_table_task.set_table_id(logical_table_id);
        let table_info = create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(table_id);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        // Only the physical region is tracked; the logical table adds no detector.
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(table_id, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }

    /// Creates a supervisor that tracks `detecting_region` (failure detector
    /// registered, failover count set to 1), then applies `result` as the
    /// outcome of migrating the region from its datanode to peer 2.
    ///
    /// Returns the supervisor so the caller can inspect the resulting state.
    async fn supervisor_after_submit_result(
        detecting_region: DetectingRegion,
        result: SubmitRegionMigrationTaskResult,
    ) -> RegionSupervisor {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);

        // Derive the source peer from the detector key so the two cannot drift apart.
        let (from_peer_id, _) = detecting_region;
        supervisor
            .handle_submit_region_migration_task_result(
                from_peer_id,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        supervisor
    }

    /// Asserts that `detecting_region` lost its failure detector and that no
    /// failover counts remain.
    #[track_caller]
    fn assert_detector_removed(supervisor: &RegionSupervisor, detecting_region: DetectingRegion) {
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    /// Asserts that `detecting_region` still has both its failure detector and
    /// its failover count.
    #[track_caller]
    fn assert_detector_kept(supervisor: &RegionSupervisor, detecting_region: DetectingRegion) {
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrated() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            migrated: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_removed(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrating() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            migrating: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_kept(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_table_not_found() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            table_not_found: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_removed(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_region_not_found() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            region_not_found: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_removed(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_leader_changed() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            leader_changed: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_removed(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_peer_conflict() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            peer_conflict: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_kept(&supervisor, detecting_region);
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_submitted() {
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        let result = SubmitRegionMigrationTaskResult {
            submitted: vec![region_id],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert_detector_kept(&supervisor, detecting_region);
    }
}