// datanode/alive_keeper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use api::v1::meta::GrantedRegion;
20use async_trait::async_trait;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_meta::error::InvalidProtoMsgSnafu;
24use common_meta::heartbeat::handler::{
25    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
26};
27use common_telemetry::{debug, error, info, trace, warn};
28use snafu::OptionExt;
29use store_api::region_engine::{RegionRole, SettableRegionRoleState};
30use store_api::region_request::{RegionCloseRequest, RegionRequest};
31use store_api::storage::RegionId;
32#[cfg(test)]
33use tokio::sync::oneshot;
34use tokio::sync::{Mutex, mpsc};
35use tokio::task::JoinHandle;
36use tokio::time::{Duration, Instant};
37
38use crate::error::{self, Result};
39use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
40use crate::region_server::RegionServer;
41
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached,
/// the region is demoted to follower (effectively "readonly"), ensuring there is no
/// side-effect in the entity system.
///
/// The deadline is controlled by the meta server. Datanode sends its opened regions info to the
/// meta server via heartbeat. If the meta server decides some region could reside in this
/// Datanode, it renews the region's lease and the deadline of the [CountdownTask] is reset.
pub struct RegionAliveKeeper {
    /// Used to demote or close regions when their leases expire.
    region_server: RegionServer,
    /// One countdown task handle per registered region.
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    /// Current heartbeat interval in milliseconds; may be updated at runtime from Metasrv.
    heartbeat_interval_millis: Arc<AtomicU64>,
    /// Whether [RegionAliveKeeper] has been started; regions registered before that
    /// point only begin their countdown once it is started.
    started: Arc<AtomicBool>,

    /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
    /// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically
    /// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
    /// duration acts like an "invariant point" for region's keep alive lease.
    epoch: Instant,

    /// Optional hook invoked whenever a region's lease deadline is reset.
    countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}
64
65impl RegionAliveKeeper {
66    /// Returns an empty [RegionAliveKeeper].
67    pub fn new(
68        region_server: RegionServer,
69        countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
70        heartbeat_interval: Duration,
71    ) -> Self {
72        Self {
73            region_server,
74            tasks: Arc::new(Mutex::new(HashMap::new())),
75            heartbeat_interval_millis: Arc::new(AtomicU64::new(
76                heartbeat_interval.as_millis() as u64
77            )),
78            started: Arc::new(AtomicBool::new(false)),
79            epoch: Instant::now(),
80            countdown_task_handler_ext,
81        }
82    }
83
84    /// Update the heartbeat interval with the value received from Metasrv.
85    pub fn update_heartbeat_interval(&self, heartbeat_interval_millis: u64) {
86        self.heartbeat_interval_millis
87            .store(heartbeat_interval_millis, Ordering::Relaxed);
88    }
89
90    async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
91        self.tasks.lock().await.get(&region_id).cloned()
92    }
93
94    /// Add the countdown task for a specific region.
95    /// It will be ignored if the task exists.
96    pub async fn register_region(&self, region_id: RegionId) {
97        let handle = Arc::new(CountdownTaskHandle::new(
98            self.region_server.clone(),
99            self.countdown_task_handler_ext.clone(),
100            region_id,
101        ));
102
103        let should_start = {
104            let mut handles = self.tasks.lock().await;
105
106            // Check if already exists, return early if so
107            if handles.contains_key(&region_id) {
108                return;
109            }
110
111            // Insert new handle
112            handles.insert(region_id, handle.clone());
113
114            // Return whether we should start (check state inside lock)
115            self.started.load(Ordering::Relaxed)
116        };
117
118        if should_start {
119            handle
120                .start(self.heartbeat_interval_millis.load(Ordering::Relaxed))
121                .await;
122            info!("Region alive countdown for region {region_id} is started!");
123        } else {
124            info!(
125                "Region alive countdown for region {region_id} is registered but not started yet!"
126            );
127        }
128    }
129
130    /// Removes the countdown task for a specific region.
131    pub async fn deregister_region(&self, region_id: RegionId) {
132        if self.tasks.lock().await.remove(&region_id).is_some() {
133            info!("Deregister alive countdown for region {region_id}")
134        }
135    }
136
137    /// Renews the lease of regions to `deadline`.
138    async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
139        for region in regions {
140            let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
141            if let Some(handle) = self.find_handle(region_id).await {
142                handle
143                    .reset_deadline(role, deadline, region.extensions.clone())
144                    .await;
145            } else {
146                warn!(
147                    "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
148                );
149                // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
150            }
151        }
152    }
153
154    async fn close_staled_region(&self, region_id: RegionId) {
155        info!("Closing staled region: {region_id}");
156        let request = RegionRequest::Close(RegionCloseRequest {});
157        if let Err(e) = self.region_server.handle_request(region_id, request).await
158            && e.status_code() != StatusCode::RegionNotFound
159        {
160            let _ = self
161                .region_server
162                .set_region_role(region_id, RegionRole::Follower);
163            error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
164        }
165    }
166
167    /// Closes staled regions.
168    async fn close_staled_regions(&self, regions: &[u64]) {
169        for region_id in regions {
170            self.close_staled_region(RegionId::from_u64(*region_id))
171                .await;
172        }
173    }
174
175    #[cfg(test)]
176    async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
177        let mut deadline = None;
178        if let Some(handle) = self.find_handle(region_id).await {
179            let (s, r) = oneshot::channel();
180            if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
181                deadline = r.await.ok()
182            }
183        }
184        deadline
185    }
186
187    pub async fn start(
188        self: &Arc<Self>,
189        event_receiver: Option<RegionServerEventReceiver>,
190    ) -> Result<()> {
191        self.started.store(true, Ordering::Relaxed);
192
193        if let Some(mut event_receiver) = event_receiver {
194            let keeper = self.clone();
195            // Initializers region alive keeper.
196            // It makes sure all opened regions are registered to `RegionAliveKeeper.`
197            loop {
198                match event_receiver.0.try_recv() {
199                    Ok(RegionServerEvent::Registered(region_id)) => {
200                        keeper.register_region(region_id).await;
201                    }
202                    Ok(RegionServerEvent::Deregistered(region_id)) => {
203                        keeper.deregister_region(region_id).await;
204                    }
205                    Err(mpsc::error::TryRecvError::Disconnected) => {
206                        return error::UnexpectedSnafu {
207                            violated: "RegionServerEventSender closed",
208                        }
209                        .fail();
210                    }
211                    Err(mpsc::error::TryRecvError::Empty) => {
212                        break;
213                    }
214                }
215            }
216            let running = self.started.clone();
217
218            // Watches changes
219            common_runtime::spawn_global(async move {
220                loop {
221                    if !running.load(Ordering::Relaxed) {
222                        info!("RegionAliveKeeper stopped! Quits the watch loop!");
223                        break;
224                    }
225
226                    match event_receiver.0.recv().await {
227                        Some(RegionServerEvent::Registered(region_id)) => {
228                            keeper.register_region(region_id).await;
229                        }
230                        Some(RegionServerEvent::Deregistered(region_id)) => {
231                            keeper.deregister_region(region_id).await;
232                        }
233                        None => {
234                            info!("RegionServerEventSender closed! Quits the watch loop!");
235                            break;
236                        }
237                    }
238                }
239            });
240        }
241
242        let tasks = self.tasks.lock().await;
243        let interval = self.heartbeat_interval_millis.load(Ordering::Relaxed);
244        for task in tasks.values() {
245            task.start(interval).await;
246        }
247
248        info!(
249            "RegionAliveKeeper is started with region {:?}",
250            tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
251        );
252
253        Ok(())
254    }
255
256    pub fn epoch(&self) -> Instant {
257        self.epoch
258    }
259}
260
261#[async_trait]
262impl HeartbeatResponseHandler for RegionAliveKeeper {
263    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
264        ctx.response.region_lease.is_some()
265    }
266
267    async fn handle(
268        &self,
269        ctx: &mut HeartbeatResponseHandlerContext,
270    ) -> common_meta::error::Result<HandleControl> {
271        let region_lease = ctx
272            .response
273            .region_lease
274            .as_ref()
275            .context(InvalidProtoMsgSnafu {
276                err_msg: "'region_lease' is missing in heartbeat response",
277            })?;
278        let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
279        let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
280
281        self.renew_region_leases(&region_lease.regions, deadline)
282            .await;
283        self.close_staled_regions(&region_lease.closeable_region_ids)
284            .await;
285
286        Ok(HandleControl::Continue)
287    }
288}
289
/// Commands accepted by a [CountdownTask] through its handle's channel.
#[derive(Debug)]
enum CountdownCommand {
    /// Start this countdown task. The first deadline will be set to
    /// 4 * `heartbeat_interval_millis`
    Start(u64),
    /// Reset countdown deadline to the given instance.
    /// (NextRole, Deadline, ExtensionInfo)
    Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
    /// Returns the current deadline of the countdown task.
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}
302
/// Shared reference to a [CountdownTaskHandlerExt] implementation.
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;

/// Extension hook invoked by a countdown task whenever a region's lease deadline
/// is reset, allowing custom per-region logic to run on lease renewal.
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        region_id: RegionId,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}
317
/// Handle to a spawned [CountdownTask]; dropping it aborts the background task.
struct CountdownTaskHandle {
    /// Sends [CountdownCommand]s to the background countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    /// Join handle of the spawned task, used to abort it on drop.
    handler: JoinHandle<()>,
    /// The region this countdown belongs to (used in the drop log message).
    region_id: RegionId,
}
323
324impl CountdownTaskHandle {
325    /// Creates a new [CountdownTaskHandle] and starts the countdown task.
326    fn new(
327        region_server: RegionServer,
328        handler_ext: Option<CountdownTaskHandlerExtRef>,
329        region_id: RegionId,
330    ) -> Self {
331        let (tx, rx) = mpsc::channel(1024);
332
333        let mut countdown_task = CountdownTask {
334            region_server,
335            handler_ext,
336            region_id,
337            rx,
338        };
339        let handler = common_runtime::spawn_hb(async move {
340            countdown_task.run().await;
341        });
342
343        Self {
344            tx,
345            handler,
346            region_id,
347        }
348    }
349
350    /// Starts the [CountdownTask],
351    /// it will be ignored if the task started.
352    async fn start(&self, heartbeat_interval_millis: u64) {
353        if let Err(e) = self
354            .tx
355            .send(CountdownCommand::Start(heartbeat_interval_millis))
356            .await
357        {
358            warn!(
359                "Failed to start region alive keeper countdown: {e}. \
360                Maybe the task is stopped due to region been closed."
361            );
362        }
363    }
364
365    #[cfg(test)]
366    async fn deadline(&self) -> Option<Instant> {
367        let (tx, rx) = oneshot::channel();
368        if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
369            return rx.await.ok();
370        }
371        None
372    }
373
374    async fn reset_deadline(
375        &self,
376        role: RegionRole,
377        deadline: Instant,
378        extension_info: HashMap<String, Vec<u8>>,
379    ) {
380        if let Err(e) = self
381            .tx
382            .send(CountdownCommand::Reset((role, deadline, extension_info)))
383            .await
384        {
385            warn!(
386                "Failed to reset region alive keeper deadline: {e}. \
387                Maybe the task is stopped due to region been closed."
388            );
389        }
390    }
391}
392
393impl Drop for CountdownTaskHandle {
394    fn drop(&mut self) {
395        debug!(
396            "Aborting region alive countdown task for region {}",
397            self.region_id
398        );
399        self.handler.abort();
400    }
401}
402
/// Background task that tracks one region's lease countdown.
struct CountdownTask {
    /// Used to change the region's role when the lease is renewed or expires.
    region_server: RegionServer,
    /// The region this task counts down for.
    region_id: RegionId,
    /// Optional hook invoked on each deadline reset.
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    /// Receives commands from the owning [CountdownTaskHandle].
    rx: mpsc::Receiver<CountdownCommand>,
}
409
impl CountdownTask {
    /// Event loop of the countdown task: processes commands from the handle while
    /// counting down to the region's lease deadline. When the deadline is reached
    /// the region is demoted to follower; when the command channel closes (the
    /// handle was dropped) the loop exits.
    async fn run(&mut self) {
        // 30 years. See `Instant::far_future`.
        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

        // Make sure the alive countdown is not gonna happen before heartbeat task is started (the
        // "start countdown" command will be sent from heartbeat task).
        let countdown = tokio::time::sleep_until(far_future);
        tokio::pin!(countdown);
        let region_id = self.region_id;

        // Guards against repeated `Start` commands resetting the first deadline.
        let mut started = false;
        loop {
            tokio::select! {
                command = self.rx.recv() => {
                    match command {
                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
                            if !started {
                                // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
                                // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
                                // network or other jitters during startup.
                                let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
                                countdown.set(tokio::time::sleep_until(first_deadline));
                                started = true;
                            }
                        },
                        Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
                            // Remember whether we were leader before applying the new role,
                            // so a follower -> leader promotion can be detected below.
                            let prev_role_leader = self.region_server.is_region_leader(self.region_id).unwrap_or(false);
                            if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
                                if err.status_code() == StatusCode::RegionNotFound {
                                    // Table metadata in metasrv is deleted after its regions are dropped.
                                    // The datanode may still receive lease renewal responses that depend on the metadata
                                    // during the short period before it is removed.
                                    warn!(err; "Failed to set region role to {role} for region {region_id}");
                                } else {
                                    error!(err; "Failed to set region role to {role} for region {region_id}");
                                }

                            // Finalize leadership: persist backfilled metadata.
                            } else if !prev_role_leader && role == RegionRole::Leader
                                && let Err(err) = self
                                    .region_server
                                    .set_region_role_state_gracefully(self.region_id, SettableRegionRoleState::Leader)
                                    .await
                            {
                                error!(err; "Failed to set region role state gracefully to {role} for region {region_id}");
                            }

                            // Let the optional extension hook observe the deadline reset.
                            if let Some(ext_handler) = self.handler_ext.as_ref() {
                                ext_handler.reset_deadline(
                                    &self.region_server,
                                    self.region_id,
                                    role,
                                    deadline,
                                    extension_info,
                                ).await;
                            }
                            trace!(
                                "Reset deadline of region {region_id} to approximately {} seconds later.",
                                (deadline - Instant::now()).as_secs_f32(),
                            );
                            countdown.set(tokio::time::sleep_until(deadline));
                        },
                        None => {
                            info!(
                                "The handle of countdown task for region {region_id}\
                                is dropped, RegionAliveKeeper out."
                            );
                            break;
                        },
                        #[cfg(test)]
                        Some(CountdownCommand::Deadline(tx)) => {
                            let _ = tx.send(countdown.deadline());
                        }
                    }
                }
                () = &mut countdown => {
                    // Lease expired: demote to follower and park the countdown far in
                    // the future until the next lease renewal arrives.
                    warn!("The region {region_id} lease is expired, convert region to follower.");
                    if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
                        error!(err; "Failed to set region role to follower for region {region_id}");
                    }
                    // resets the countdown.
                    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
                    countdown.as_mut().reset(far_future);
                }
            }
        }
    }
}
499
#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    /// End-to-end test of the keeper: the region's lease expires (it is demoted
    /// to follower), is renewed (promoted back to leader), then expires again
    /// (parked with a far-future deadline).
    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        let engine = Arc::new(engine);
        region_server.register_engine(engine.clone());

        let alive_keeper = Arc::new(RegionAliveKeeper::new(
            region_server.clone(),
            None,
            Duration::from_millis(100),
        ));

        let region_id = RegionId::new(1024, 1);
        let builder = CreateRequestBuilder::new();
        region_server
            .handle_request(region_id, RegionRequest::Create(builder.build()))
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        // Register a region before starting.
        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        // The started alive keeper should assign deadline to this region.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        info!("Renew the region lease");
        // Renew lease then sleep.
        alive_keeper
            .renew_region_leases(
                &[GrantedRegion {
                    region_id: region_id.as_u64(),
                    role: api::v1::meta::RegionRole::Leader.into(),
                    extensions: HashMap::new(),
                }],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // After expiry the countdown is parked roughly 30 years in the future.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    }

    /// Unit test of [CountdownTaskHandle]: deadline before/after `start`,
    /// idempotency of repeated `start`, and explicit deadline reset.
    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        // If countdown task is not started, its deadline is set to far future.
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // The first deadline should be set to 4 * heartbeat_interval_millis.
        // We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test.
        let heartbeat_interval_millis = 100;
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        // No effect.
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // Reset deadline.
        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}