// datanode/heartbeat/handler/upgrade_region.rs
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCatchupRequest, RegionRequest};
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;
impl HandlerContext {
pub(crate) fn handle_upgrade_region_instruction(
self,
UpgradeRegion {
region_id,
last_entry_id,
replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
});
};
if writable {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
});
}
let region_server_moved = self.region_server.clone();
let register_result = self
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}
let Some(replay_timeout) = replay_timeout else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
});
};
let mut watcher = register_result.into_watcher();
let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
match result {
WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}),
WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}),
WaitResult::Finish(Err(err)) => {
InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
})
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::instruction::{InstructionReply, UpgradeRegion};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
use crate::tests::{mock_region_server, MockRegionEngine};
/// Upgrading a region that was never registered with the region server must be
/// reported as non-existent and not ready, with no error, whether or not a
/// `replay_timeout` is supplied.
#[tokio::test]
async fn test_region_not_exist() {
    let mut mock_region_server = mock_region_server();
    let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
    mock_region_server.register_engine(mock_engine);

    let handler_context = HandlerContext::new_for_test(mock_region_server);

    // Only the engine is registered; this region id is never added to the server.
    let region_id = RegionId::new(1024, 1);

    let waits = vec![None, Some(Duration::from_millis(100u64))];
    for replay_timeout in waits {
        let reply = handler_context
            .clone()
            .handle_upgrade_region_instruction(UpgradeRegion {
                region_id,
                last_entry_id: None,
                replay_timeout,
                location_id: None,
            })
            .await;
        assert_matches!(reply, InstructionReply::UpgradeRegion(_));

        if let InstructionReply::UpgradeRegion(reply) = reply {
            // The handler replies `ready: false` for an unknown region; pin that too
            // so a regression in the flag is caught.
            assert!(!reply.ready);
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }
}
/// A region whose mocked role is `Leader` must be reported ready right away. The
/// mock request handler panics, so any request reaching the engine fails the test.
#[tokio::test]
async fn test_region_writable() {
    let region_server = mock_region_server();
    let region_id = RegionId::new(1024, 1);

    let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
        engine.mock_role = Some(Some(RegionRole::Leader));
        // The handler must never issue a request for a region that is already a leader.
        engine.handle_request_mock_fn = Some(Box::new(|_, _| {
            unreachable!();
        }));
    });
    region_server.register_test_region(region_id, engine);

    let handler_context = HandlerContext::new_for_test(region_server);

    // Exercise both the "return immediately" and the "wait with timeout" paths.
    for replay_timeout in [None, Some(Duration::from_millis(100))] {
        let instruction = UpgradeRegion {
            region_id,
            last_entry_id: None,
            replay_timeout,
            location_id: None,
        };
        let reply = handler_context
            .clone()
            .handle_upgrade_region_instruction(instruction)
            .await;
        assert_matches!(reply, InstructionReply::UpgradeRegion(_));

        if let InstructionReply::UpgradeRegion(reply) = reply {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }
}
/// A follower region whose catchup cannot finish within the allowed time must be
/// reported as existing but not ready, without an error.
#[tokio::test]
async fn test_region_not_ready() {
    let region_server = mock_region_server();
    let region_id = RegionId::new(1024, 1);

    let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
        engine.mock_role = Some(Some(RegionRole::Follower));
        engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        // Far longer than any timeout used below (presumably the mock applies this
        // delay to each request — confirm against `MockRegionEngine`).
        engine.handle_request_delay = Some(Duration::from_secs(100));
    });
    region_server.register_test_region(region_id, engine);

    let handler_context = HandlerContext::new_for_test(region_server);

    // Exercise both the "return immediately" and the "wait with timeout" paths.
    for replay_timeout in [None, Some(Duration::from_millis(100))] {
        let instruction = UpgradeRegion {
            region_id,
            last_entry_id: None,
            replay_timeout,
            location_id: None,
        };
        let reply = handler_context
            .clone()
            .handle_upgrade_region_instruction(instruction)
            .await;
        assert_matches!(reply, InstructionReply::UpgradeRegion(_));

        if let InstructionReply::UpgradeRegion(reply) = reply {
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }
}
/// Repeated upgrade instructions for the same follower region: two short waits are
/// expected to time out (not ready), and a third, longer wait is expected to report
/// ready in under 300 ms.
#[tokio::test]
async fn test_region_not_ready_with_retry() {
    let region_server = mock_region_server();
    let region_id = RegionId::new(1024, 1);

    let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
        engine.mock_role = Some(Some(RegionRole::Follower));
        engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        engine.handle_request_delay = Some(Duration::from_millis(300));
    });
    region_server.register_test_region(region_id, engine);

    let handler_context = HandlerContext::new_for_test(region_server);

    // Two attempts, each waiting 100 ms: both must come back as "not ready yet".
    let short_wait = Duration::from_millis(100);
    for _ in 0..2 {
        let reply = handler_context
            .clone()
            .handle_upgrade_region_instruction(UpgradeRegion {
                region_id,
                replay_timeout: Some(short_wait),
                last_entry_id: None,
                location_id: None,
            })
            .await;
        assert_matches!(reply, InstructionReply::UpgradeRegion(_));

        if let InstructionReply::UpgradeRegion(reply) = reply {
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    // Third attempt with a generous timeout; it must complete in under 300 ms.
    let started = Instant::now();
    let reply = handler_context
        .handle_upgrade_region_instruction(UpgradeRegion {
            region_id,
            last_entry_id: None,
            replay_timeout: Some(Duration::from_millis(500)),
            location_id: None,
        })
        .await;
    assert_matches!(reply, InstructionReply::UpgradeRegion(_));
    assert!(started.elapsed() < Duration::from_millis(300));

    if let InstructionReply::UpgradeRegion(reply) = reply {
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }
}
/// When the engine's request handler fails, a non-waiting upgrade reports no error,
/// while a waiting upgrade surfaces the failure text in the reply's `error` field.
#[tokio::test]
async fn test_region_error() {
    let region_server = mock_region_server();
    let region_id = RegionId::new(1024, 1);

    let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
        engine.mock_role = Some(Some(RegionRole::Follower));
        engine.handle_request_mock_fn = Some(Box::new(|_, _| {
            error::UnexpectedSnafu {
                violated: "mock_error".to_string(),
            }
            .fail()
        }));
        engine.handle_request_delay = Some(Duration::from_millis(100));
    });
    region_server.register_test_region(region_id, engine);

    let handler_context = HandlerContext::new_for_test(region_server);

    // Issues one upgrade instruction for the test region with the given timeout.
    let upgrade = |replay_timeout: Option<Duration>| {
        handler_context
            .clone()
            .handle_upgrade_region_instruction(UpgradeRegion {
                region_id,
                last_entry_id: None,
                replay_timeout,
                location_id: None,
            })
    };

    // No timeout: the handler replies without waiting, so no error is visible yet.
    let reply = upgrade(None).await;
    assert_matches!(reply, InstructionReply::UpgradeRegion(_));
    if let InstructionReply::UpgradeRegion(reply) = reply {
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }

    // With a 200 ms timeout the failure is expected to be reported in the reply.
    let reply = upgrade(Some(Duration::from_millis(200))).await;
    assert_matches!(reply, InstructionReply::UpgradeRegion(_));
    if let InstructionReply::UpgradeRegion(reply) = reply {
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.unwrap().contains("mock_error"));
    }
}
}