use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use api::v1::meta::GrantedRegion;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::error::InvalidProtoMsgSnafu;
use common_meta::heartbeat::handler::{
    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_telemetry::{debug, error, info, trace, warn};
use snafu::OptionExt;
use store_api::region_engine::{RegionRole, SettableRegionRoleState};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
#[cfg(test)]
use tokio::sync::oneshot;
use tokio::sync::{Mutex, mpsc};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

use crate::error::{self, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;

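/// Keeps the leases of regions alive: a countdown task is spawned per registered region, and
/// each task's deadline is renewed whenever a granted region lease arrives in the heartbeat
/// response. When a lease expires, the region is converted to follower.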
pub struct RegionAliveKeeper {
    region_server: RegionServer,
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    heartbeat_interval_millis: Arc<AtomicU64>,
    started: Arc<AtomicBool>,

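    /// The instant when this keeper was created; used as the reference point for the
    /// `duration_since_epoch` carried by region leases.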
    epoch: Instant,

    countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}

impl RegionAliveKeeper {
    pub fn new(
        region_server: RegionServer,
        countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
        heartbeat_interval: Duration,
    ) -> Self {
        Self {
            region_server,
            tasks: Arc::new(Mutex::new(HashMap::new())),
            heartbeat_interval_millis: Arc::new(AtomicU64::new(
                heartbeat_interval.as_millis() as u64,
            )),
            started: Arc::new(AtomicBool::new(false)),
            epoch: Instant::now(),
            countdown_task_handler_ext,
        }
    }

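    /// Updates the heartbeat interval (in milliseconds) that is used to compute the first
    /// countdown deadline when tasks are started.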
    pub fn update_heartbeat_interval(&self, heartbeat_interval_millis: u64) {
        self.heartbeat_interval_millis
            .store(heartbeat_interval_millis, Ordering::Relaxed);
    }

    async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
        self.tasks.lock().await.get(&region_id).cloned()
    }

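    /// Registers a countdown task for the region. If the keeper has already been started,
    /// the countdown starts immediately; otherwise it stays idle until [Self::start].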
    pub async fn register_region(&self, region_id: RegionId) {
        let handle = Arc::new(CountdownTaskHandle::new(
            self.region_server.clone(),
            self.countdown_task_handler_ext.clone(),
            region_id,
        ));

        let should_start = {
            let mut handles = self.tasks.lock().await;

            if handles.contains_key(&region_id) {
                return;
            }

            handles.insert(region_id, handle.clone());

            self.started.load(Ordering::Relaxed)
        };

        if should_start {
            handle
                .start(self.heartbeat_interval_millis.load(Ordering::Relaxed))
                .await;
            info!("Region alive countdown for region {region_id} is started!");
        } else {
            info!(
                "Region alive countdown for region {region_id} is registered but not started yet!"
            );
        }
    }

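    /// Removes the region's countdown task; the task is aborted once its handle is dropped.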
    pub async fn deregister_region(&self, region_id: RegionId) {
        if self.tasks.lock().await.remove(&region_id).is_some() {
            info!("Deregister alive countdown for region {region_id}")
        }
    }

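    /// Renews the lease of every granted region by forwarding the new deadline, role, and
    /// extension info to the corresponding countdown task, if one is registered.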
    async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
        for region in regions {
            let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
            if let Some(handle) = self.find_handle(region_id).await {
                handle
                    .reset_deadline(role, deadline, region.extensions.clone())
                    .await;
            } else {
                warn!(
                    "Trying to renew the lease for region {region_id}, but its keeper handle was not found!"
                );
            }
        }
    }

    async fn close_staled_region(&self, region_id: RegionId) {
        info!("Closing staled region: {region_id}");
        let request = RegionRequest::Close(RegionCloseRequest {});
        if let Err(e) = self.region_server.handle_request(region_id, request).await
            && e.status_code() != StatusCode::RegionNotFound
        {
            let _ = self
                .region_server
                .set_region_role(region_id, RegionRole::Follower);
            error!(e; "Failed to close staled region {}, converting it to follower.", region_id);
        }
    }

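    /// Closes every region listed as closeable in the region lease. If closing fails (and the
    /// region still exists), the region is converted to follower instead.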
    async fn close_staled_regions(&self, regions: &[u64]) {
        for region_id in regions {
            self.close_staled_region(RegionId::from_u64(*region_id))
                .await;
        }
    }

    #[cfg(test)]
    async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
        let mut deadline = None;
        if let Some(handle) = self.find_handle(region_id).await {
            let (s, r) = oneshot::channel();
            if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
                deadline = r.await.ok()
            }
        }
        deadline
    }

    pub async fn start(
        self: &Arc<Self>,
        event_receiver: Option<RegionServerEventReceiver>,
    ) -> Result<()> {
        self.started.store(true, Ordering::Relaxed);

        if let Some(mut event_receiver) = event_receiver {
            let keeper = self.clone();
            loop {
                match event_receiver.0.try_recv() {
                    Ok(RegionServerEvent::Registered(region_id)) => {
                        keeper.register_region(region_id).await;
                    }
                    Ok(RegionServerEvent::Deregistered(region_id)) => {
                        keeper.deregister_region(region_id).await;
                    }
                    Err(mpsc::error::TryRecvError::Disconnected) => {
                        return error::UnexpectedSnafu {
                            violated: "RegionServerEventSender closed",
                        }
                        .fail();
                    }
                    Err(mpsc::error::TryRecvError::Empty) => {
                        break;
                    }
                }
            }
            let running = self.started.clone();

            common_runtime::spawn_global(async move {
                loop {
                    if !running.load(Ordering::Relaxed) {
                        info!("RegionAliveKeeper stopped! Quitting the watch loop!");
                        break;
                    }

                    match event_receiver.0.recv().await {
                        Some(RegionServerEvent::Registered(region_id)) => {
                            keeper.register_region(region_id).await;
                        }
                        Some(RegionServerEvent::Deregistered(region_id)) => {
                            keeper.deregister_region(region_id).await;
                        }
                        None => {
                            info!("RegionServerEventSender closed! Quitting the watch loop!");
                            break;
                        }
                    }
                }
            });
        }

        let tasks = self.tasks.lock().await;
        let interval = self.heartbeat_interval_millis.load(Ordering::Relaxed);
        for task in tasks.values() {
            task.start(interval).await;
        }

        info!(
            "RegionAliveKeeper is started with regions {:?}",
            tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
        );

        Ok(())
    }

    pub fn epoch(&self) -> Instant {
        self.epoch
    }
}

#[async_trait]
impl HeartbeatResponseHandler for RegionAliveKeeper {
    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
        ctx.response.region_lease.is_some()
    }

    async fn handle(
        &self,
        ctx: &mut HeartbeatResponseHandlerContext,
    ) -> common_meta::error::Result<HandleControl> {
        let region_lease = ctx
            .response
            .region_lease
            .as_ref()
            .context(InvalidProtoMsgSnafu {
                err_msg: "'region_lease' is missing in heartbeat response",
            })?;
        let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
        let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);

        self.renew_region_leases(&region_lease.regions, deadline)
            .await;
        self.close_staled_regions(&region_lease.closeable_region_ids)
            .await;

        Ok(HandleControl::Continue)
    }
}

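/// Commands accepted by a [CountdownTask].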
#[derive(Debug)]
enum CountdownCommand {
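    /// Starts the countdown with the given heartbeat interval (in milliseconds); the first
    /// deadline is set to four heartbeat intervals from now.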
    Start(u64),
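    /// Resets the countdown deadline, along with the region role and extension info carried
    /// by the renewed lease.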
    Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
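    /// Reports the current deadline of the countdown task (test only).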
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}

pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;

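/// An extension hook that is invoked whenever a countdown task's deadline is reset by a
/// renewed region lease.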
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        region_id: RegionId,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}

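/// Handle to a region's countdown task. Dropping the handle aborts the underlying task.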
struct CountdownTaskHandle {
    tx: mpsc::Sender<CountdownCommand>,
    handler: JoinHandle<()>,
    region_id: RegionId,
}

impl CountdownTaskHandle {
    fn new(
        region_server: RegionServer,
        handler_ext: Option<CountdownTaskHandlerExtRef>,
        region_id: RegionId,
    ) -> Self {
        let (tx, rx) = mpsc::channel(1024);

        let mut countdown_task = CountdownTask {
            region_server,
            handler_ext,
            region_id,
            rx,
        };
        let handler = common_runtime::spawn_hb(async move {
            countdown_task.run().await;
        });

        Self {
            tx,
            handler,
            region_id,
        }
    }

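    /// Starts the countdown task with the given heartbeat interval (in milliseconds).
    /// Starting an already started task has no effect.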
    async fn start(&self, heartbeat_interval_millis: u64) {
        if let Err(e) = self
            .tx
            .send(CountdownCommand::Start(heartbeat_interval_millis))
            .await
        {
            warn!(
                "Failed to start region alive keeper countdown: {e}. \
                Maybe the task has been stopped because the region was closed."
            );
        }
    }

    #[cfg(test)]
    async fn deadline(&self) -> Option<Instant> {
        let (tx, rx) = oneshot::channel();
        if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
            return rx.await.ok();
        }
        None
    }

    async fn reset_deadline(
        &self,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    ) {
        if let Err(e) = self
            .tx
            .send(CountdownCommand::Reset((role, deadline, extension_info)))
            .await
        {
            warn!(
                "Failed to reset region alive keeper deadline: {e}. \
                Maybe the task has been stopped because the region was closed."
            );
        }
    }
}

impl Drop for CountdownTaskHandle {
    fn drop(&mut self) {
        debug!(
            "Aborting region alive countdown task for region {}",
            self.region_id
        );
        self.handler.abort();
    }
}

struct CountdownTask {
    region_server: RegionServer,
    region_id: RegionId,
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    rx: mpsc::Receiver<CountdownCommand>,
}

impl CountdownTask {
    async fn run(&mut self) {
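        // Park the countdown far in the future (roughly 30 years) until the task is started
        // and a real deadline is known.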
        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

        let countdown = tokio::time::sleep_until(far_future);
        tokio::pin!(countdown);
        let region_id = self.region_id;

        let mut started = false;
        loop {
            tokio::select! {
                command = self.rx.recv() => {
                    match command {
                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
                            if !started {
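                                // Arm the countdown: the first deadline is four heartbeat
                                // intervals from now.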
                                let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
                                countdown.set(tokio::time::sleep_until(first_deadline));
                                started = true;
                            }
                        },
                        Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
                            let prev_role_leader = self.region_server.is_region_leader(self.region_id).unwrap_or(false);
                            if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
                                if err.status_code() == StatusCode::RegionNotFound {
                                    warn!(err; "Failed to set region role to {role} for region {region_id}");
                                } else {
                                    error!(err; "Failed to set region role to {role} for region {region_id}");
                                }
                            } else if !prev_role_leader && role == RegionRole::Leader
                                && let Err(err) = self
                                    .region_server
                                    .set_region_role_state_gracefully(self.region_id, SettableRegionRoleState::Leader)
                                    .await
                            {
                                error!(err; "Failed to set region role state gracefully to {role} for region {region_id}");
                            }

                            if let Some(ext_handler) = self.handler_ext.as_ref() {
                                ext_handler.reset_deadline(
                                    &self.region_server,
                                    self.region_id,
                                    role,
                                    deadline,
                                    extension_info,
                                ).await;
                            }
                            trace!(
                                "Reset deadline of region {region_id} to approximately {} seconds later.",
                                (deadline - Instant::now()).as_secs_f32(),
                            );
                            countdown.set(tokio::time::sleep_until(deadline));
                        },
                        None => {
                            info!(
                                "The handle of countdown task for region {region_id} \
                                is dropped, RegionAliveKeeper out."
                            );
                            break;
                        },
                        #[cfg(test)]
                        Some(CountdownCommand::Deadline(tx)) => {
                            let _ = tx.send(countdown.deadline());
                        }
                    }
                }
                () = &mut countdown => {
                    warn!("The lease of region {region_id} has expired, converting the region to follower.");
                    if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
                        error!(err; "Failed to set region role to follower for region {region_id}");
                    }
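                    // Park the countdown far in the future again; it will be re-armed by the
                    // next lease renewal.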
                    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
                    countdown.as_mut().reset(far_future);
                }
            }
        }
    }
}

#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        let engine = Arc::new(engine);
        region_server.register_engine(engine.clone());

        let alive_keeper = Arc::new(RegionAliveKeeper::new(
            region_server.clone(),
            None,
            Duration::from_millis(100),
        ));

        let region_id = RegionId::new(1024, 1);
        let builder = CreateRequestBuilder::new();
        region_server
            .handle_request(region_id, RegionRequest::Create(builder.build()))
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(500)).await;
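        // The initial lease is four heartbeat intervals (400ms here), so after 500ms the lease
        // must have expired and the region been downgraded to follower.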
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        info!("Renew the region lease");
        alive_keeper
            .renew_region_leases(
                &[GrantedRegion {
                    region_id: region_id.as_u64(),
                    role: api::v1::meta::RegionRole::Leader.into(),
                    extensions: HashMap::new(),
                }],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        let heartbeat_interval_millis = 100;
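        // `start` arms the countdown: the first deadline is four heartbeat intervals away.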
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        countdown_handle.start(heartbeat_interval_millis).await;
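        // The first deadline has already fired (the countdown is parked far in the future again),
        // and a second `start` is a no-op, so the deadline stays in the far future.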
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}