meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{interval, interval_at, MissedTickBehavior};

use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason,
};
use crate::procedure::region_migration::{
    RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes identifiers for the cluster and datanode, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Consider collecting the memtable size in regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv has received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}
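
// Example (illustrative sketch, not taken from the surrounding code): events reach the
// supervisor through the mpsc channel created by `RegionSupervisor::channel()`. Assuming an
// async context and a running supervisor consuming the `receiver` half, a holder of the
// `Sender<Event>` can ask the supervisor to start tracking a region like this:
//
//     let (sender, receiver) = RegionSupervisor::channel();
//     // Track region 0 of table 1024, whose leader currently lives on datanode 1.
//     let detecting_region = (1, RegionId::new(1024, 0));
//     sender
//         .send(Event::RegisterFailureDetectors(vec![detecting_region]))
//         .await
//         .unwrap();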

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisorTicker`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The interval between ticks.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!("EventReceiver is dropped, region failure detectors initialization loop is stopped");
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}
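
// Example (illustrative sketch): the ticker is constructed with a tick interval and
// initialization settings, and is usually driven by leadership changes: `on_leader_start`
// calls `start()` and `on_leader_stop` calls `stop()`. The initialization delay below is an
// arbitrary placeholder, not a default defined in this file:
//
//     let (sender, receiver) = RegionSupervisor::channel();
//     let ticker = RegionSupervisorTicker::new(
//         DEFAULT_TICK_INTERVAL,
//         Duration::from_secs(60),                 // initialization_delay (placeholder)
//         DEFAULT_INITIALIZATION_RETRY_PERIOD,
//         sender,
//     );
//     // Begins sending Event::Clear, Event::Tick and Event::InitializeAllRegions.
//     ticker.start();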

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`]
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The runtime switch (maintenance mode) manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// The peer lookup service.
    peer_lookup: PeerLookupServiceRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
}

/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}
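
// Example (illustrative sketch): DDL and procedure code interacts with the supervisor through
// the `RegionFailureDetectorController` trait instead of the raw channel. Assuming `sender` is
// a clone of the sender returned by `RegionSupervisor::channel()`:
//
//     let controller = RegionFailureDetectorControl::new(sender.clone());
//     // Start tracking a newly created region hosted on datanode 1 ...
//     controller.register_failure_detectors(vec![(1, RegionId::new(1024, 0))]).await;
//     // ... and stop tracking it once it is dropped.
//     controller.deregister_failure_detectors(vec![(1, RegionId::new(1024, 0))]).await;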

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}
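
// Example (illustrative sketch): the heartbeat handler converts each incoming datanode `Stat`
// into a `DatanodeHeartbeat` and hands it to the acceptor, which forwards it to the
// supervisor's event loop. Assuming `stat: &Stat` comes from a heartbeat request and `sender`
// is a clone of the supervisor's channel sender:
//
//     let acceptor = HeartbeatAcceptor::new(sender.clone());
//     acceptor.accept(DatanodeHeartbeat::from(stat)).await;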

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_lookup: PeerLookupServiceRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_lookup,
            kv_backend,
        }
    }

    /// Runs the main loop.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!("Skipping initialize all regions since maintenance mode is enabled.");
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialize all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

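    /// Scans all table routes in the kv backend and registers failure detectors for any
    /// leader regions that are not yet tracked.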
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id) {
                        if let Some(leader_peer) = &region_route.leader_peer {
                            detecting_regions.push((leader_peer.id, region_route.region.id));
                        }
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
466            "Initialize {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The datanode hosting the corresponding region has `acceptable_heartbeat_pause_millis`
            // to send a heartbeat before the region is considered failed.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

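    /// Handles detected region failures: skips failover while maintenance mode is enabled,
    /// drops regions that are already migrating, then groups the remaining regions by datanode
    /// and submits a failover task for each of them.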
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating, i.e., a failover has already been triggered for them.
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipping region failover for region: {region_id}, datanode: {datanode_id} because it's already migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already failing over or migrating, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
538                "Detects region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here,
            // because failure detection may produce false positives for the other datanodes.
            // So we only treat the datanode whose regions are being handled here as failed.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

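    /// Selects a destination peer for each region, excluding the failed datanodes from the
    /// candidates and ensuring exactly one peer is returned per region.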
    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

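    /// Builds a failover (region migration) task for each region, looking up the source peer
    /// and bumping the per-region failover count, which scales the migration timeout.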
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_lookup
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
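            // Each retry of the same failover proportionally extends the migration timeout
            // (count * DEFAULT_REGION_MIGRATION_TIMEOUT).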
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

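    /// Submits a region migration procedure for the failover task. Errors indicating the
    /// failover is no longer needed (already migrated, another migration running, table route
    /// missing, or leader peer changed) are handled in place and reported as `Ok`.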
    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                RegionMigrated { .. } => {
                    info!(
                        "Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}",
                        to_peer_id, region_id, from_peer_id
                    );
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    Ok(())
                }
                // Returns Ok if the migration is already running or the table has been dropped.
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

    /// Detects the failure of regions.
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally does not hoist `current_time_millis()` out of the iteration.
                // The failure determination should happen "just in time", i.e., whether a region
                // has failed must be judged against the most recent "now". Besides, this might
                // reduce false positives, because heartbeats keep arriving during the iteration
                // and the `phi`s keep updating.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

    /// Returns all regions that are registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}
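
// Example (illustrative sketch): how the pieces above are typically wired together. The actual
// startup code lives elsewhere in metasrv; the dependencies referenced by name below
// (selector context, selector, migration manager, runtime switch manager, peer lookup,
// kv backend) are assumed to be constructed by the caller:
//
//     let (sender, receiver) = RegionSupervisor::channel();
//     let acceptor = HeartbeatAcceptor::new(sender.clone());
//     let ticker = RegionSupervisorTicker::new(
//         DEFAULT_TICK_INTERVAL,
//         Duration::from_secs(60),                 // initialization_delay (placeholder)
//         DEFAULT_INITIALIZATION_RETRY_PERIOD,
//         sender.clone(),
//     );
//     let mut supervisor = RegionSupervisor::new(
//         receiver,
//         Default::default(),                      // PhiAccrualFailureDetectorOptions
//         selector_context,
//         RegionSupervisorSelector::NaiveSelector(selector),
//         region_migration_manager,
//         runtime_switch_manager,
//         peer_lookup,
//         kv_backend,
//     );
//     common_runtime::spawn_global(async move { supervisor.run().await });
//     ticker.start(); // or register `ticker` as a LeadershipChangeListener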

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{runtime_switch, TableMetadataManager};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerLookupService;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_lookup = Arc::new(NoopPeerLookupService);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_lookup,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clear up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then within the next two seconds, phi rises above the threshold.
            // The same effect can be seen in the diagrams in Akka's documentation.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}
1057}