meta_srv/region/supervisor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::DatanodeId;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use crate::error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};

use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason,
};
use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
use crate::state::StateRef;

/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes the datanode identifier, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    // TODO(weny): Consider collecting the memtable size in regions.
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}
/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event indicates that the metasrv received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event resets the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in tests) This event triggers a dump of the
///   current state for debugging purposes. It allows developers to inspect the internal state
///   of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// A background job to generate [`Event::Tick`] type events.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
    initialization_handler: Mutex<Option<JoinHandle<()>>>,

    /// The interval of tick.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

    /// Starts the ticker.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

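            // Retry loop: after the initial delay, keep asking the supervisor to initialize all
            // region failure detectors on every retry period until it acknowledges success.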
            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            let initialization_handler = common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!(
                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
                        );
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });
            *self.initialization_handler.lock().unwrap() = Some(initialization_handler);

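            // Tick loop: clear any stale supervisor state first, then emit `Event::Tick`
            // at the configured interval to drive failure detection.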
            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
        let initialization_handler = self.initialization_handler.lock().unwrap().take();
        if let Some(initialization_handler) = initialization_handler {
            initialization_handler.abort();
            info!("The initialization loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
    /// Used to detect the failure of regions.
    failure_detector: RegionFailureDetector,
    /// Tracks the number of failovers for each region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of [`SelectorRef`].
    selector_context: SelectorContext,
    /// Candidate node selector.
    selector: RegionSupervisorSelector,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The maintenance mode manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Peer resolver.
    peer_resolver: PeerResolverRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// The meta state, used to check if the current metasrv is the leader.
    state: Option<StateRef>,
}

/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving events.");
        }
    }
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts heartbeats from datanodes.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_resolver: PeerResolverRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
            state: None,
        }
    }

    /// Sets the meta state.
    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
        self.state = Some(state);
        self
    }

    /// Runs the main loop.
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            if let Some(state) = self.state.as_ref()
                && !state.read().unwrap().is_leader()
            {
                warn!(
                    "The current metasrv is not the leader, ignoring {:?} event",
                    event
                );
                continue;
            }

            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!(
                                "Skipping initialization of all regions since maintenance mode is enabled."
                            );
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialization of all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => {
                    self.clear();
                    info!("Region supervisor is initialized.");
                }
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

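    /// Scans all physical table routes in the kv backend and registers failure detectors
    /// for leader regions that are not yet tracked by the supervisor.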
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id)
                        && let Some(leader_peer) = &region_route.leader_peer
                    {
                        detecting_regions.push((leader_peer.id, region_route.region.id));
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

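    /// Initializes a failure detector for each of the given regions if one is not registered yet.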
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            // The corresponding datanode has `acceptable_heartbeat_pause_millis` to send a heartbeat
            // for this region before it is considered failed.
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

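    /// Removes the failure detectors and failover counters for the given regions.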
    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

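    /// Handles detected region failures: skips failover when maintenance mode is enabled or the
    /// region is already migrating, then groups the remaining regions by datanode and submits a
    /// failover (region migration) task for each of them.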
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        // Extract regions that are already migrating (including failover),
        // i.e., failover has already been triggered for them.
        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipped region failover for region: {region_id}, datanode: {datanode_id} because it's already migrating"
            );
        }

        if regions.is_empty() {
            // If all detected regions are already failing over or migrating, just return.
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            // We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
            // because failure detection may produce false positives for other datanodes.
            // So we only exclude the datanode currently being handled.
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

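    /// Returns whether maintenance mode is enabled, as reported by the runtime switch manager.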
    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

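    /// Selects a destination peer for each of the given regions, excluding the failed datanodes.
    ///
    /// Returns an error if the selector cannot provide enough available datanodes.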
    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                    workload_filter: Some(accept_ingest_workload),
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

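    /// Builds a region migration task for each failed region, selecting a destination peer and
    /// bumping the per-region failover count, which also scales the migration timeout.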
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

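    /// Submits a region migration procedure for the failed region. Expected submission errors
    /// (already migrated, migration running, table dropped, leader changed) are treated as success,
    /// deregistering the corresponding failure detector where appropriate.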
    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                RegionMigrated { .. } => {
                    info!(
                        "Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}",
                        to_peer_id, region_id, from_peer_id
                    );
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    Ok(())
                }
                // Returns Ok if it's running or table is dropped.
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

    /// Detects the failure of regions.
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                // Intentionally not placing `current_time_millis()` outside the iteration.
                // The failure detection determination should happen "just in time",
                // i.e., failed or not has to be compared with the most recent "now".
                // Besides, it might reduce the false positives of failure detection,
                // because during the iteration, heartbeats are coming in as usual,
                // and the `phi`s are still updating.
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

    /// Returns all regions that are registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

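    /// Removes all registered failure detectors.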
    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

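    /// Builds a `RegionSupervisor` on top of [`TestingEnv`] and returns it together with the
    /// sender side of its event channel.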
    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        // Clean up
        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // ... then in less than two seconds, phi is above the threshold.
            // The same effect can be seen in the diagrams in Akka's documentation.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            initialization_handler: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registers failure detector again
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}