// datanode/alive_keeper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19use api::v1::meta::GrantedRegion;
20use async_trait::async_trait;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_meta::error::InvalidProtoMsgSnafu;
24use common_meta::heartbeat::handler::{
25    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
26};
27use common_telemetry::{debug, error, info, trace, warn};
28use snafu::OptionExt;
29use store_api::region_engine::RegionRole;
30use store_api::region_request::{RegionCloseRequest, RegionRequest};
31use store_api::storage::RegionId;
32#[cfg(test)]
33use tokio::sync::oneshot;
34use tokio::sync::{mpsc, Mutex};
35use tokio::task::JoinHandle;
36use tokio::time::{Duration, Instant};
37
38use crate::error::{self, Result};
39use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
40use crate::region_server::RegionServer;
41
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached,
/// the region's status is set to "readonly", ensuring there is no side effect in the entire system.
///
/// The deadline is controlled by the meta server. The Datanode sends its opened regions info to the
/// meta server via heartbeat. If the meta server decides some region could reside in this Datanode,
/// it will renew the lease of the region, and the deadline of its [CountdownTask] will be reset.
pub struct RegionAliveKeeper {
    /// Used to downgrade/close a region when its lease expires.
    region_server: RegionServer,
    /// One countdown task handle per registered region.
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    heartbeat_interval_millis: u64,
    /// Whether [RegionAliveKeeper] has been started; regions registered before that
    /// are only registered, their countdowns begin once started.
    started: Arc<AtomicBool>,

    /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
    /// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically
    /// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
    /// duration acts like an "invariant point" for region's keep alive lease.
    epoch: Instant,

    /// Optional extension hook invoked whenever a region's deadline is reset.
    countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}
64
65impl RegionAliveKeeper {
66    /// Returns an empty [RegionAliveKeeper].
67    pub fn new(
68        region_server: RegionServer,
69        countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
70        heartbeat_interval_millis: u64,
71    ) -> Self {
72        Self {
73            region_server,
74            tasks: Arc::new(Mutex::new(HashMap::new())),
75            heartbeat_interval_millis,
76            started: Arc::new(AtomicBool::new(false)),
77            epoch: Instant::now(),
78            countdown_task_handler_ext,
79        }
80    }
81
82    async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
83        self.tasks.lock().await.get(&region_id).cloned()
84    }
85
86    /// Add the countdown task for a specific region.
87    /// It will be ignored if the task exists.
88    pub async fn register_region(&self, region_id: RegionId) {
89        if self.find_handle(region_id).await.is_some() {
90            return;
91        }
92
93        let handle = Arc::new(CountdownTaskHandle::new(
94            self.region_server.clone(),
95            self.countdown_task_handler_ext.clone(),
96            region_id,
97        ));
98
99        let mut handles = self.tasks.lock().await;
100        let _ = handles.insert(region_id, handle.clone());
101
102        if self.started.load(Ordering::Relaxed) {
103            handle.start(self.heartbeat_interval_millis).await;
104
105            info!("Region alive countdown for region {region_id} is started!",);
106        } else {
107            info!(
108                "Region alive countdown for region {region_id} is registered but not started yet!",
109            );
110        }
111    }
112
113    /// Removes the countdown task for a specific region.
114    pub async fn deregister_region(&self, region_id: RegionId) {
115        if self.tasks.lock().await.remove(&region_id).is_some() {
116            info!("Deregister alive countdown for region {region_id}")
117        }
118    }
119
120    /// Renews the lease of regions to `deadline`.
121    async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
122        for region in regions {
123            let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
124            if let Some(handle) = self.find_handle(region_id).await {
125                handle
126                    .reset_deadline(role, deadline, region.extensions.clone())
127                    .await;
128            } else {
129                warn!(
130                    "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
131                );
132                // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
133            }
134        }
135    }
136
137    async fn close_staled_region(&self, region_id: RegionId) {
138        info!("Closing staled region: {region_id}");
139        let request = RegionRequest::Close(RegionCloseRequest {});
140        if let Err(e) = self.region_server.handle_request(region_id, request).await {
141            if e.status_code() != StatusCode::RegionNotFound {
142                let _ = self
143                    .region_server
144                    .set_region_role(region_id, RegionRole::Follower);
145                error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
146            }
147        }
148    }
149
150    /// Closes staled regions.
151    async fn close_staled_regions(&self, regions: &[u64]) {
152        for region_id in regions {
153            self.close_staled_region(RegionId::from_u64(*region_id))
154                .await;
155        }
156    }
157
158    #[cfg(test)]
159    async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
160        let mut deadline = None;
161        if let Some(handle) = self.find_handle(region_id).await {
162            let (s, r) = oneshot::channel();
163            if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
164                deadline = r.await.ok()
165            }
166        }
167        deadline
168    }
169
170    pub async fn start(
171        self: &Arc<Self>,
172        event_receiver: Option<RegionServerEventReceiver>,
173    ) -> Result<()> {
174        self.started.store(true, Ordering::Relaxed);
175
176        if let Some(mut event_receiver) = event_receiver {
177            let keeper = self.clone();
178            // Initializers region alive keeper.
179            // It makes sure all opened regions are registered to `RegionAliveKeeper.`
180            loop {
181                match event_receiver.0.try_recv() {
182                    Ok(RegionServerEvent::Registered(region_id)) => {
183                        keeper.register_region(region_id).await;
184                    }
185                    Ok(RegionServerEvent::Deregistered(region_id)) => {
186                        keeper.deregister_region(region_id).await;
187                    }
188                    Err(mpsc::error::TryRecvError::Disconnected) => {
189                        return error::UnexpectedSnafu {
190                            violated: "RegionServerEventSender closed",
191                        }
192                        .fail()
193                    }
194                    Err(mpsc::error::TryRecvError::Empty) => {
195                        break;
196                    }
197                }
198            }
199            let running = self.started.clone();
200
201            // Watches changes
202            common_runtime::spawn_global(async move {
203                loop {
204                    if !running.load(Ordering::Relaxed) {
205                        info!("RegionAliveKeeper stopped! Quits the watch loop!");
206                        break;
207                    }
208
209                    match event_receiver.0.recv().await {
210                        Some(RegionServerEvent::Registered(region_id)) => {
211                            keeper.register_region(region_id).await;
212                        }
213                        Some(RegionServerEvent::Deregistered(region_id)) => {
214                            keeper.deregister_region(region_id).await;
215                        }
216                        None => {
217                            info!("RegionServerEventSender closed! Quits the watch loop!");
218                            break;
219                        }
220                    }
221                }
222            });
223        }
224
225        let tasks = self.tasks.lock().await;
226        for task in tasks.values() {
227            task.start(self.heartbeat_interval_millis).await;
228        }
229
230        info!(
231            "RegionAliveKeeper is started with region {:?}",
232            tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
233        );
234
235        Ok(())
236    }
237
238    pub fn epoch(&self) -> Instant {
239        self.epoch
240    }
241}
242
243#[async_trait]
244impl HeartbeatResponseHandler for RegionAliveKeeper {
245    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
246        ctx.response.region_lease.is_some()
247    }
248
249    async fn handle(
250        &self,
251        ctx: &mut HeartbeatResponseHandlerContext,
252    ) -> common_meta::error::Result<HandleControl> {
253        let region_lease = ctx
254            .response
255            .region_lease
256            .as_ref()
257            .context(InvalidProtoMsgSnafu {
258                err_msg: "'region_lease' is missing in heartbeat response",
259            })?;
260        let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
261        let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
262
263        self.renew_region_leases(&region_lease.regions, deadline)
264            .await;
265        self.close_staled_regions(&region_lease.closeable_region_ids)
266            .await;
267
268        Ok(HandleControl::Continue)
269    }
270}
271
/// Commands accepted by a [CountdownTask] over its command channel.
#[derive(Debug)]
enum CountdownCommand {
    /// Start this countdown task. The first deadline will be set to
    /// 4 * `heartbeat_interval_millis` (the payload) from now.
    Start(u64),
    /// Reset countdown deadline to the given instant.
    /// (NextRole, Deadline, ExtensionInfo)
    Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
    /// Returns the current deadline of the countdown task.
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}
284
/// Shared reference to a [CountdownTaskHandlerExt].
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;

/// Extension hook for the countdown task: invoked whenever a region's lease
/// deadline is reset (i.e. renewed by the meta server).
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        region_id: RegionId,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}
299
/// Handle to a spawned [CountdownTask]. Dropping the handle aborts the task.
struct CountdownTaskHandle {
    /// Command channel into the countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    /// Join handle of the spawned task; aborted on drop.
    handler: JoinHandle<()>,
    region_id: RegionId,
}
305
306impl CountdownTaskHandle {
307    /// Creates a new [CountdownTaskHandle] and starts the countdown task.
308    fn new(
309        region_server: RegionServer,
310        handler_ext: Option<CountdownTaskHandlerExtRef>,
311        region_id: RegionId,
312    ) -> Self {
313        let (tx, rx) = mpsc::channel(1024);
314
315        let mut countdown_task = CountdownTask {
316            region_server,
317            handler_ext,
318            region_id,
319            rx,
320        };
321        let handler = common_runtime::spawn_hb(async move {
322            countdown_task.run().await;
323        });
324
325        Self {
326            tx,
327            handler,
328            region_id,
329        }
330    }
331
332    /// Starts the [CountdownTask],
333    /// it will be ignored if the task started.
334    async fn start(&self, heartbeat_interval_millis: u64) {
335        if let Err(e) = self
336            .tx
337            .send(CountdownCommand::Start(heartbeat_interval_millis))
338            .await
339        {
340            warn!(
341                "Failed to start region alive keeper countdown: {e}. \
342                Maybe the task is stopped due to region been closed."
343            );
344        }
345    }
346
347    #[cfg(test)]
348    async fn deadline(&self) -> Option<Instant> {
349        let (tx, rx) = oneshot::channel();
350        if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
351            return rx.await.ok();
352        }
353        None
354    }
355
356    async fn reset_deadline(
357        &self,
358        role: RegionRole,
359        deadline: Instant,
360        extension_info: HashMap<String, Vec<u8>>,
361    ) {
362        if let Err(e) = self
363            .tx
364            .send(CountdownCommand::Reset((role, deadline, extension_info)))
365            .await
366        {
367            warn!(
368                "Failed to reset region alive keeper deadline: {e}. \
369                Maybe the task is stopped due to region been closed."
370            );
371        }
372    }
373}
374
impl Drop for CountdownTaskHandle {
    // Abort the background countdown task so it can't outlive its handle
    // (and can't downgrade a region after the handle was deregistered).
    fn drop(&mut self) {
        debug!(
            "Aborting region alive countdown task for region {}",
            self.region_id
        );
        self.handler.abort();
    }
}
384
/// Background task counting down a single region's lease; on expiry the region
/// is converted to follower.
struct CountdownTask {
    region_server: RegionServer,
    region_id: RegionId,
    /// Optional hook invoked on each deadline reset.
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    /// Command channel from the owning [CountdownTaskHandle].
    rx: mpsc::Receiver<CountdownCommand>,
}
391
392impl CountdownTask {
393    async fn run(&mut self) {
394        // 30 years. See `Instant::far_future`.
395        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
396
397        // Make sure the alive countdown is not gonna happen before heartbeat task is started (the
398        // "start countdown" command will be sent from heartbeat task).
399        let countdown = tokio::time::sleep_until(far_future);
400        tokio::pin!(countdown);
401        let region_id = self.region_id;
402
403        let mut started = false;
404        loop {
405            tokio::select! {
406                command = self.rx.recv() => {
407                    match command {
408                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
409                            if !started {
410                                // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
411                                // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
412                                // network or other jitters during startup.
413                                let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
414                                countdown.set(tokio::time::sleep_until(first_deadline));
415                                started = true;
416                            }
417                        },
418                        Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
419                            if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
420                                error!(err; "Failed to set region role to {role} for region {region_id}");
421                            }
422                            if let Some(ext_handler) = self.handler_ext.as_ref() {
423                                ext_handler.reset_deadline(
424                                    &self.region_server,
425                                    self.region_id,
426                                    role,
427                                    deadline,
428                                    extension_info,
429                                ).await;
430                            }
431                            trace!(
432                                "Reset deadline of region {region_id} to approximately {} seconds later.",
433                                (deadline - Instant::now()).as_secs_f32(),
434                            );
435                            countdown.set(tokio::time::sleep_until(deadline));
436                        },
437                        None => {
438                            info!(
439                                "The handle of countdown task for region {region_id}\
440                                is dropped, RegionAliveKeeper out."
441                            );
442                            break;
443                        },
444                        #[cfg(test)]
445                        Some(CountdownCommand::Deadline(tx)) => {
446                            let _ = tx.send(countdown.deadline());
447                        }
448                    }
449                }
450                () = &mut countdown => {
451                    warn!("The region {region_id} lease is expired, convert region to follower.");
452                    if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
453                        error!(err; "Failed to set region role to follower for region {region_id}");
454                    }
455                    // resets the countdown.
456                    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
457                    countdown.as_mut().reset(far_future);
458                }
459            }
460        }
461    }
462}
463
#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    // End-to-end keeper lifecycle: register -> start -> lease expiry (downgrade to
    // follower) -> renew (back to leader) -> expiry again.
    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        let engine = Arc::new(engine);
        region_server.register_engine(engine.clone());

        // 100 ms heartbeat interval => first deadline is ~400 ms after start.
        let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));

        let region_id = RegionId::new(1024, 1);
        let builder = CreateRequestBuilder::new();
        region_server
            .handle_request(region_id, RegionRequest::Create(builder.build()))
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        // Register a region before starting.
        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        // The started alive keeper should assign deadline to this region.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(500)).await;
        // Expiry downgrades the region but keeps the countdown handle registered.
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        info!("Renew the region lease");
        // Renew lease then sleep.
        alive_keeper
            .renew_region_leases(
                &[GrantedRegion {
                    region_id: region_id.as_u64(),
                    role: api::v1::meta::RegionRole::Leader.into(),
                    extensions: HashMap::new(),
                }],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        // Renewal restores the leader role.
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // After expiry the countdown is parked "far future" (~30 years) again.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    }

    // Exercises a single countdown task: dormant deadline, first start, ignored
    // restart after expiry, and explicit deadline reset.
    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        // If countdown task is not started, its deadline is set to far future.
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // The first deadline should be set to 4 * heartbeat_interval_millis.
        // We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test.
        let heartbeat_interval_millis = 100;
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        // No effect.
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // Reset deadline.
        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}