1use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19use api::v1::meta::GrantedRegion;
20use async_trait::async_trait;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_meta::error::InvalidProtoMsgSnafu;
24use common_meta::heartbeat::handler::{
25 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
26};
27use common_telemetry::{debug, error, info, trace, warn};
28use snafu::OptionExt;
29use store_api::region_engine::RegionRole;
30use store_api::region_request::{RegionCloseRequest, RegionRequest};
31use store_api::storage::RegionId;
32#[cfg(test)]
33use tokio::sync::oneshot;
34use tokio::sync::{mpsc, Mutex};
35use tokio::task::JoinHandle;
36use tokio::time::{Duration, Instant};
37
38use crate::error::{self, Result};
39use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
40use crate::region_server::RegionServer;
41
/// Keeps the leases of the regions hosted by a [`RegionServer`] alive.
///
/// Every registered region gets its own countdown task. Heartbeat responses that carry a
/// region lease push the deadline of each granted region forward. When a deadline passes
/// without being renewed, the countdown task demotes the region to [`RegionRole::Follower`].
pub struct RegionAliveKeeper {
    /// Server whose regions are tracked; used to change region roles and to close stale
    /// regions.
    region_server: RegionServer,
    /// Countdown task handle of each registered region, keyed by region id.
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    /// Heartbeat interval (milliseconds) handed to each countdown task when it is started.
    heartbeat_interval_millis: u64,
    /// Set once `start` has been called. Regions registered afterwards start counting down
    /// immediately. The event watch loop holds a clone and exits when it reads `false`.
    started: Arc<AtomicBool>,

    /// Reference instant captured at construction. The `duration_since_epoch` of a lease in a
    /// heartbeat response is interpreted as an offset (in milliseconds) from this instant.
    epoch: Instant,

    /// Optional hook invoked by every countdown task whenever its deadline is reset.
    countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}
64
65impl RegionAliveKeeper {
66 pub fn new(
68 region_server: RegionServer,
69 countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
70 heartbeat_interval_millis: u64,
71 ) -> Self {
72 Self {
73 region_server,
74 tasks: Arc::new(Mutex::new(HashMap::new())),
75 heartbeat_interval_millis,
76 started: Arc::new(AtomicBool::new(false)),
77 epoch: Instant::now(),
78 countdown_task_handler_ext,
79 }
80 }
81
82 async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
83 self.tasks.lock().await.get(®ion_id).cloned()
84 }
85
86 pub async fn register_region(&self, region_id: RegionId) {
89 let handle = Arc::new(CountdownTaskHandle::new(
90 self.region_server.clone(),
91 self.countdown_task_handler_ext.clone(),
92 region_id,
93 ));
94
95 let should_start = {
96 let mut handles = self.tasks.lock().await;
97
98 if handles.contains_key(®ion_id) {
100 return;
101 }
102
103 handles.insert(region_id, handle.clone());
105
106 self.started.load(Ordering::Relaxed)
108 };
109
110 if should_start {
111 handle.start(self.heartbeat_interval_millis).await;
112 info!("Region alive countdown for region {region_id} is started!");
113 } else {
114 info!(
115 "Region alive countdown for region {region_id} is registered but not started yet!"
116 );
117 }
118 }
119
120 pub async fn deregister_region(&self, region_id: RegionId) {
122 if self.tasks.lock().await.remove(®ion_id).is_some() {
123 info!("Deregister alive countdown for region {region_id}")
124 }
125 }
126
127 async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
129 for region in regions {
130 let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
131 if let Some(handle) = self.find_handle(region_id).await {
132 handle
133 .reset_deadline(role, deadline, region.extensions.clone())
134 .await;
135 } else {
136 warn!(
137 "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
138 );
139 }
141 }
142 }
143
144 async fn close_staled_region(&self, region_id: RegionId) {
145 info!("Closing staled region: {region_id}");
146 let request = RegionRequest::Close(RegionCloseRequest {});
147 if let Err(e) = self.region_server.handle_request(region_id, request).await {
148 if e.status_code() != StatusCode::RegionNotFound {
149 let _ = self
150 .region_server
151 .set_region_role(region_id, RegionRole::Follower);
152 error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
153 }
154 }
155 }
156
157 async fn close_staled_regions(&self, regions: &[u64]) {
159 for region_id in regions {
160 self.close_staled_region(RegionId::from_u64(*region_id))
161 .await;
162 }
163 }
164
165 #[cfg(test)]
166 async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
167 let mut deadline = None;
168 if let Some(handle) = self.find_handle(region_id).await {
169 let (s, r) = oneshot::channel();
170 if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
171 deadline = r.await.ok()
172 }
173 }
174 deadline
175 }
176
177 pub async fn start(
178 self: &Arc<Self>,
179 event_receiver: Option<RegionServerEventReceiver>,
180 ) -> Result<()> {
181 self.started.store(true, Ordering::Relaxed);
182
183 if let Some(mut event_receiver) = event_receiver {
184 let keeper = self.clone();
185 loop {
188 match event_receiver.0.try_recv() {
189 Ok(RegionServerEvent::Registered(region_id)) => {
190 keeper.register_region(region_id).await;
191 }
192 Ok(RegionServerEvent::Deregistered(region_id)) => {
193 keeper.deregister_region(region_id).await;
194 }
195 Err(mpsc::error::TryRecvError::Disconnected) => {
196 return error::UnexpectedSnafu {
197 violated: "RegionServerEventSender closed",
198 }
199 .fail()
200 }
201 Err(mpsc::error::TryRecvError::Empty) => {
202 break;
203 }
204 }
205 }
206 let running = self.started.clone();
207
208 common_runtime::spawn_global(async move {
210 loop {
211 if !running.load(Ordering::Relaxed) {
212 info!("RegionAliveKeeper stopped! Quits the watch loop!");
213 break;
214 }
215
216 match event_receiver.0.recv().await {
217 Some(RegionServerEvent::Registered(region_id)) => {
218 keeper.register_region(region_id).await;
219 }
220 Some(RegionServerEvent::Deregistered(region_id)) => {
221 keeper.deregister_region(region_id).await;
222 }
223 None => {
224 info!("RegionServerEventSender closed! Quits the watch loop!");
225 break;
226 }
227 }
228 }
229 });
230 }
231
232 let tasks = self.tasks.lock().await;
233 for task in tasks.values() {
234 task.start(self.heartbeat_interval_millis).await;
235 }
236
237 info!(
238 "RegionAliveKeeper is started with region {:?}",
239 tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
240 );
241
242 Ok(())
243 }
244
245 pub fn epoch(&self) -> Instant {
246 self.epoch
247 }
248}
249
250#[async_trait]
251impl HeartbeatResponseHandler for RegionAliveKeeper {
252 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
253 ctx.response.region_lease.is_some()
254 }
255
256 async fn handle(
257 &self,
258 ctx: &mut HeartbeatResponseHandlerContext,
259 ) -> common_meta::error::Result<HandleControl> {
260 let region_lease = ctx
261 .response
262 .region_lease
263 .as_ref()
264 .context(InvalidProtoMsgSnafu {
265 err_msg: "'region_lease' is missing in heartbeat response",
266 })?;
267 let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
268 let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
269
270 self.renew_region_leases(®ion_lease.regions, deadline)
271 .await;
272 self.close_staled_regions(®ion_lease.closeable_region_ids)
273 .await;
274
275 Ok(HandleControl::Continue)
276 }
277}
278
/// Commands sent from a [`CountdownTaskHandle`] to its countdown task.
#[derive(Debug)]
enum CountdownCommand {
    /// Arms the countdown with a first deadline of 4 × the given heartbeat interval
    /// (milliseconds) from now. Ignored once the task has been started.
    Start(u64),
    /// Applies the role to the region, forwards the deadline and extension info to the
    /// optional [`CountdownTaskHandlerExt`], and moves the countdown deadline to the given
    /// instant.
    Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
    /// Test-only: replies with the countdown's current deadline.
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}
291
/// Shared, dynamically dispatched [`CountdownTaskHandlerExt`].
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;
293
/// Extension hook called by a region's countdown task every time its lease deadline is reset.
///
/// The hook runs after the countdown task has tried to apply `role` to the region, whether or
/// not that attempt succeeded.
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
    /// Called when the lease of `region_id` is reset.
    ///
    /// - `region_server`: the server hosting the region.
    /// - `role`: the role granted in the heartbeat response.
    /// - `deadline`: the new lease deadline.
    /// - `extension_info`: the `extensions` map of the granted region, passed through as-is.
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        region_id: RegionId,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}
306
/// Owner-side handle of a region's countdown task.
struct CountdownTaskHandle {
    /// Sends commands to the countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    /// Join handle of the spawned countdown task; aborted when this handle is dropped.
    handler: JoinHandle<()>,
    /// Region the countdown belongs to (used for logging).
    region_id: RegionId,
}
312
313impl CountdownTaskHandle {
314 fn new(
316 region_server: RegionServer,
317 handler_ext: Option<CountdownTaskHandlerExtRef>,
318 region_id: RegionId,
319 ) -> Self {
320 let (tx, rx) = mpsc::channel(1024);
321
322 let mut countdown_task = CountdownTask {
323 region_server,
324 handler_ext,
325 region_id,
326 rx,
327 };
328 let handler = common_runtime::spawn_hb(async move {
329 countdown_task.run().await;
330 });
331
332 Self {
333 tx,
334 handler,
335 region_id,
336 }
337 }
338
339 async fn start(&self, heartbeat_interval_millis: u64) {
342 if let Err(e) = self
343 .tx
344 .send(CountdownCommand::Start(heartbeat_interval_millis))
345 .await
346 {
347 warn!(
348 "Failed to start region alive keeper countdown: {e}. \
349 Maybe the task is stopped due to region been closed."
350 );
351 }
352 }
353
354 #[cfg(test)]
355 async fn deadline(&self) -> Option<Instant> {
356 let (tx, rx) = oneshot::channel();
357 if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
358 return rx.await.ok();
359 }
360 None
361 }
362
363 async fn reset_deadline(
364 &self,
365 role: RegionRole,
366 deadline: Instant,
367 extension_info: HashMap<String, Vec<u8>>,
368 ) {
369 if let Err(e) = self
370 .tx
371 .send(CountdownCommand::Reset((role, deadline, extension_info)))
372 .await
373 {
374 warn!(
375 "Failed to reset region alive keeper deadline: {e}. \
376 Maybe the task is stopped due to region been closed."
377 );
378 }
379 }
380}
381
382impl Drop for CountdownTaskHandle {
383 fn drop(&mut self) {
384 debug!(
385 "Aborting region alive countdown task for region {}",
386 self.region_id
387 );
388 self.handler.abort();
389 }
390}
391
/// State owned by a region's spawned countdown task.
struct CountdownTask {
    /// Server used to change the region's role.
    region_server: RegionServer,
    /// Region whose lease is being counted down.
    region_id: RegionId,
    /// Optional hook invoked on every deadline reset.
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    /// Receives commands from the [`CountdownTaskHandle`]. The task exits once every sender
    /// has been dropped.
    rx: mpsc::Receiver<CountdownCommand>,
}
398
399impl CountdownTask {
400 async fn run(&mut self) {
401 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
403
404 let countdown = tokio::time::sleep_until(far_future);
407 tokio::pin!(countdown);
408 let region_id = self.region_id;
409
410 let mut started = false;
411 loop {
412 tokio::select! {
413 command = self.rx.recv() => {
414 match command {
415 Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
416 if !started {
417 let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
421 countdown.set(tokio::time::sleep_until(first_deadline));
422 started = true;
423 }
424 },
425 Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
426 if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
427 if err.status_code() == StatusCode::RegionNotFound {
428 warn!(err; "Failed to set region role to {role} for region {region_id}");
432 }else{
433 error!(err; "Failed to set region role to {role} for region {region_id}");
434 }
435
436 }
437 if let Some(ext_handler) = self.handler_ext.as_ref() {
438 ext_handler.reset_deadline(
439 &self.region_server,
440 self.region_id,
441 role,
442 deadline,
443 extension_info,
444 ).await;
445 }
446 trace!(
447 "Reset deadline of region {region_id} to approximately {} seconds later.",
448 (deadline - Instant::now()).as_secs_f32(),
449 );
450 countdown.set(tokio::time::sleep_until(deadline));
451 },
452 None => {
453 info!(
454 "The handle of countdown task for region {region_id}\
455 is dropped, RegionAliveKeeper out."
456 );
457 break;
458 },
459 #[cfg(test)]
460 Some(CountdownCommand::Deadline(tx)) => {
461 let _ = tx.send(countdown.deadline());
462 }
463 }
464 }
465 () = &mut countdown => {
466 warn!("The region {region_id} lease is expired, convert region to follower.");
467 if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
468 error!(err; "Failed to set region role to follower for region {region_id}");
469 }
470 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
472 countdown.as_mut().reset(far_future);
473 }
474 }
475 }
476 }
477}
478
#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    /// End-to-end check of the keeper on a real mito engine.
    ///
    /// Steps:
    /// 1. Register a leader region and start the keeper.
    /// 2. Let the first deadline (4 × 100 ms) expire and check the region is demoted.
    /// 3. Renew the lease and check the region is leader again.
    /// 4. Let the renewed lease expire and check the countdown is parked far in the future.
    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        let engine = Arc::new(engine);
        region_server.register_engine(engine.clone());

        // Heartbeat interval of 100 ms, so the first deadline after start is ~400 ms away.
        let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));

        let region_id = RegionId::new(1024, 1);
        let builder = CreateRequestBuilder::new();
        region_server
            .handle_request(region_id, RegionRequest::Create(builder.build()))
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        // Registering before `start` creates the handle without starting the countdown.
        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        // 500 ms exceeds the ~400 ms first deadline: the region must be demoted, but the
        // handle stays registered.
        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // Renewing with a Leader grant restores the role and moves the deadline 200 ms out.
        info!("Renew the region lease");
        alive_keeper
            .renew_region_leases(
                &[GrantedRegion {
                    region_id: region_id.as_u64(),
                    role: api::v1::meta::RegionRole::Leader.into(),
                    extensions: HashMap::new(),
                }],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        // 100 + 200 ms passes the renewed 200 ms deadline: demoted again.
        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // After expiry the countdown is parked ~30 years out; 29 years leaves slack.
        let deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    }

    /// Unit check of a single countdown task's deadline transitions.
    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        // NOTE(review): region (9999, 2) is presumably not present in the mock server, so the
        // role changes below are expected to fail and only be logged; confirm.
        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        // Before `Start`, the countdown is parked far in the future.
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // `Start` arms a first deadline of 4 × interval, i.e. more than 3 × interval away.
        let heartbeat_interval_millis = 100;
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        // The deadline has expired (countdown re-parked), and a second `Start` is ignored
        // because the task is already started.
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
        );

        // `Reset` always moves the deadline, here to ~5 × interval from now.
        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}