// datanode/alive_keeper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19use api::v1::meta::GrantedRegion;
20use async_trait::async_trait;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_meta::error::InvalidProtoMsgSnafu;
24use common_meta::heartbeat::handler::{
25    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
26};
27use common_telemetry::{debug, error, info, trace, warn};
28use snafu::OptionExt;
29use store_api::region_engine::RegionRole;
30use store_api::region_request::{RegionCloseRequest, RegionRequest};
31use store_api::storage::RegionId;
32#[cfg(test)]
33use tokio::sync::oneshot;
34use tokio::sync::{mpsc, Mutex};
35use tokio::task::JoinHandle;
36use tokio::time::{Duration, Instant};
37
38use crate::error::{self, Result};
39use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
40use crate::region_server::RegionServer;
41
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached,
/// the status of the region is set to "readonly" (follower), ensuring there is no side-effect
/// in the entity system.
///
/// The deadline is controlled by the meta server. Datanode will send its opened regions info to meta server
/// via heartbeat. If the meta server decides some region could be resided in this Datanode,
/// it will renew the lease of the region, and the deadline of its [CountdownTask] will be reset.
pub struct RegionAliveKeeper {
    // Used by countdown tasks to close regions or demote them to follower.
    region_server: RegionServer,
    // One countdown task handle per registered region, keyed by region id.
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    // The first countdown deadline is derived from this interval (4x, see `CountdownTask::run`).
    heartbeat_interval_millis: u64,
    // Set to true by `start()`; regions registered after that start their countdown immediately.
    started: Arc<AtomicBool>,

    /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
    /// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically
    /// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
    /// duration acts like an "invariant point" for region's keep alive lease.
    epoch: Instant,

    // Optional hook invoked by countdown tasks whenever a region's deadline is reset.
    countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}
64
65impl RegionAliveKeeper {
66    /// Returns an empty [RegionAliveKeeper].
67    pub fn new(
68        region_server: RegionServer,
69        countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
70        heartbeat_interval_millis: u64,
71    ) -> Self {
72        Self {
73            region_server,
74            tasks: Arc::new(Mutex::new(HashMap::new())),
75            heartbeat_interval_millis,
76            started: Arc::new(AtomicBool::new(false)),
77            epoch: Instant::now(),
78            countdown_task_handler_ext,
79        }
80    }
81
82    async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
83        self.tasks.lock().await.get(&region_id).cloned()
84    }
85
86    /// Add the countdown task for a specific region.
87    /// It will be ignored if the task exists.
88    pub async fn register_region(&self, region_id: RegionId) {
89        let handle = Arc::new(CountdownTaskHandle::new(
90            self.region_server.clone(),
91            self.countdown_task_handler_ext.clone(),
92            region_id,
93        ));
94
95        let should_start = {
96            let mut handles = self.tasks.lock().await;
97
98            // Check if already exists, return early if so
99            if handles.contains_key(&region_id) {
100                return;
101            }
102
103            // Insert new handle
104            handles.insert(region_id, handle.clone());
105
106            // Return whether we should start (check state inside lock)
107            self.started.load(Ordering::Relaxed)
108        };
109
110        if should_start {
111            handle.start(self.heartbeat_interval_millis).await;
112            info!("Region alive countdown for region {region_id} is started!");
113        } else {
114            info!(
115                "Region alive countdown for region {region_id} is registered but not started yet!"
116            );
117        }
118    }
119
120    /// Removes the countdown task for a specific region.
121    pub async fn deregister_region(&self, region_id: RegionId) {
122        if self.tasks.lock().await.remove(&region_id).is_some() {
123            info!("Deregister alive countdown for region {region_id}")
124        }
125    }
126
127    /// Renews the lease of regions to `deadline`.
128    async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
129        for region in regions {
130            let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
131            if let Some(handle) = self.find_handle(region_id).await {
132                handle
133                    .reset_deadline(role, deadline, region.extensions.clone())
134                    .await;
135            } else {
136                warn!(
137                    "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
138                );
139                // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
140            }
141        }
142    }
143
144    async fn close_staled_region(&self, region_id: RegionId) {
145        info!("Closing staled region: {region_id}");
146        let request = RegionRequest::Close(RegionCloseRequest {});
147        if let Err(e) = self.region_server.handle_request(region_id, request).await {
148            if e.status_code() != StatusCode::RegionNotFound {
149                let _ = self
150                    .region_server
151                    .set_region_role(region_id, RegionRole::Follower);
152                error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
153            }
154        }
155    }
156
157    /// Closes staled regions.
158    async fn close_staled_regions(&self, regions: &[u64]) {
159        for region_id in regions {
160            self.close_staled_region(RegionId::from_u64(*region_id))
161                .await;
162        }
163    }
164
165    #[cfg(test)]
166    async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
167        let mut deadline = None;
168        if let Some(handle) = self.find_handle(region_id).await {
169            let (s, r) = oneshot::channel();
170            if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
171                deadline = r.await.ok()
172            }
173        }
174        deadline
175    }
176
177    pub async fn start(
178        self: &Arc<Self>,
179        event_receiver: Option<RegionServerEventReceiver>,
180    ) -> Result<()> {
181        self.started.store(true, Ordering::Relaxed);
182
183        if let Some(mut event_receiver) = event_receiver {
184            let keeper = self.clone();
185            // Initializers region alive keeper.
186            // It makes sure all opened regions are registered to `RegionAliveKeeper.`
187            loop {
188                match event_receiver.0.try_recv() {
189                    Ok(RegionServerEvent::Registered(region_id)) => {
190                        keeper.register_region(region_id).await;
191                    }
192                    Ok(RegionServerEvent::Deregistered(region_id)) => {
193                        keeper.deregister_region(region_id).await;
194                    }
195                    Err(mpsc::error::TryRecvError::Disconnected) => {
196                        return error::UnexpectedSnafu {
197                            violated: "RegionServerEventSender closed",
198                        }
199                        .fail()
200                    }
201                    Err(mpsc::error::TryRecvError::Empty) => {
202                        break;
203                    }
204                }
205            }
206            let running = self.started.clone();
207
208            // Watches changes
209            common_runtime::spawn_global(async move {
210                loop {
211                    if !running.load(Ordering::Relaxed) {
212                        info!("RegionAliveKeeper stopped! Quits the watch loop!");
213                        break;
214                    }
215
216                    match event_receiver.0.recv().await {
217                        Some(RegionServerEvent::Registered(region_id)) => {
218                            keeper.register_region(region_id).await;
219                        }
220                        Some(RegionServerEvent::Deregistered(region_id)) => {
221                            keeper.deregister_region(region_id).await;
222                        }
223                        None => {
224                            info!("RegionServerEventSender closed! Quits the watch loop!");
225                            break;
226                        }
227                    }
228                }
229            });
230        }
231
232        let tasks = self.tasks.lock().await;
233        for task in tasks.values() {
234            task.start(self.heartbeat_interval_millis).await;
235        }
236
237        info!(
238            "RegionAliveKeeper is started with region {:?}",
239            tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
240        );
241
242        Ok(())
243    }
244
245    pub fn epoch(&self) -> Instant {
246        self.epoch
247    }
248}
249
250#[async_trait]
251impl HeartbeatResponseHandler for RegionAliveKeeper {
252    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
253        ctx.response.region_lease.is_some()
254    }
255
256    async fn handle(
257        &self,
258        ctx: &mut HeartbeatResponseHandlerContext,
259    ) -> common_meta::error::Result<HandleControl> {
260        let region_lease = ctx
261            .response
262            .region_lease
263            .as_ref()
264            .context(InvalidProtoMsgSnafu {
265                err_msg: "'region_lease' is missing in heartbeat response",
266            })?;
267        let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
268        let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
269
270        self.renew_region_leases(&region_lease.regions, deadline)
271            .await;
272        self.close_staled_regions(&region_lease.closeable_region_ids)
273            .await;
274
275        Ok(HandleControl::Continue)
276    }
277}
278
/// Commands sent from a [CountdownTaskHandle] to its background [CountdownTask].
#[derive(Debug)]
enum CountdownCommand {
    /// Start this countdown task. The first deadline will be set to
    /// 4 * `heartbeat_interval_millis`. Ignored if the task is already started.
    Start(u64),
    /// Reset countdown deadline to the given instance.
    /// (NextRole, Deadline, ExtensionInfo)
    Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
    /// Returns the current deadline of the countdown task.
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}
291
/// Shared, thread-safe reference to a [CountdownTaskHandlerExt] implementation.
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;

/// Extension hook invoked by the countdown task whenever a region's deadline is reset.
// NOTE(review): the original doc said "Extension trait for [CountdownTaskHandlerExt]",
// which is self-referential; it is invoked from `CountdownTask::run` on each Reset.
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        region_id: RegionId,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}
306
/// Owner-side handle of a spawned [CountdownTask]; aborts the task on drop.
struct CountdownTaskHandle {
    // Command channel into the background countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    // Join handle of the spawned task; aborted in `Drop`.
    handler: JoinHandle<()>,
    // Kept for logging on drop.
    region_id: RegionId,
}
312
313impl CountdownTaskHandle {
314    /// Creates a new [CountdownTaskHandle] and starts the countdown task.
315    fn new(
316        region_server: RegionServer,
317        handler_ext: Option<CountdownTaskHandlerExtRef>,
318        region_id: RegionId,
319    ) -> Self {
320        let (tx, rx) = mpsc::channel(1024);
321
322        let mut countdown_task = CountdownTask {
323            region_server,
324            handler_ext,
325            region_id,
326            rx,
327        };
328        let handler = common_runtime::spawn_hb(async move {
329            countdown_task.run().await;
330        });
331
332        Self {
333            tx,
334            handler,
335            region_id,
336        }
337    }
338
339    /// Starts the [CountdownTask],
340    /// it will be ignored if the task started.
341    async fn start(&self, heartbeat_interval_millis: u64) {
342        if let Err(e) = self
343            .tx
344            .send(CountdownCommand::Start(heartbeat_interval_millis))
345            .await
346        {
347            warn!(
348                "Failed to start region alive keeper countdown: {e}. \
349                Maybe the task is stopped due to region been closed."
350            );
351        }
352    }
353
354    #[cfg(test)]
355    async fn deadline(&self) -> Option<Instant> {
356        let (tx, rx) = oneshot::channel();
357        if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
358            return rx.await.ok();
359        }
360        None
361    }
362
363    async fn reset_deadline(
364        &self,
365        role: RegionRole,
366        deadline: Instant,
367        extension_info: HashMap<String, Vec<u8>>,
368    ) {
369        if let Err(e) = self
370            .tx
371            .send(CountdownCommand::Reset((role, deadline, extension_info)))
372            .await
373        {
374            warn!(
375                "Failed to reset region alive keeper deadline: {e}. \
376                Maybe the task is stopped due to region been closed."
377            );
378        }
379    }
380}
381
impl Drop for CountdownTaskHandle {
    // Abort the background countdown task so it cannot outlive its handle
    // (e.g. after `deregister_region` removes it from the task map).
    fn drop(&mut self) {
        debug!(
            "Aborting region alive countdown task for region {}",
            self.region_id
        );
        self.handler.abort();
    }
}
391
/// Background task that demotes a region to follower when its lease deadline expires.
struct CountdownTask {
    // Used to change the region's role when the lease expires or is renewed.
    region_server: RegionServer,
    // The region this task watches over.
    region_id: RegionId,
    // Optional hook invoked on every deadline reset.
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    // Receives commands from the owning [CountdownTaskHandle].
    rx: mpsc::Receiver<CountdownCommand>,
}
398
399impl CountdownTask {
400    async fn run(&mut self) {
401        // 30 years. See `Instant::far_future`.
402        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
403
404        // Make sure the alive countdown is not gonna happen before heartbeat task is started (the
405        // "start countdown" command will be sent from heartbeat task).
406        let countdown = tokio::time::sleep_until(far_future);
407        tokio::pin!(countdown);
408        let region_id = self.region_id;
409
410        let mut started = false;
411        loop {
412            tokio::select! {
413                command = self.rx.recv() => {
414                    match command {
415                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
416                            if !started {
417                                // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
418                                // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
419                                // network or other jitters during startup.
420                                let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
421                                countdown.set(tokio::time::sleep_until(first_deadline));
422                                started = true;
423                            }
424                        },
425                        Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
426                            if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
427                                if err.status_code() == StatusCode::RegionNotFound {
428                                    // Table metadata in metasrv is deleted after its regions are dropped.
429                                    // The datanode may still receive lease renewal responses that depend on the metadata
430                                    // during the short period before it is removed.
431                                    warn!(err; "Failed to set region role to {role} for region {region_id}");
432                                }else{
433                                    error!(err; "Failed to set region role to {role} for region {region_id}");
434                                }
435
436                            }
437                            if let Some(ext_handler) = self.handler_ext.as_ref() {
438                                ext_handler.reset_deadline(
439                                    &self.region_server,
440                                    self.region_id,
441                                    role,
442                                    deadline,
443                                    extension_info,
444                                ).await;
445                            }
446                            trace!(
447                                "Reset deadline of region {region_id} to approximately {} seconds later.",
448                                (deadline - Instant::now()).as_secs_f32(),
449                            );
450                            countdown.set(tokio::time::sleep_until(deadline));
451                        },
452                        None => {
453                            info!(
454                                "The handle of countdown task for region {region_id}\
455                                is dropped, RegionAliveKeeper out."
456                            );
457                            break;
458                        },
459                        #[cfg(test)]
460                        Some(CountdownCommand::Deadline(tx)) => {
461                            let _ = tx.send(countdown.deadline());
462                        }
463                    }
464                }
465                () = &mut countdown => {
466                    warn!("The region {region_id} lease is expired, convert region to follower.");
467                    if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
468                        error!(err; "Failed to set region role to follower for region {region_id}");
469                    }
470                    // resets the countdown.
471                    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
472                    countdown.as_mut().reset(far_future);
473                }
474            }
475        }
476    }
477}
478
#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    // End-to-end: register a region, start the keeper, let the lease expire
    // (region demoted to follower), renew it (promoted back), expire again.
    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        let engine = Arc::new(engine);
        region_server.register_engine(engine.clone());

        // Heartbeat interval of 100ms => first countdown deadline is ~400ms after start.
        let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));

        let region_id = RegionId::new(1024, 1);
        let builder = CreateRequestBuilder::new();
        region_server
            .handle_request(region_id, RegionRequest::Create(builder.build()))
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        // Register a region before starting.
        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        // The started alive keeper should assign deadline to this region.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        // Expiry demotes the region, but keeps the countdown task alive.
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        info!("Renew the region lease");
        // Renew lease then sleep.
        alive_keeper
            .renew_region_leases(
                &[GrantedRegion {
                    region_id: region_id.as_u64(),
                    role: api::v1::meta::RegionRole::Leader.into(),
                    extensions: HashMap::new(),
                }],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        // Renewal promoted the region back to leader.
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        // Sleep to wait lease expired.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // After expiry the countdown is parked in the far future (~30 years).
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    }

    // Unit-level: exercise the countdown task's Start / Reset / Deadline commands directly.
    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        // If countdown task is not started, its deadline is set to far future.
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // The first deadline should be set to 4 * heartbeat_interval_millis.
        // We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test.
        let heartbeat_interval_millis = 100;
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        // No effect.
        // (The countdown already fired during the sleep above and was reset to far future;
        // a second Start is ignored because the task is already started.)
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // Reset deadline.
        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}