datanode/heartbeat/handler/
close_region.rs1use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
16use common_telemetry::warn;
17use futures::future::join_all;
18use store_api::region_request::{RegionCloseRequest, RegionRequest};
19use store_api::storage::RegionId;
20
21use crate::error;
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
24#[derive(Debug, Clone, Copy, Default)]
25pub struct CloseRegionsHandler;
26
27#[async_trait::async_trait]
28impl InstructionHandler for CloseRegionsHandler {
29 async fn handle(
30 &self,
31 ctx: &HandlerContext,
32 instruction: Instruction,
33 ) -> Option<InstructionReply> {
34 let region_idents = instruction.into_close_regions().unwrap();
36 let region_ids = region_idents
37 .into_iter()
38 .map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))
39 .collect::<Vec<_>>();
40
41 let futs = region_ids.iter().map(|region_id| {
42 ctx.region_server
43 .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
44 });
45
46 let results = join_all(futs).await;
47
48 let mut errors = vec![];
49 for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
50 match result {
51 Ok(_) => (),
52 Err(error::Error::RegionNotFound { .. }) => {
53 warn!(
54 "Received a close regions instruction from meta, but target region:{} is not found.",
55 region_id
56 );
57 }
58 Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
59 }
60 }
61
62 if errors.is_empty() {
63 return Some(InstructionReply::CloseRegions(SimpleReply {
64 result: true,
65 error: None,
66 }));
67 }
68
69 Some(InstructionReply::CloseRegions(SimpleReply {
70 result: false,
71 error: Some(errors.join("; ")),
72 }))
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use std::assert_matches;
79 use std::sync::Arc;
80
81 use assert_matches::assert_matches;
82 use common_meta::RegionIdent;
83 use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
84 use common_meta::heartbeat::mailbox::MessageMeta;
85 use common_meta::instruction::Instruction;
86 use mito2::config::MitoConfig;
87 use mito2::engine::MITO_ENGINE_NAME;
88 use mito2::test_util::{CreateRequestBuilder, TestEnv};
89 use store_api::region_request::RegionRequest;
90 use store_api::storage::RegionId;
91
92 use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
93 use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
94 use crate::tests::mock_region_server;
95
96 fn close_regions_instruction(region_ids: impl IntoIterator<Item = RegionId>) -> Instruction {
97 let region_idents = region_ids
98 .into_iter()
99 .map(|region_id| RegionIdent {
100 table_id: region_id.table_id(),
101 region_number: region_id.region_number(),
102 datanode_id: 2,
103 engine: MITO_ENGINE_NAME.to_string(),
104 })
105 .collect();
106
107 Instruction::CloseRegions(region_idents)
108 }
109
110 #[tokio::test]
111 async fn test_close_regions() {
112 common_telemetry::init_default_ut_logging();
113
114 let mut region_server = mock_region_server();
115 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
116 let mut engine_env = TestEnv::with_prefix("close-regions").await;
117 let engine = engine_env.create_engine(MitoConfig::default()).await;
118 region_server.register_engine(Arc::new(engine.clone()));
119 let region_id = RegionId::new(1024, 1);
120 let region_id1 = RegionId::new(1024, 2);
121
122 let builder = CreateRequestBuilder::new();
123 let create_req = builder.build();
124 region_server
125 .handle_request(region_id, RegionRequest::Create(create_req))
126 .await
127 .unwrap();
128
129 let create_req1 = builder.build();
130 region_server
131 .handle_request(region_id1, RegionRequest::Create(create_req1))
132 .await
133 .unwrap();
134 let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
135 let instruction =
136 close_regions_instruction([region_id, region_id1, RegionId::new(1024, 3)]);
137 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
138 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
139 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
140 assert_matches!(control, HandleControl::Continue);
141
142 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
143 let reply = reply.expect_close_regions_reply();
144 assert!(reply.result);
145 assert!(reply.error.is_none());
146 assert!(!engine.is_region_exists(region_id));
147 assert!(!engine.is_region_exists(region_id1));
148 assert!(!engine.is_region_exists(RegionId::new(1024, 3)));
149 }
150}