Skip to main content

meta_srv/region/
supervisor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use common_meta::DatanodeId;
22use common_meta::datanode::Stat;
23use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
24use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
25use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
26use common_meta::key::{MetadataKey, MetadataValue};
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::leadership_notifier::LeadershipChangeListener;
29use common_meta::peer::{Peer, PeerResolverRef};
30use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
31use common_meta::rpc::store::RangeRequest;
32use common_runtime::JoinHandle;
33use common_telemetry::{debug, error, info, warn};
34use common_time::util::current_time_millis;
35use futures::{StreamExt, TryStreamExt};
36use snafu::{ResultExt, ensure};
37use store_api::storage::RegionId;
38use tokio::sync::mpsc::{Receiver, Sender};
39use tokio::sync::oneshot;
40use tokio::time::{MissedTickBehavior, interval, interval_at};
41
42use crate::discovery::utils::accept_ingest_workload;
43use crate::error::{self, Result};
44use crate::failure_detector::PhiAccrualFailureDetectorOptions;
45use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
46use crate::procedure::region_migration::manager::{
47    RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
48};
49use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
50use crate::procedure::region_migration::{
51    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
52};
53use crate::region::failure_detector::RegionFailureDetector;
54use crate::selector::SelectorOptions;
55use crate::state::StateRef;
56
/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the identifier of the datanode, the ids of the regions it reported,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    /// The id of the datanode that sent the heartbeat.
    datanode_id: DatanodeId,
    // TODO(weny): Considers collecting the memtable size in regions.
    /// Ids of the regions listed in the heartbeat's region stats.
    regions: Vec<RegionId>,
    /// When the heartbeat was sent, copied from `Stat::timestamp_millis`.
    timestamp: i64,
}
67
68impl From<&Stat> for DatanodeHeartbeat {
69    fn from(value: &Stat) -> Self {
70        DatanodeHeartbeat {
71            datanode_id: value.id,
72            regions: value.region_stats.iter().map(|x| x.id).collect(),
73            timestamp: value.timestamp_millis,
74        }
75    }
76}
77
/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
///   The supervisor sends `()` through the carried oneshot sender only when initialization
///   succeeds; on every other outcome the sender is dropped without a value.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event represents that the metasrv received a [`DatanodeHeartbeat`]
///   from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    /// Runs one round of failure detection.
    Tick,
    /// Registers detectors for all leader regions in the table routes; acknowledged on success.
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    /// Registers detectors for the given (datanode, region) pairs.
    RegisterFailureDetectors(Vec<DetectingRegion>),
    /// Removes detectors (and failover counters) for the given (datanode, region) pairs.
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    /// Feeds a datanode heartbeat into the detectors of its regions.
    HeartbeatArrived(DatanodeHeartbeat),
    /// Resets the supervisor state.
    Clear,
    /// Sends back a dump of the failure detectors (tests only).
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
103
104impl Debug for Event {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            Self::Tick => write!(f, "Tick"),
108            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
109            Self::Clear => write!(f, "Clear"),
110            Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
111            Self::RegisterFailureDetectors(arg0) => f
112                .debug_tuple("RegisterFailureDetectors")
113                .field(arg0)
114                .finish(),
115            Self::DeregisterFailureDetectors(arg0) => f
116                .debug_tuple("DeregisterFailureDetectors")
117                .field(arg0)
118                .finish(),
119            #[cfg(test)]
120            Self::Dump(_) => f.debug_struct("Dump").finish(),
121        }
122    }
123}
124
/// Shared reference to a [`RegionSupervisorTicker`].
pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
///
/// It also drives the loop that sends [`Event::InitializeAllRegions`] until the supervisor
/// acknowledges a successful initialization.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// Handle of the tick loop; `None` until started and after being stopped.
    ///
    /// The [`Option`] wrapper allows us to abort the job while dropping the
    /// [`RegionSupervisorTicker`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// Handle of the initialization loop; `None` until started and after being stopped.
    ///
    /// The [`Option`] wrapper allows us to abort the job while dropping the
    /// [`RegionSupervisorTicker`].
    initialization_handle: Mutex<Option<JoinHandle<()>>>,

    /// The interval of tick.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}
148
/// Runs the ticker only while this metasrv is the leader: it is started when leadership is
/// acquired and stopped when leadership is lost.
#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    /// Starts the tick and initialization loops; always returns `Ok`.
    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    /// Aborts the tick and initialization loops; always returns `Ok`.
    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}
165
166impl RegionSupervisorTicker {
167    pub(crate) fn new(
168        tick_interval: Duration,
169        initialization_delay: Duration,
170        initialization_retry_period: Duration,
171        sender: Sender<Event>,
172    ) -> Self {
173        info!(
174            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
175            tick_interval, initialization_delay, initialization_retry_period
176        );
177        Self {
178            tick_handle: Mutex::new(None),
179            initialization_handle: Mutex::new(None),
180            tick_interval,
181            initialization_delay,
182            initialization_retry_period,
183            sender,
184        }
185    }
186
187    /// Starts the ticker.
188    pub fn start(&self) {
189        let mut handle = self.tick_handle.lock().unwrap();
190        if handle.is_none() {
191            let sender = self.sender.clone();
192            let tick_interval = self.tick_interval;
193            let initialization_delay = self.initialization_delay;
194
195            let mut initialization_interval = interval_at(
196                tokio::time::Instant::now() + initialization_delay,
197                self.initialization_retry_period,
198            );
199            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
200            let initialization_handler = common_runtime::spawn_global(async move {
201                loop {
202                    initialization_interval.tick().await;
203                    let (tx, rx) = oneshot::channel();
204                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
205                        info!(
206                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
207                        );
208                        break;
209                    }
210                    if rx.await.is_ok() {
211                        info!("All region failure detectors are initialized.");
212                        break;
213                    }
214                }
215            });
216            *self.initialization_handle.lock().unwrap() = Some(initialization_handler);
217
218            let sender = self.sender.clone();
219            let ticker_loop = tokio::spawn(async move {
220                let mut tick_interval = interval(tick_interval);
221                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
222
223                if let Err(err) = sender.send(Event::Clear).await {
224                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
225                    return;
226                }
227                loop {
228                    tick_interval.tick().await;
229                    if sender.send(Event::Tick).await.is_err() {
230                        info!("EventReceiver is dropped, tick loop is stopped");
231                        break;
232                    }
233                }
234            });
235            *handle = Some(ticker_loop);
236        }
237    }
238
239    /// Stops the ticker.
240    pub fn stop(&self) {
241        let handle = self.tick_handle.lock().unwrap().take();
242        if let Some(handle) = handle {
243            handle.abort();
244            info!("The tick loop is stopped.");
245        }
246        let initialization_handler = self.initialization_handle.lock().unwrap().take();
247        if let Some(initialization_handler) = initialization_handler {
248            initialization_handler.abort();
249            info!("The initialization loop is stopped.");
250        }
251    }
252}
253
254impl Drop for RegionSupervisorTicker {
255    fn drop(&mut self) {
256        self.stop();
257    }
258}
259
/// Shared reference to a [`RegionSupervisor`].
pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
///
/// Chooses the target datanodes that failed regions are migrated to.
pub enum RegionSupervisorSelector {
    /// A generic selector driven by [`SelectorOptions`]; the returned peers are assigned to
    /// the failed regions in order.
    NaiveSelector(SelectorRef),
    /// A selector that receives the source datanode and the failed regions, and returns one
    /// `(region, peer)` pair per region.
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
272
/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
///
/// It is driven by the [`Event`]s it receives on its channel.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each (datanode, region) pair; the count scales the
    /// timeout of the next failover migration.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`]
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The runtime switch manager, used to check whether maintenance mode is enabled.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Peer resolver, used to look up the source datanode of a failover.
    peer_resolver: PeerResolverRef,
    /// The kv backend, scanned for table routes when initializing failure detectors.
    kv_backend: KvBackendRef,
    /// The meta state, used to check if the current metasrv is the leader.
    /// When `None`, every event is processed.
    state: Option<StateRef>,
}
297
298/// Controller for managing failure detectors for regions.
299#[derive(Debug, Clone)]
300pub struct RegionFailureDetectorControl {
301    sender: Sender<Event>,
302}
303
304impl RegionFailureDetectorControl {
305    pub(crate) fn new(sender: Sender<Event>) -> Self {
306        Self { sender }
307    }
308}
309
310#[async_trait::async_trait]
311impl RegionFailureDetectorController for RegionFailureDetectorControl {
312    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
313        if let Err(err) = self
314            .sender
315            .send(Event::RegisterFailureDetectors(detecting_regions))
316            .await
317        {
318            error!(err; "RegionSupervisor has stop receiving heartbeat.");
319        }
320    }
321
322    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
323        if let Err(err) = self
324            .sender
325            .send(Event::DeregisterFailureDetectors(detecting_regions))
326            .await
327        {
328            error!(err; "RegionSupervisor has stop receiving heartbeat.");
329        }
330    }
331}
332
333/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
334#[derive(Clone)]
335pub(crate) struct HeartbeatAcceptor {
336    sender: Sender<Event>,
337}
338
339impl HeartbeatAcceptor {
340    pub(crate) fn new(sender: Sender<Event>) -> Self {
341        Self { sender }
342    }
343
344    /// Accepts heartbeats from datanodes.
345    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
346        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
347            error!(err; "RegionSupervisor has stop receiving heartbeat.");
348        }
349    }
350}
351
352impl RegionSupervisor {
353    /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
354    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
355        tokio::sync::mpsc::channel(1024)
356    }
357
358    #[allow(clippy::too_many_arguments)]
359    pub(crate) fn new(
360        event_receiver: Receiver<Event>,
361        options: PhiAccrualFailureDetectorOptions,
362        selector_context: SelectorContext,
363        selector: RegionSupervisorSelector,
364        region_migration_manager: RegionMigrationManagerRef,
365        runtime_switch_manager: RuntimeSwitchManagerRef,
366        peer_resolver: PeerResolverRef,
367        kv_backend: KvBackendRef,
368    ) -> Self {
369        Self {
370            failure_detector: RegionFailureDetector::new(options),
371            failover_counts: HashMap::new(),
372            receiver: event_receiver,
373            selector_context,
374            selector,
375            region_migration_manager,
376            runtime_switch_manager,
377            peer_resolver,
378            kv_backend,
379            state: None,
380        }
381    }
382
383    /// Sets the meta state.
384    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
385        self.state = Some(state);
386        self
387    }
388
389    /// Runs the main loop.
390    pub(crate) async fn run(&mut self) {
391        while let Some(event) = self.receiver.recv().await {
392            if let Some(state) = self.state.as_ref()
393                && !state.read().unwrap().is_leader()
394            {
395                warn!(
396                    "The current metasrv is not the leader, ignore {:?} event",
397                    event
398                );
399                continue;
400            }
401
402            match event {
403                Event::InitializeAllRegions(sender) => {
404                    match self.is_maintenance_mode_enabled().await {
405                        Ok(false) => {}
406                        Ok(true) => {
407                            warn!(
408                                "Skipping initialize all regions since maintenance mode is enabled."
409                            );
410                            continue;
411                        }
412                        Err(err) => {
413                            error!(err; "Failed to check maintenance mode during initialize all regions.");
414                            continue;
415                        }
416                    }
417
418                    if let Err(err) = self.initialize_all().await {
419                        error!(err; "Failed to initialize all regions.");
420                    } else {
421                        // Ignore the error.
422                        let _ = sender.send(());
423                    }
424                }
425                Event::Tick => {
426                    let regions = self.detect_region_failure();
427                    self.handle_region_failures(regions).await;
428                }
429                Event::RegisterFailureDetectors(detecting_regions) => {
430                    self.register_failure_detectors(detecting_regions).await
431                }
432                Event::DeregisterFailureDetectors(detecting_regions) => {
433                    self.deregister_failure_detectors(detecting_regions).await
434                }
435                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
436                Event::Clear => {
437                    self.clear();
438                    info!("Region supervisor is initialized.");
439                }
440                #[cfg(test)]
441                Event::Dump(sender) => {
442                    let _ = sender.send(self.failure_detector.dump());
443                }
444            }
445        }
446        info!("RegionSupervisor is stopped!");
447    }
448
449    async fn initialize_all(&self) -> Result<()> {
450        let now = Instant::now();
451        let regions = self.regions();
452        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
453        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
454            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
455        })
456        .into_stream();
457
458        let mut stream = stream
459            .map_ok(|(_, value)| {
460                TableRouteValue::try_from_raw_value(&value)
461                    .context(error::TableMetadataManagerSnafu)
462            })
463            .boxed();
464        let mut detecting_regions = Vec::new();
465        while let Some(route) = stream
466            .try_next()
467            .await
468            .context(error::TableMetadataManagerSnafu)?
469        {
470            let route = route?;
471            if !route.is_physical() {
472                continue;
473            }
474
475            let physical_table_route = route.into_physical_table_route();
476            physical_table_route
477                .region_routes
478                .iter()
479                .for_each(|region_route| {
480                    if !regions.contains(&region_route.region.id)
481                        && let Some(leader_peer) = &region_route.leader_peer
482                    {
483                        detecting_regions.push((leader_peer.id, region_route.region.id));
484                    }
485                });
486        }
487
488        let num_detecting_regions = detecting_regions.len();
489        if !detecting_regions.is_empty() {
490            self.register_failure_detectors(detecting_regions).await;
491        }
492
493        info!(
494            "Initialize {} region failure detectors, elapsed: {:?}",
495            num_detecting_regions,
496            now.elapsed()
497        );
498
499        Ok(())
500    }
501
502    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
503        let ts_millis = current_time_millis();
504        for region in detecting_regions {
505            // The corresponding region has `acceptable_heartbeat_pause_millis` to send heartbeat from datanode.
506            self.failure_detector
507                .maybe_init_region_failure_detector(region, ts_millis);
508        }
509    }
510
511    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
512        for region in detecting_regions {
513            self.failure_detector.remove(&region);
514            self.failover_counts.remove(&region);
515        }
516    }
517
518    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
519        if regions.is_empty() {
520            return;
521        }
522        match self.is_maintenance_mode_enabled().await {
523            Ok(false) => {}
524            Ok(true) => {
525                warn!(
526                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
527                    regions
528                );
529                return;
530            }
531            Err(err) => {
532                error!(err; "Failed to check maintenance mode");
533                return;
534            }
535        }
536
537        // Extracts regions that are migrating(failover), which means they are already being triggered failover.
538        let migrating_regions = regions
539            .extract_if(.., |(_, region_id)| {
540                self.region_migration_manager.tracker().contains(*region_id)
541            })
542            .collect::<Vec<_>>();
543
544        for (datanode_id, region_id) in migrating_regions {
545            debug!(
546                "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
547            );
548        }
549
550        if regions.is_empty() {
551            // If all detected regions are failover or migrating, just return.
552            return;
553        }
554
555        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
556            HashMap::with_capacity(regions.len());
557        for (datanode_id, region_id) in regions {
558            grouped_regions
559                .entry(datanode_id)
560                .or_default()
561                .push(region_id);
562        }
563
564        for (datanode_id, regions) in grouped_regions {
565            warn!(
566                "Detects region failures on datanode: {}, regions: {:?}",
567                datanode_id, regions
568            );
569            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
570            // because there may be false positives in failure detection on the datanode.
571            // So we only consider the datanode that reports the failure.
572            let failed_datanodes = [datanode_id];
573            match self
574                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
575                .await
576            {
577                Ok(tasks) => {
578                    let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
579                    for (task, count) in tasks {
580                        grouped_tasks
581                            .entry((task.from_peer.id, task.to_peer.id))
582                            .or_default()
583                            .push((task, count));
584                    }
585
586                    for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
587                        if tasks.is_empty() {
588                            continue;
589                        }
590                        let task = RegionMigrationTaskBatch::from_tasks(tasks);
591                        let region_ids = task.region_ids.clone();
592                        if let Err(err) = self.do_failover_tasks(task).await {
593                            error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
594                        }
595                    }
596                }
597                Err(err) => error!(err; "Failed to generate failover tasks"),
598            }
599        }
600    }
601
602    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
603        self.runtime_switch_manager
604            .maintenance_mode()
605            .await
606            .context(error::RuntimeSwitchManagerSnafu)
607    }
608
609    async fn select_peers(
610        &self,
611        from_peer_id: DatanodeId,
612        regions: &[RegionId],
613        failure_datanodes: &[DatanodeId],
614    ) -> Result<Vec<(RegionId, Peer)>> {
615        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
616        match &self.selector {
617            RegionSupervisorSelector::NaiveSelector(selector) => {
618                let opt = SelectorOptions {
619                    min_required_items: regions.len(),
620                    allow_duplication: true,
621                    exclude_peer_ids,
622                    workload_filter: Some(accept_ingest_workload),
623                    extensions: Default::default(),
624                };
625                let peers = selector.select(&self.selector_context, opt).await?;
626                ensure!(
627                    peers.len() == regions.len(),
628                    error::NoEnoughAvailableNodeSnafu {
629                        required: regions.len(),
630                        available: peers.len(),
631                        select_target: SelectTarget::Datanode,
632                    }
633                );
634                let region_peers = regions
635                    .iter()
636                    .zip(peers)
637                    .map(|(region_id, peer)| (*region_id, peer))
638                    .collect::<Vec<_>>();
639
640                Ok(region_peers)
641            }
642            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
643                let peers = selector
644                    .select(
645                        &self.selector_context,
646                        from_peer_id,
647                        regions,
648                        exclude_peer_ids,
649                    )
650                    .await?;
651                ensure!(
652                    peers.len() == regions.len(),
653                    error::NoEnoughAvailableNodeSnafu {
654                        required: regions.len(),
655                        available: peers.len(),
656                        select_target: SelectTarget::Datanode,
657                    }
658                );
659
660                Ok(peers)
661            }
662        }
663    }
664
665    async fn generate_failover_tasks(
666        &mut self,
667        from_peer_id: DatanodeId,
668        regions: &[RegionId],
669        failed_datanodes: &[DatanodeId],
670    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
671        let mut tasks = Vec::with_capacity(regions.len());
672        let from_peer = self
673            .peer_resolver
674            .datanode(from_peer_id)
675            .await
676            .ok()
677            .flatten()
678            .unwrap_or_else(|| Peer::empty(from_peer_id));
679
680        let region_peers = self
681            .select_peers(from_peer_id, regions, failed_datanodes)
682            .await?;
683
684        for (region_id, peer) in region_peers {
685            let count = *self
686                .failover_counts
687                .entry((from_peer_id, region_id))
688                .and_modify(|count| *count += 1)
689                .or_insert(1);
690            let task = RegionMigrationProcedureTask {
691                region_id,
692                from_peer: from_peer.clone(),
693                to_peer: peer,
694                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
695                trigger_reason: RegionMigrationTriggerReason::Failover,
696            };
697            tasks.push((task, count));
698        }
699
700        Ok(tasks)
701    }
702
703    async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
704        let from_peer_id = task.from_peer.id;
705        let to_peer_id = task.to_peer.id;
706        let timeout = task.timeout;
707        let trigger_reason = task.trigger_reason;
708        let result = self
709            .region_migration_manager
710            .submit_region_migration_task(task)
711            .await?;
712        self.handle_submit_region_migration_task_result(
713            from_peer_id,
714            to_peer_id,
715            timeout,
716            trigger_reason,
717            result,
718        )
719        .await
720    }
721
722    async fn handle_submit_region_migration_task_result(
723        &mut self,
724        from_peer_id: DatanodeId,
725        to_peer_id: DatanodeId,
726        timeout: Duration,
727        trigger_reason: RegionMigrationTriggerReason,
728        result: SubmitRegionMigrationTaskResult,
729    ) -> Result<()> {
730        if !result.migrated.is_empty() {
731            let detecting_regions = result
732                .migrated
733                .iter()
734                .map(|region_id| (from_peer_id, *region_id))
735                .collect::<Vec<_>>();
736            self.deregister_failure_detectors(detecting_regions).await;
737            info!(
738                "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
739                to_peer_id, result.migrated,
740            )
741        }
742        if !result.migrating.is_empty() {
743            info!(
744                "Region is still migrating, skipping failover for regions: {:?}",
745                result.migrating
746            );
747        }
748        if !result.region_not_found.is_empty() {
749            let detecting_regions = result
750                .region_not_found
751                .iter()
752                .map(|region_id| (from_peer_id, *region_id))
753                .collect::<Vec<_>>();
754            self.deregister_failure_detectors(detecting_regions).await;
755            info!(
756                "Region route not found, removed failover detectors for regions: {:?}",
757                result.region_not_found
758            );
759        }
760        if !result.table_not_found.is_empty() {
761            let detecting_regions = result
762                .table_not_found
763                .iter()
764                .map(|region_id| (from_peer_id, *region_id))
765                .collect::<Vec<_>>();
766            self.deregister_failure_detectors(detecting_regions).await;
767            info!(
768                "Table is not found, removed failover detectors for regions: {:?}",
769                result.table_not_found
770            );
771        }
772        if !result.leader_changed.is_empty() {
773            let detecting_regions = result
774                .leader_changed
775                .iter()
776                .map(|region_id| (from_peer_id, *region_id))
777                .collect::<Vec<_>>();
778            self.deregister_failure_detectors(detecting_regions).await;
779            info!(
780                "Region's leader peer changed, removed failover detectors for regions: {:?}",
781                result.leader_changed
782            );
783        }
784        if !result.peer_conflict.is_empty() {
785            info!(
786                "Region has peer conflict, ignore failover for regions: {:?}",
787                result.peer_conflict
788            );
789        }
790        if !result.submitted.is_empty() {
791            info!(
792                "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
793                result.submitted,
794                from_peer_id,
795                to_peer_id,
796                result.procedure_id,
797                timeout,
798                trigger_reason,
799            );
800        }
801
802        Ok(())
803    }
804
805    /// Detects the failure of regions.
806    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
807        self.failure_detector
808            .iter()
809            .filter_map(|e| {
810                // Intentionally not place `current_time_millis()` out of the iteration.
811                // The failure detection determination should be happened "just in time",
812                // i.e., failed or not has to be compared with the most recent "now".
813                // Besides, it might reduce the false positive of failure detection,
814                // because during the iteration, heartbeats are coming in as usual,
815                // and the `phi`s are still updating.
816                if !e.failure_detector().is_available(current_time_millis()) {
817                    Some(*e.region_ident())
818                } else {
819                    None
820                }
821            })
822            .collect::<Vec<_>>()
823    }
824
825    /// Returns all regions that registered in the failure detector.
826    fn regions(&self) -> HashSet<RegionId> {
827        self.failure_detector
828            .iter()
829            .map(|e| e.region_ident().1)
830            .collect::<HashSet<_>>()
831    }
832
833    /// Updates the state of corresponding failure detectors.
834    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
835        for region_id in heartbeat.regions {
836            let detecting_region = (heartbeat.datanode_id, region_id);
837            let mut detector = self
838                .failure_detector
839                .region_failure_detector(detecting_region);
840            detector.heartbeat(heartbeat.timestamp);
841        }
842    }
843
844    fn clear(&self) {
845        self.failure_detector.clear();
846    }
847}
848
/// Unit tests for the region supervisor, its ticker and the failure detector
/// controller.
#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::manager::{
        RegionMigrationManager, SubmitRegionMigrationTaskResult,
    };
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    /// Builds a `RegionSupervisor` for tests and returns it together with the
    /// sender side of its event channel.
    ///
    /// The supervisor is wired to a fresh `TestingEnv` (procedure manager, context
    /// factory and kv backend), a `RandomNodeSelector` that only knows peer 1, a
    /// `NoopPeerResolver`, and `Default::default()` for its second constructor
    /// argument.
    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    /// Heartbeats register failure detectors, and `Clear` removes them.
    ///
    /// After 2000 heartbeats spaced ~1s apart (with up to 99ms of jitter), `phi`
    /// stays at zero within the acceptable pause. Shortly after the pause it
    /// crosses the threshold.
    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        /// Generates 2000 heartbeats for `datanode_id`, one per second starting
        /// now, each carrying the regions `RegionId::new(0, n)` for `n` in
        /// `region_ids`.
        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's document.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    /// The ticker can be started and stopped repeatedly. While running it only
    /// emits `Tick`, `Clear` or `InitializeAllRegions` events, and stopping it
    /// resets both task handles.
    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
            assert!(ticker.initialization_handle.lock().unwrap().is_none());
            assert!(ticker.tick_handle.lock().unwrap().is_none());
        }
    }

    /// The first `InitializeAllRegions` event is left unanswered and the second
    /// is acknowledged. After that, the channel stays empty for the next ~300ms.
    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    /// The metadata contains one physical table with a single region led by peer 1,
    /// plus a logical table on top of it. `InitializeAllRegions` must register a
    /// detector for the physical region only.
    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata on top of the physical table above.
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(table_id);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(table_id, 0))));
    }

    /// With maintenance mode enabled, `InitializeAllRegions` is never acknowledged.
    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    /// Registering the same region twice through the controller keeps the existing
    /// detector unchanged. Deregistering removes it.
    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }

    /// Shared driver for the `test_handle_submit_region_migration_task_result_*` tests.
    ///
    /// Builds a fresh supervisor that already tracks `detecting_region`: a registered
    /// failure detector plus a failover count of 1. It then passes `result` to
    /// `handle_submit_region_migration_task_result` as a migration from peer 1 to
    /// peer 2, with a 1s timeout and a manual trigger reason. The supervisor is
    /// returned so the caller can check whether the detector and the failover count
    /// were kept or removed.
    async fn supervisor_after_submit_result(
        detecting_region: (common_meta::DatanodeId, RegionId),
        result: SubmitRegionMigrationTaskResult,
    ) -> RegionSupervisor {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, _) = new_test_supervisor();
        supervisor
            .register_failure_detectors(vec![detecting_region])
            .await;
        supervisor.failover_counts.insert(detecting_region, 1);
        supervisor
            .handle_submit_region_migration_task_result(
                1,
                2,
                Duration::from_millis(1000),
                RegionMigrationTriggerReason::Manual,
                result,
            )
            .await
            .unwrap();
        supervisor
    }

    /// `migrated` regions lose both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrated() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            migrated: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    /// `migrating` regions keep both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_migrating() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            migrating: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    /// Regions whose table is gone lose both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_table_not_found() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            table_not_found: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    /// Regions missing from the route lose both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_region_not_found() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            region_not_found: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    /// Regions whose leader changed lose both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_leader_changed() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            leader_changed: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(!supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.is_empty());
    }

    /// Regions with a peer conflict keep both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_peer_conflict() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            peer_conflict: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }

    /// `submitted` regions keep both their detector and their failover count.
    #[tokio::test]
    async fn test_handle_submit_region_migration_task_result_submitted() {
        let detecting_region = (1, RegionId::new(1, 1));
        let result = SubmitRegionMigrationTaskResult {
            submitted: vec![detecting_region.1],
            ..Default::default()
        };
        let supervisor = supervisor_after_submit_result(detecting_region, result).await;
        assert!(supervisor.failure_detector.contains(&detecting_region));
        assert!(supervisor.failover_counts.contains_key(&detecting_region));
    }
}