1use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19use api::v1::meta::GrantedRegion;
20use async_trait::async_trait;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_meta::error::InvalidProtoMsgSnafu;
24use common_meta::heartbeat::handler::{
25 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
26};
27use common_telemetry::{debug, error, info, trace, warn};
28use snafu::OptionExt;
29use store_api::region_engine::RegionRole;
30use store_api::region_request::{RegionCloseRequest, RegionRequest};
31use store_api::storage::RegionId;
32#[cfg(test)]
33use tokio::sync::oneshot;
34use tokio::sync::{mpsc, Mutex};
35use tokio::task::JoinHandle;
36use tokio::time::{Duration, Instant};
37
38use crate::error::{self, Result};
39use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
40use crate::region_server::RegionServer;
41
42pub struct RegionAliveKeeper {
51 region_server: RegionServer,
52 tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
53 heartbeat_interval_millis: u64,
54 started: Arc<AtomicBool>,
55
56 epoch: Instant,
61
62 countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
63}
64
65impl RegionAliveKeeper {
66 pub fn new(
68 region_server: RegionServer,
69 countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
70 heartbeat_interval_millis: u64,
71 ) -> Self {
72 Self {
73 region_server,
74 tasks: Arc::new(Mutex::new(HashMap::new())),
75 heartbeat_interval_millis,
76 started: Arc::new(AtomicBool::new(false)),
77 epoch: Instant::now(),
78 countdown_task_handler_ext,
79 }
80 }
81
82 async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
83 self.tasks.lock().await.get(®ion_id).cloned()
84 }
85
86 pub async fn register_region(&self, region_id: RegionId) {
89 if self.find_handle(region_id).await.is_some() {
90 return;
91 }
92
93 let handle = Arc::new(CountdownTaskHandle::new(
94 self.region_server.clone(),
95 self.countdown_task_handler_ext.clone(),
96 region_id,
97 ));
98
99 let mut handles = self.tasks.lock().await;
100 let _ = handles.insert(region_id, handle.clone());
101
102 if self.started.load(Ordering::Relaxed) {
103 handle.start(self.heartbeat_interval_millis).await;
104
105 info!("Region alive countdown for region {region_id} is started!",);
106 } else {
107 info!(
108 "Region alive countdown for region {region_id} is registered but not started yet!",
109 );
110 }
111 }
112
113 pub async fn deregister_region(&self, region_id: RegionId) {
115 if self.tasks.lock().await.remove(®ion_id).is_some() {
116 info!("Deregister alive countdown for region {region_id}")
117 }
118 }
119
120 async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
122 for region in regions {
123 let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
124 if let Some(handle) = self.find_handle(region_id).await {
125 handle
126 .reset_deadline(role, deadline, region.extensions.clone())
127 .await;
128 } else {
129 warn!(
130 "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
131 );
132 }
134 }
135 }
136
137 async fn close_staled_region(&self, region_id: RegionId) {
138 info!("Closing staled region: {region_id}");
139 let request = RegionRequest::Close(RegionCloseRequest {});
140 if let Err(e) = self.region_server.handle_request(region_id, request).await {
141 if e.status_code() != StatusCode::RegionNotFound {
142 let _ = self
143 .region_server
144 .set_region_role(region_id, RegionRole::Follower);
145 error!(e; "Failed to close staled region {}, convert region to follower.", region_id);
146 }
147 }
148 }
149
150 async fn close_staled_regions(&self, regions: &[u64]) {
152 for region_id in regions {
153 self.close_staled_region(RegionId::from_u64(*region_id))
154 .await;
155 }
156 }
157
158 #[cfg(test)]
159 async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
160 let mut deadline = None;
161 if let Some(handle) = self.find_handle(region_id).await {
162 let (s, r) = oneshot::channel();
163 if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
164 deadline = r.await.ok()
165 }
166 }
167 deadline
168 }
169
170 pub async fn start(
171 self: &Arc<Self>,
172 event_receiver: Option<RegionServerEventReceiver>,
173 ) -> Result<()> {
174 self.started.store(true, Ordering::Relaxed);
175
176 if let Some(mut event_receiver) = event_receiver {
177 let keeper = self.clone();
178 loop {
181 match event_receiver.0.try_recv() {
182 Ok(RegionServerEvent::Registered(region_id)) => {
183 keeper.register_region(region_id).await;
184 }
185 Ok(RegionServerEvent::Deregistered(region_id)) => {
186 keeper.deregister_region(region_id).await;
187 }
188 Err(mpsc::error::TryRecvError::Disconnected) => {
189 return error::UnexpectedSnafu {
190 violated: "RegionServerEventSender closed",
191 }
192 .fail()
193 }
194 Err(mpsc::error::TryRecvError::Empty) => {
195 break;
196 }
197 }
198 }
199 let running = self.started.clone();
200
201 common_runtime::spawn_global(async move {
203 loop {
204 if !running.load(Ordering::Relaxed) {
205 info!("RegionAliveKeeper stopped! Quits the watch loop!");
206 break;
207 }
208
209 match event_receiver.0.recv().await {
210 Some(RegionServerEvent::Registered(region_id)) => {
211 keeper.register_region(region_id).await;
212 }
213 Some(RegionServerEvent::Deregistered(region_id)) => {
214 keeper.deregister_region(region_id).await;
215 }
216 None => {
217 info!("RegionServerEventSender closed! Quits the watch loop!");
218 break;
219 }
220 }
221 }
222 });
223 }
224
225 let tasks = self.tasks.lock().await;
226 for task in tasks.values() {
227 task.start(self.heartbeat_interval_millis).await;
228 }
229
230 info!(
231 "RegionAliveKeeper is started with region {:?}",
232 tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
233 );
234
235 Ok(())
236 }
237
238 pub fn epoch(&self) -> Instant {
239 self.epoch
240 }
241}
242
243#[async_trait]
244impl HeartbeatResponseHandler for RegionAliveKeeper {
245 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
246 ctx.response.region_lease.is_some()
247 }
248
249 async fn handle(
250 &self,
251 ctx: &mut HeartbeatResponseHandlerContext,
252 ) -> common_meta::error::Result<HandleControl> {
253 let region_lease = ctx
254 .response
255 .region_lease
256 .as_ref()
257 .context(InvalidProtoMsgSnafu {
258 err_msg: "'region_lease' is missing in heartbeat response",
259 })?;
260 let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
261 let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
262
263 self.renew_region_leases(®ion_lease.regions, deadline)
264 .await;
265 self.close_staled_regions(®ion_lease.closeable_region_ids)
266 .await;
267
268 Ok(HandleControl::Continue)
269 }
270}
271
272#[derive(Debug)]
273enum CountdownCommand {
274 Start(u64),
277 Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
280 #[cfg(test)]
282 Deadline(oneshot::Sender<Instant>),
283}
284
285pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;
286
287#[async_trait]
289pub trait CountdownTaskHandlerExt: Send + Sync {
290 async fn reset_deadline(
291 &self,
292 region_server: &RegionServer,
293 region_id: RegionId,
294 role: RegionRole,
295 deadline: Instant,
296 extension_info: HashMap<String, Vec<u8>>,
297 );
298}
299
300struct CountdownTaskHandle {
301 tx: mpsc::Sender<CountdownCommand>,
302 handler: JoinHandle<()>,
303 region_id: RegionId,
304}
305
306impl CountdownTaskHandle {
307 fn new(
309 region_server: RegionServer,
310 handler_ext: Option<CountdownTaskHandlerExtRef>,
311 region_id: RegionId,
312 ) -> Self {
313 let (tx, rx) = mpsc::channel(1024);
314
315 let mut countdown_task = CountdownTask {
316 region_server,
317 handler_ext,
318 region_id,
319 rx,
320 };
321 let handler = common_runtime::spawn_hb(async move {
322 countdown_task.run().await;
323 });
324
325 Self {
326 tx,
327 handler,
328 region_id,
329 }
330 }
331
332 async fn start(&self, heartbeat_interval_millis: u64) {
335 if let Err(e) = self
336 .tx
337 .send(CountdownCommand::Start(heartbeat_interval_millis))
338 .await
339 {
340 warn!(
341 "Failed to start region alive keeper countdown: {e}. \
342 Maybe the task is stopped due to region been closed."
343 );
344 }
345 }
346
347 #[cfg(test)]
348 async fn deadline(&self) -> Option<Instant> {
349 let (tx, rx) = oneshot::channel();
350 if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
351 return rx.await.ok();
352 }
353 None
354 }
355
356 async fn reset_deadline(
357 &self,
358 role: RegionRole,
359 deadline: Instant,
360 extension_info: HashMap<String, Vec<u8>>,
361 ) {
362 if let Err(e) = self
363 .tx
364 .send(CountdownCommand::Reset((role, deadline, extension_info)))
365 .await
366 {
367 warn!(
368 "Failed to reset region alive keeper deadline: {e}. \
369 Maybe the task is stopped due to region been closed."
370 );
371 }
372 }
373}
374
375impl Drop for CountdownTaskHandle {
376 fn drop(&mut self) {
377 debug!(
378 "Aborting region alive countdown task for region {}",
379 self.region_id
380 );
381 self.handler.abort();
382 }
383}
384
385struct CountdownTask {
386 region_server: RegionServer,
387 region_id: RegionId,
388 handler_ext: Option<CountdownTaskHandlerExtRef>,
389 rx: mpsc::Receiver<CountdownCommand>,
390}
391
392impl CountdownTask {
393 async fn run(&mut self) {
394 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
396
397 let countdown = tokio::time::sleep_until(far_future);
400 tokio::pin!(countdown);
401 let region_id = self.region_id;
402
403 let mut started = false;
404 loop {
405 tokio::select! {
406 command = self.rx.recv() => {
407 match command {
408 Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
409 if !started {
410 let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
414 countdown.set(tokio::time::sleep_until(first_deadline));
415 started = true;
416 }
417 },
418 Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
419 if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
420 error!(err; "Failed to set region role to {role} for region {region_id}");
421 }
422 if let Some(ext_handler) = self.handler_ext.as_ref() {
423 ext_handler.reset_deadline(
424 &self.region_server,
425 self.region_id,
426 role,
427 deadline,
428 extension_info,
429 ).await;
430 }
431 trace!(
432 "Reset deadline of region {region_id} to approximately {} seconds later.",
433 (deadline - Instant::now()).as_secs_f32(),
434 );
435 countdown.set(tokio::time::sleep_until(deadline));
436 },
437 None => {
438 info!(
439 "The handle of countdown task for region {region_id}\
440 is dropped, RegionAliveKeeper out."
441 );
442 break;
443 },
444 #[cfg(test)]
445 Some(CountdownCommand::Deadline(tx)) => {
446 let _ = tx.send(countdown.deadline());
447 }
448 }
449 }
450 () = &mut countdown => {
451 warn!("The region {region_id} lease is expired, convert region to follower.");
452 if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
453 error!(err; "Failed to set region role to follower for region {region_id}");
454 }
455 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
457 countdown.as_mut().reset(far_future);
458 }
459 }
460 }
461 }
462}
463
#[cfg(test)]
mod test {

    use mito2::config::MitoConfig;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::RegionEngine;

    use super::*;
    use crate::tests::mock_region_server;

    /// Lower bound (29 years from now) used to recognise a countdown that is parked in the
    /// far future.
    fn almost_far_future() -> Instant {
        Instant::now() + Duration::from_secs(86400 * 365 * 29)
    }

    /// Builds the lease grant that makes `region_id` a leader.
    fn leader_grant(region_id: RegionId) -> GrantedRegion {
        GrantedRegion {
            region_id: region_id.as_u64(),
            role: api::v1::meta::RegionRole::Leader.into(),
            extensions: HashMap::new(),
        }
    }

    /// End-to-end check: a registered leader region is downgraded to follower when its
    /// lease expires, becomes leader again on renewal, and is downgraded once more when the
    /// renewed lease expires.
    #[tokio::test(flavor = "multi_thread")]
    async fn region_alive_keeper() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
        let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await);
        region_server.register_engine(engine.clone());

        let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));

        let region_id = RegionId::new(1024, 1);
        let create_request = RegionRequest::Create(CreateRequestBuilder::new().build());
        region_server
            .handle_request(region_id, create_request)
            .await
            .unwrap();
        region_server
            .set_region_role(region_id, RegionRole::Leader)
            .unwrap();

        // Register before the keeper is started: the handle exists but is not counting yet.
        alive_keeper.register_region(region_id).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());

        info!("Start the keeper");
        alive_keeper.start(None).await.unwrap();

        // Right after start the first deadline is still ahead and the region is a leader.
        let first_deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(first_deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(500)).await;
        // The handle survives expiry; only the role is downgraded.
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        info!("Renew the region lease");
        alive_keeper
            .renew_region_leases(
                &[leader_grant(region_id)],
                Instant::now() + Duration::from_millis(200),
            )
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        let renewed_deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(renewed_deadline >= Instant::now());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

        info!("Wait for lease expired");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(alive_keeper.find_handle(region_id).await.is_some());
        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

        // After expiry the countdown is parked in the far future.
        let parked_deadline = alive_keeper.deadline(region_id).await.unwrap();
        assert!(parked_deadline > almost_far_future());
    }

    /// Exercises a bare countdown handle: parked before start, armed by the first start,
    /// unaffected by a second start after expiry, and re-armed by a reset.
    #[tokio::test(flavor = "multi_thread")]
    async fn countdown_task() {
        let region_server = mock_region_server();

        let countdown_handle =
            CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

        // Not started yet: the deadline is parked in the far future.
        assert!(countdown_handle.deadline().await.unwrap() > almost_far_future());

        let heartbeat_interval_millis = 100;
        // The first start arms the countdown at four heartbeat intervals.
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
        );
        tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;

        // A second start after expiry has no effect: the deadline stays parked.
        countdown_handle.start(heartbeat_interval_millis).await;
        assert!(countdown_handle.deadline().await.unwrap() > almost_far_future());

        // A reset re-arms the countdown at the requested deadline.
        countdown_handle
            .reset_deadline(
                RegionRole::Leader,
                Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
                HashMap::new(),
            )
            .await;
        assert!(
            countdown_handle.deadline().await.unwrap()
                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
        );
    }
}