meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::DatanodeId;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};

use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason,
};
use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the datanode identifier, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Consider collecting the memtable size of regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv has received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
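
// A minimal sketch (illustrative only) of how a holder of the event channel's
// sender half drives the supervisor; `InitializeAllRegions` carries a oneshot
// sender that is answered once all detectors have been registered:
//
//     sender.send(Event::Tick).await.ok();
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     sender.send(Event::InitializeAllRegions(tx)).await.ok();
//     let _ = rx.await; // resolves after successful initialization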

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job that periodically generates [`Event::Tick`] events and
/// triggers the initialization of all region failure detectors.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The tick interval.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}
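
// A minimal wiring sketch (illustrative only; the registration call below is
// hypothetical): the ticker is registered as a leadership change listener so
// that ticking starts when this metasrv becomes leader and stops when it
// steps down:
//
//     let ticker = Arc::new(RegionSupervisorTicker::new(
//         DEFAULT_TICK_INTERVAL,
//         initialization_delay,
//         DEFAULT_INITIALIZATION_RETRY_PERIOD,
//         sender.clone(),
//     ));
//     leadership_change_notifier.add_listener(ticker.clone()); // hypothetical API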

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!(
                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
                        );
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`]
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The maintenance mode manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Peer resolver
    peer_resolver: PeerResolverRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
}

/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }
}
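
// A usage sketch (illustrative only): DDL-side code that creates or drops
// regions can hold this controller to keep the supervisor's detectors in
// sync; the actual call sites live outside this file:
//
//     let control = RegionFailureDetectorControl::new(sender.clone());
//     control
//         .register_failure_detectors(vec![(datanode_id, region_id)])
//         .await;
//     control
//         .deregister_failure_detectors(vec![(datanode_id, region_id)])
//         .await;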

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}
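
// A minimal sketch (illustrative only) of the heartbeat path: the handler that
// processes a datanode's `Stat` converts it and forwards it to the supervisor:
//
//     let heartbeat = DatanodeHeartbeat::from(&stat);
//     acceptor.accept(heartbeat).await;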

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }
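
    // A minimal wiring sketch (illustrative only): the sender half is cloned and
    // shared by the `HeartbeatAcceptor`, the `RegionSupervisorTicker`, and the
    // `RegionFailureDetectorControl`, while the receiver half is owned by this
    // supervisor's event loop:
    //
    //     let (tx, rx) = RegionSupervisor::channel();
    //     let acceptor = HeartbeatAcceptor::new(tx.clone());
    //     let control = RegionFailureDetectorControl::new(tx.clone());
    //     // `rx` is handed to `RegionSupervisor::new(rx, ...)` below.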

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_resolver: PeerResolverRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
        }
    }

    /// Runs the main loop.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!(
                                "Skipping initialization of all regions since maintenance mode is enabled."
                            );
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode while initializing all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id)
                        && let Some(leader_peer) = &region_route.leader_peer
                    {
                        detecting_regions.push((leader_peer.id, region_route.region.id));
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The datanode hosting the corresponding region has `acceptable_heartbeat_pause_millis` to send its heartbeat.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating, i.e., failover has already been triggered for them.
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipped region failover for region: {region_id}, datanode: {datanode_id} because it's already migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already failing over or migrating, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
            // because failure detection may produce false positives on other datanodes.
            // So we only treat the datanode currently being handled as failed.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                    workload_filter: Some(accept_ingest_workload),
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }
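
    // Note on the tasks generated above: the failover attempt count for a
    // (datanode, region) pair scales the migration timeout linearly, so the
    // n-th failover attempt for the same region gets n times
    // `DEFAULT_REGION_MIGRATION_TIMEOUT`. The count is only reset when the
    // region's failure detector is deregistered.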

    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                RegionMigrated { .. } => {
                    info!(
                        "Region has been migrated to target peer: {}, removed failure detector for region: {}, datanode: {}",
                        to_peer_id, region_id, from_peer_id
                    );
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    Ok(())
                }
                // Returns Ok if another migration is already running or the table has been dropped.
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found, the table is dropped, removed failure detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer changed, removed failure detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

    /// Detects the failure of regions.
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally does not hoist `current_time_millis()` out of the iteration.
                // The failure determination should happen "just in time",
                // i.e., whether a region has failed must be judged against the most recent "now".
                // Besides, this might reduce false positives in failure detection,
                // because during the iteration, heartbeats are coming in as usual,
                // and the `phi`s are still updating.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }
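
    // For background (not specific to this implementation): a phi accrual
    // failure detector estimates suspicion roughly as
    // `phi(now) = -log10(P_later(now - last_heartbeat))`, and a region is
    // considered unavailable once `phi` exceeds the configured threshold.
    // The `test_heartbeat` test below shows `phi` staying at zero within
    // `acceptable_heartbeat_pause_millis` and crossing the threshold shortly
    // after.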

    /// Returns all regions that are registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of the corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's document.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}