datanode/heartbeat/handler/
close_region.rs1use common_meta::RegionIdent;
16use common_meta::instruction::{InstructionReply, SimpleReply};
17use common_telemetry::warn;
18use futures::future::join_all;
19use store_api::region_request::{RegionCloseRequest, RegionRequest};
20use store_api::storage::RegionId;
21
22use crate::error;
23use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
24
25#[derive(Debug, Clone, Copy, Default)]
26pub struct CloseRegionsHandler;
27
28#[async_trait::async_trait]
29impl InstructionHandler for CloseRegionsHandler {
30 type Instruction = Vec<RegionIdent>;
31
32 async fn handle(
33 &self,
34 ctx: &HandlerContext,
35 region_idents: Self::Instruction,
36 ) -> Option<InstructionReply> {
37 let region_ids = region_idents
38 .into_iter()
39 .map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))
40 .collect::<Vec<_>>();
41
42 let futs = region_ids.iter().map(|region_id| {
43 ctx.region_server
44 .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
45 });
46
47 let results = join_all(futs).await;
48
49 let mut errors = vec![];
50 for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
51 match result {
52 Ok(_) => (),
53 Err(error::Error::RegionNotFound { .. }) => {
54 warn!(
55 "Received a close regions instruction from meta, but target region:{} is not found.",
56 region_id
57 );
58 }
59 Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
60 }
61 }
62
63 if errors.is_empty() {
64 return Some(InstructionReply::CloseRegions(SimpleReply {
65 result: true,
66 error: None,
67 }));
68 }
69
70 Some(InstructionReply::CloseRegions(SimpleReply {
71 result: false,
72 error: Some(errors.join("; ")),
73 }))
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use std::assert_matches;
80 use std::sync::Arc;
81
82 use assert_matches::assert_matches;
83 use common_meta::RegionIdent;
84 use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
85 use common_meta::heartbeat::mailbox::MessageMeta;
86 use common_meta::instruction::Instruction;
87 use common_meta::kv_backend::memory::MemoryKvBackend;
88 use mito2::config::MitoConfig;
89 use mito2::engine::MITO_ENGINE_NAME;
90 use mito2::test_util::{CreateRequestBuilder, TestEnv};
91 use store_api::region_request::RegionRequest;
92 use store_api::storage::RegionId;
93
94 use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
95 use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
96 use crate::tests::mock_region_server;
97
98 fn close_regions_instruction(region_ids: impl IntoIterator<Item = RegionId>) -> Instruction {
99 let region_idents = region_ids
100 .into_iter()
101 .map(|region_id| RegionIdent {
102 table_id: region_id.table_id(),
103 region_number: region_id.region_number(),
104 datanode_id: 2,
105 engine: MITO_ENGINE_NAME.to_string(),
106 })
107 .collect();
108
109 Instruction::CloseRegions(region_idents)
110 }
111
112 #[tokio::test]
113 async fn test_close_regions() {
114 common_telemetry::init_default_ut_logging();
115
116 let mut region_server = mock_region_server();
117 let kv_backend = Arc::new(MemoryKvBackend::new());
118 let heartbeat_handler =
119 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
120 let mut engine_env = TestEnv::with_prefix("close-regions").await;
121 let engine = engine_env.create_engine(MitoConfig::default()).await;
122 region_server.register_engine(Arc::new(engine.clone()));
123 let region_id = RegionId::new(1024, 1);
124 let region_id1 = RegionId::new(1024, 2);
125
126 let builder = CreateRequestBuilder::new();
127 let create_req = builder.build();
128 region_server
129 .handle_request(region_id, RegionRequest::Create(create_req))
130 .await
131 .unwrap();
132
133 let create_req1 = builder.build();
134 region_server
135 .handle_request(region_id1, RegionRequest::Create(create_req1))
136 .await
137 .unwrap();
138 let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
139 let instruction =
140 close_regions_instruction([region_id, region_id1, RegionId::new(1024, 3)]);
141 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
142 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
143 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
144 assert_matches!(control, HandleControl::Continue);
145
146 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
147 let reply = reply.expect_close_regions_reply();
148 assert!(reply.result);
149 assert!(reply.error.is_none());
150 assert!(!engine.is_region_exists(region_id));
151 assert!(!engine.is_region_exists(region_id1));
152 assert!(!engine.is_region_exists(RegionId::new(1024, 3)));
153 }
154}