meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::DatanodeId;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};

use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
};
use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
use crate::state::StateRef;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the datanode identifier, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Considers collecting the memtable size in regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv has received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
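///
/// A minimal usage sketch (illustrative only; assumes the channel created by
/// [`RegionSupervisor::channel`]):
///
/// ```ignore
/// let (tx, _rx) = RegionSupervisor::channel();
/// // Ask the supervisor to run one round of failure detection.
/// tx.send(Event::Tick).await.unwrap();
/// ```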
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
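///
/// A construction sketch (illustrative only; the delay value below is an assumption,
/// not taken from the original configuration):
///
/// ```ignore
/// let (tx, _rx) = RegionSupervisor::channel();
/// let ticker = RegionSupervisorTicker::new(
///     DEFAULT_TICK_INTERVAL,                   // tick_interval
///     Duration::from_secs(60),                 // initialization_delay (assumed value)
///     DEFAULT_INITIALIZATION_RETRY_PERIOD,     // initialization_retry_period
///     tx,
/// );
/// // The ticker is normally driven by leadership changes; starting it manually here.
/// ticker.start();
/// ```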
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    initialization_handle: Mutex<Option<JoinHandle<()>>>,

    /// The tick interval.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            let initialization_handler = common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!(
                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
                        );
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });
            *self.initialization_handle.lock().unwrap() = Some(initialization_handler);

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
        let initialization_handler = self.initialization_handle.lock().unwrap().take();
        if let Some(initialization_handler) = initialization_handler {
            initialization_handler.abort();
            info!("The initialization loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
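///
/// A wiring sketch (illustrative only; assumes the dependencies such as the selector,
/// migration manager, and kv backend are constructed elsewhere):
///
/// ```ignore
/// let (tx, rx) = RegionSupervisor::channel();
/// let heartbeat_acceptor = HeartbeatAcceptor::new(tx.clone());
/// let mut supervisor = RegionSupervisor::new(
///     rx,
///     failure_detector_options,
///     selector_context,
///     selector,
///     region_migration_manager,
///     runtime_switch_manager,
///     peer_resolver,
///     kv_backend,
/// );
/// common_runtime::spawn_global(async move { supervisor.run().await });
/// ```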
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`].
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The runtime switch (maintenance mode) manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// The peer resolver.
    peer_resolver: PeerResolverRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// The meta state, used to check if the current metasrv is the leader.
    state: Option<StateRef>,
}

/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_resolver: PeerResolverRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
            state: None,
        }
    }

    /// Sets the meta state.
    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
        self.state = Some(state);
        self
    }

    /// Runs the main loop.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            if let Some(state) = self.state.as_ref()
                && !state.read().unwrap().is_leader()
            {
                warn!(
                    "The current metasrv is not the leader, ignoring {:?} event",
                    event
                );
                continue;
            }

            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!(
                                "Skipping initialization of all regions since maintenance mode is enabled."
                            );
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode while initializing all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => {
                    self.clear();
                    info!("Region supervisor is initialized.");
                }
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

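    /// Scans all physical table routes in the kv backend and registers failure detectors
    /// for any leader regions that are not tracked yet.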
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id)
                        && let Some(leader_peer) = &region_route.leader_peer
                    {
                        detecting_regions.push((leader_peer.id, region_route.region.id));
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The datanode has `acceptable_heartbeat_pause_millis` to send a heartbeat for the corresponding region.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating (i.e., failover has already been triggered for them).
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipping failover for region: {region_id}, datanode: {datanode_id} because it's already migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already migrating, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
            // because there may be false positives in failure detection on the datanode.
            // So we only exclude the datanode whose failure is currently being handled.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
                    for (task, count) in tasks {
                        grouped_tasks
                            .entry((task.from_peer.id, task.to_peer.id))
                            .or_default()
                            .push((task, count));
                    }

                    for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
                        if tasks.is_empty() {
                            continue;
                        }
                        let task = RegionMigrationTaskBatch::from_tasks(tasks);
                        let region_ids = task.region_ids.clone();
                        if let Err(err) = self.do_failover_tasks(task).await {
                            error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                    workload_filter: Some(accept_ingest_workload),
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
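            // Scale the migration timeout with the number of failover attempts observed for
            // this region, giving retries progressively more time to finish.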
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

    async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let timeout = task.timeout;
        let trigger_reason = task.trigger_reason;
        let result = self
            .region_migration_manager
            .submit_region_migration_task(task)
            .await?;
        self.handle_submit_region_migration_task_result(
            from_peer_id,
            to_peer_id,
            timeout,
            trigger_reason,
            result,
        )
        .await
    }

    async fn handle_submit_region_migration_task_result(
        &mut self,
        from_peer_id: DatanodeId,
        to_peer_id: DatanodeId,
        timeout: Duration,
        trigger_reason: RegionMigrationTriggerReason,
        result: SubmitRegionMigrationTaskResult,
    ) -> Result<()> {
        if !result.migrated.is_empty() {
            let detecting_regions = result
                .migrated
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
                to_peer_id, result.migrated,
            )
        }
        if !result.migrating.is_empty() {
            info!(
                "Region is still migrating, skipping failover for regions: {:?}",
                result.migrating
            );
        }
        if !result.table_not_found.is_empty() {
            let detecting_regions = result
                .table_not_found
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Table is not found, removed failover detectors for regions: {:?}",
                result.table_not_found
            );
        }
        if !result.leader_changed.is_empty() {
            let detecting_regions = result
                .leader_changed
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Region's leader peer changed, removed failover detectors for regions: {:?}",
                result.leader_changed
            );
        }
        if !result.peer_conflict.is_empty() {
            info!(
                "Region has peer conflict, ignoring failover for regions: {:?}",
                result.peer_conflict
            );
        }
        if !result.submitted.is_empty() {
            info!(
                "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
                result.submitted,
                from_peer_id,
                to_peer_id,
                result.procedure_id,
                timeout,
                trigger_reason,
            );
        }

        Ok(())
    }

    /// Detects the failure of regions.
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally not placing `current_time_millis()` outside the iteration.
                // The failure detection determination should happen "just in time",
                // i.e., failed or not has to be compared with the most recent "now".
                // Besides, it might reduce false positives in failure detection,
                // because during the iteration, heartbeats are coming in as usual,
                // and the `phi`s are still updating.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

    /// Returns all regions that are registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of the corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::manager::{
        RegionMigrationManager, SubmitRegionMigrationTaskResult,
    };
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's documentation.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
            assert!(ticker.initialization_handle.lock().unwrap().is_none());
            assert!(ticker.tick_handle.lock().unwrap().is_none());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrated() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            migrated: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrating() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            migrating: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_table_not_found() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            table_not_found: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_leader_changed() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            leader_changed: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_peer_conflict() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            peer_conflict: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_submitted() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            submitted: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }
}