datanode/heartbeat/handler/
downgrade_region.rsuse common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
use common_telemetry::tracing::info;
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;
impl HandlerContext {
async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> InstructionReply {
match self
.region_server
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: true,
error: None,
})
}
Ok(SetRegionRoleStateResponse::NotFound) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
})
}
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}),
}
}
pub(crate) fn handle_downgrade_region_instruction(
self,
DowngradeRegion {
region_id,
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
warn!("Region: {region_id} is not found");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
});
};
let region_server_moved = self.region_server.clone();
if !writable {
warn!(
"Region: {region_id} is not writable, flush_timeout: {:?}",
flush_timeout
);
return self.downgrade_to_follower_gracefully(region_id).await;
}
let Some(flush_timeout) = flush_timeout else {
return self.downgrade_to_follower_gracefully(region_id).await;
};
if reject_write {
match self
.region_server
.set_region_role_state_gracefully(
region_id,
SettableRegionRoleState::DowngradingLeader,
)
.await
{
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
});
}
Err(err) => {
warn!(err; "Failed to convert region to downgrading leader");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
});
}
}
}
let register_result = self
.downgrade_tasks
.try_register(
region_id,
Box::pin(async move {
info!("Flush region: {region_id} before converting region to follower");
region_server_moved
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await;
match result {
WaitResult::Timeout => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
}),
WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
})
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::instruction::{DowngradeRegion, InstructionReply};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SetRegionRoleStateResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
use crate::tests::{mock_region_server, MockRegionEngine};
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let region_id = RegionId::new(1024, 1);
let waits = vec![None, Some(Duration::from_millis(100u64))];
for flush_timeout in waits {
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
}
}
}
#[tokio::test]
async fn test_region_readonly() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
if let RegionRequest::Flush(_) = req {
unreachable!();
};
Ok(0)
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let waits = vec![None, Some(Duration::from_millis(100u64))];
for flush_timeout in waits {
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
}
}
}
#[tokio::test]
async fn test_region_flush_timeout() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_secs(100));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let flush_timeout = Duration::from_millis(100);
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
}
}
#[tokio::test]
async fn test_region_flush_timeout_and_retry() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_millis(300));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let waits = vec![
Some(Duration::from_millis(100u64)),
Some(Duration::from_millis(100u64)),
];
for flush_timeout in waits {
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
}
}
let timer = Instant::now();
let reply = handler_context
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
}
}
#[tokio::test]
async fn test_region_flush_timeout_and_retry_error() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_millis(300));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock flush failed",
}
.fail()
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let waits = vec![
Some(Duration::from_millis(100u64)),
Some(Duration::from_millis(100u64)),
];
for flush_timeout in waits {
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
}
}
let timer = Instant::now();
let reply = handler_context
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("flush failed"));
assert!(reply.last_entry_id.is_none());
}
}
#[tokio::test]
async fn test_set_region_readonly_not_found() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_set_readonly_gracefully_mock_fn =
Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
}
}
#[tokio::test]
async fn test_set_region_readonly_error() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
error::UnexpectedSnafu {
violated: "Failed to set region to readonly",
}
.fail()
}));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = handler_context
.clone()
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply
.error
.unwrap()
.contains("Failed to set region to readonly"));
assert!(reply.last_entry_id.is_none());
}
}
}