meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::DatanodeId;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};

use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
};
use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
use crate::state::StateRef;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the datanode identifier, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Consider collecting the memtable size in regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    initialization_handle: Mutex<Option<JoinHandle<()>>>,

    /// The interval of tick.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

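            // Initialization loop: starting after `initialization_delay`, periodically ask the
            // supervisor to initialize all region failure detectors. The loop exits once the
            // supervisor acknowledges the request via the oneshot channel, or when the event
            // receiver has been dropped.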
            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            let initialization_handler = common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!(
                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
                        );
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });
            *self.initialization_handle.lock().unwrap() = Some(initialization_handler);

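            // Tick loop: send a one-off `Event::Clear` so the supervisor starts from a clean
            // state, then emit `Event::Tick` every `tick_interval` to drive failure detection.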
            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
        let initialization_handler = self.initialization_handle.lock().unwrap().take();
        if let Some(initialization_handler) = initialization_handler {
            initialization_handler.abort();
            info!("The initialization loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`]
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The maintenance mode manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Peer resolver
    peer_resolver: PeerResolverRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// The meta state, used to check if the current metasrv is the leader.
    state: Option<StateRef>,
}

/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
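    ///
    /// A minimal usage sketch (marked `ignore` since it assumes a fully constructed
    /// supervisor, as in the tests below): the receiver half is handed to the supervisor
    /// and drained by [`RegionSupervisor::run`], while producers push [`Event`]s through
    /// the sender half.
    ///
    /// ```ignore
    /// let (sender, receiver) = RegionSupervisor::channel();
    /// // let mut supervisor = RegionSupervisor::new(receiver, /* ... */);
    /// // tokio::spawn(async move { supervisor.run().await });
    /// sender.send(Event::Tick).await.unwrap();
    /// ```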
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_resolver: PeerResolverRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
            state: None,
        }
    }

    /// Sets the meta state.
    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
        self.state = Some(state);
        self
    }

    /// Runs the main loop.
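    ///
    /// Events are ignored while the current metasrv is not the leader; `InitializeAllRegions`
    /// requests are additionally skipped while maintenance mode is enabled.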
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            if let Some(state) = self.state.as_ref()
                && !state.read().unwrap().is_leader()
            {
                warn!(
                    "The current metasrv is not the leader, ignoring {:?} event",
                    event
                );
                continue;
            }

            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!(
                                "Skipping initialize all regions since maintenance mode is enabled."
                            );
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialize all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => {
                    self.clear();
                    info!("Region supervisor is initialized.");
                }
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

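    /// Scans all physical table routes in the kv backend and registers failure detectors for
    /// leader regions that are not yet tracked.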
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id)
                        && let Some(leader_peer) = &region_route.leader_peer
                    {
                        detecting_regions.push((leader_peer.id, region_route.region.id));
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

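    /// Initializes a failure detector for each detecting region, if absent, using the current
    /// timestamp as the starting point.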
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The datanode hosting the region has `acceptable_heartbeat_pause_millis` to report
            // a heartbeat before the region may be treated as failed.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

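    /// Removes the failure detectors and failover counters of the given regions.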
    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

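    /// Handles detected region failures: does nothing while maintenance mode is enabled, skips
    /// regions that are already migrating, then groups the remaining regions by datanode and
    /// submits failover (region migration) tasks for each group.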
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating, i.e., failover has already been
        // triggered for them.
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already failing over or migrating, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
            // because there may be false positives in failure detection on the datanode.
            // So we only exclude the datanode whose regions are currently being failed over.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
                    for (task, count) in tasks {
                        grouped_tasks
                            .entry((task.from_peer.id, task.to_peer.id))
                            .or_default()
                            .push((task, count));
                    }

                    for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
                        if tasks.is_empty() {
                            continue;
                        }
                        let task = RegionMigrationTaskBatch::from_tasks(tasks);
                        let region_ids = task.region_ids.clone();
                        if let Err(err) = self.do_failover_tasks(task).await {
                            error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

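    /// Selects a destination peer for each failed region, excluding the failed datanodes from
    /// the candidates. Fails if the selector cannot provide one peer per region.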
    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                    workload_filter: Some(accept_ingest_workload),
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

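    /// Builds one region migration task per failed region. The per-region failover count is
    /// incremented and used to scale the migration timeout on repeated attempts.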
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

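    /// Submits a batch of failover tasks to the region migration manager and post-processes the
    /// submission result.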
    async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let timeout = task.timeout;
        let trigger_reason = task.trigger_reason;
        let result = self
            .region_migration_manager
            .submit_region_migration_task(task)
            .await?;
        self.handle_submit_region_migration_task_result(
            from_peer_id,
            to_peer_id,
            timeout,
            trigger_reason,
            result,
        )
        .await
    }

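    /// Reacts to the submission result: deregisters failure detectors for regions that were
    /// already migrated, whose routes or tables were not found, or whose leader changed; keeps
    /// the detectors for regions that are still migrating, have peer conflicts, or were just
    /// submitted for failover.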
    async fn handle_submit_region_migration_task_result(
        &mut self,
        from_peer_id: DatanodeId,
        to_peer_id: DatanodeId,
        timeout: Duration,
        trigger_reason: RegionMigrationTriggerReason,
        result: SubmitRegionMigrationTaskResult,
    ) -> Result<()> {
        if !result.migrated.is_empty() {
            let detecting_regions = result
                .migrated
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
                to_peer_id, result.migrated,
            )
        }
        if !result.migrating.is_empty() {
            info!(
                "Region is still migrating, skipping failover for regions: {:?}",
                result.migrating
            );
        }
        if !result.region_not_found.is_empty() {
            let detecting_regions = result
                .region_not_found
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Region route not found, removed failover detectors for regions: {:?}",
                result.region_not_found
            );
        }
        if !result.table_not_found.is_empty() {
            let detecting_regions = result
                .table_not_found
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Table is not found, removed failover detectors for regions: {:?}",
                result.table_not_found
            );
        }
        if !result.leader_changed.is_empty() {
            let detecting_regions = result
                .leader_changed
                .iter()
                .map(|region_id| (from_peer_id, *region_id))
                .collect::<Vec<_>>();
            self.deregister_failure_detectors(detecting_regions).await;
            info!(
                "Region's leader peer changed, removed failover detectors for regions: {:?}",
                result.leader_changed
            );
        }
        if !result.peer_conflict.is_empty() {
            info!(
                "Region has peer conflict, ignore failover for regions: {:?}",
                result.peer_conflict
            );
        }
        if !result.submitted.is_empty() {
            info!(
                "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
                result.submitted,
                from_peer_id,
                to_peer_id,
                result.procedure_id,
                timeout,
                trigger_reason,
            );
        }

        Ok(())
    }

    /// Detects the failure of regions.
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally call `current_time_millis()` inside the iteration rather than
                // hoisting it out. The failure determination should happen "just in time",
                // i.e., whether a region has failed must be compared against the most recent
                // "now". Besides, this might reduce false positives in failure detection,
                // because heartbeats keep arriving during the iteration and the `phi`s are
                // still updating.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

    /// Returns all regions registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::manager::{
        RegionMigrationManager, SubmitRegionMigrationTaskResult,
    };
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clean up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's documentation.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
            assert!(ticker.initialization_handle.lock().unwrap().is_none());
            assert!(ticker.tick_handle.lock().unwrap().is_none());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrated() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            migrated: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrating() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            migrating: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_table_not_found() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            table_not_found: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_region_not_found() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            region_not_found: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_leader_changed() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            leader_changed: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_peer_conflict() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            peer_conflict: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_submitted() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        let region_id = RegionId::new(1, 1);
        let detecting_region = (1, region_id);
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        let result = SubmitRegionMigrationTaskResult {
            submitted: vec![region_id],
            ..Default::default()
        };
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }
}