1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::RegionIdent;
22use common_telemetry::error;
23use futures::future::BoxFuture;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26
27mod close_region;
28mod downgrade_region;
29mod flush_region;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::task_tracker::TaskTracker;
34use crate::region_server::RegionServer;
35
36#[derive(Clone)]
38pub struct RegionHeartbeatResponseHandler {
39 region_server: RegionServer,
40 catchup_tasks: TaskTracker<()>,
41 downgrade_tasks: TaskTracker<()>,
42 flush_tasks: TaskTracker<()>,
43}
44
45pub type InstructionHandler =
47 Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
48
49#[derive(Clone)]
50pub struct HandlerContext {
51 region_server: RegionServer,
52 catchup_tasks: TaskTracker<()>,
53 downgrade_tasks: TaskTracker<()>,
54 flush_tasks: TaskTracker<()>,
55}
56
57impl HandlerContext {
58 fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
59 RegionId::new(region_ident.table_id, region_ident.region_number)
60 }
61
62 #[cfg(test)]
63 pub fn new_for_test(region_server: RegionServer) -> Self {
64 Self {
65 region_server,
66 catchup_tasks: TaskTracker::new(),
67 downgrade_tasks: TaskTracker::new(),
68 flush_tasks: TaskTracker::new(),
69 }
70 }
71}
72
73impl RegionHeartbeatResponseHandler {
74 pub fn new(region_server: RegionServer) -> Self {
76 Self {
77 region_server,
78 catchup_tasks: TaskTracker::new(),
79 downgrade_tasks: TaskTracker::new(),
80 flush_tasks: TaskTracker::new(),
81 }
82 }
83
84 fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
86 match instruction {
87 Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
88 handler_context.handle_open_region_instruction(open_region)
89 })),
90 Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
91 handler_context.handle_close_region_instruction(close_region)
92 })),
93 Instruction::DowngradeRegion(downgrade_region) => {
94 Ok(Box::new(move |handler_context| {
95 handler_context.handle_downgrade_region_instruction(downgrade_region)
96 }))
97 }
98 Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
99 handler_context.handle_upgrade_region_instruction(upgrade_region)
100 })),
101 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
102 Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
103 handler_context.handle_flush_regions_instruction(flush_regions)
104 })),
105 }
106 }
107}
108
109#[async_trait]
110impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
111 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
112 matches!(
113 ctx.incoming_message.as_ref(),
114 Some((_, Instruction::OpenRegion { .. }))
115 | Some((_, Instruction::CloseRegion { .. }))
116 | Some((_, Instruction::DowngradeRegion { .. }))
117 | Some((_, Instruction::UpgradeRegion { .. }))
118 | Some((_, Instruction::FlushRegions { .. }))
119 )
120 }
121
122 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
123 let (meta, instruction) = ctx
124 .incoming_message
125 .take()
126 .context(InvalidHeartbeatResponseSnafu)?;
127
128 let mailbox = ctx.mailbox.clone();
129 let region_server = self.region_server.clone();
130 let catchup_tasks = self.catchup_tasks.clone();
131 let downgrade_tasks = self.downgrade_tasks.clone();
132 let flush_tasks = self.flush_tasks.clone();
133 let handler = Self::build_handler(instruction)?;
134 let _handle = common_runtime::spawn_global(async move {
135 let reply = handler(HandlerContext {
136 region_server,
137 catchup_tasks,
138 downgrade_tasks,
139 flush_tasks,
140 })
141 .await;
142
143 if let Some(reply) = reply {
144 if let Err(e) = mailbox.send((meta, reply)).await {
145 error!(e; "Failed to send reply to mailbox");
146 }
147 }
148 });
149
150 Ok(HandleControl::Continue)
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use std::assert_matches::assert_matches;
157 use std::collections::HashMap;
158 use std::sync::Arc;
159 use std::time::Duration;
160
161 use common_meta::heartbeat::mailbox::{
162 HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
163 };
164 use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
165 use mito2::config::MitoConfig;
166 use mito2::engine::MITO_ENGINE_NAME;
167 use mito2::test_util::{CreateRequestBuilder, TestEnv};
168 use store_api::path_utils::table_dir;
169 use store_api::region_engine::RegionRole;
170 use store_api::region_request::{RegionCloseRequest, RegionRequest};
171 use store_api::storage::RegionId;
172 use tokio::sync::mpsc::{self, Receiver};
173
174 use super::*;
175 use crate::error;
176 use crate::tests::mock_region_server;
177
178 pub struct HeartbeatResponseTestEnv {
179 mailbox: MailboxRef,
180 receiver: Receiver<(MessageMeta, InstructionReply)>,
181 }
182
183 impl HeartbeatResponseTestEnv {
184 pub fn new() -> Self {
185 let (tx, rx) = mpsc::channel(8);
186 let mailbox = Arc::new(HeartbeatMailbox::new(tx));
187
188 HeartbeatResponseTestEnv {
189 mailbox,
190 receiver: rx,
191 }
192 }
193
194 pub fn create_handler_ctx(
195 &self,
196 incoming_message: IncomingMessage,
197 ) -> HeartbeatResponseHandlerContext {
198 HeartbeatResponseHandlerContext {
199 mailbox: self.mailbox.clone(),
200 response: Default::default(),
201 incoming_message: Some(incoming_message),
202 }
203 }
204 }
205
206 #[test]
207 fn test_is_acceptable() {
208 common_telemetry::init_default_ut_logging();
209 let region_server = mock_region_server();
210 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
211 let heartbeat_env = HeartbeatResponseTestEnv::new();
212 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
213
214 let region_id = RegionId::new(1024, 1);
216 let storage_path = "test";
217 let instruction = open_region_instruction(region_id, storage_path);
218 assert!(heartbeat_handler
219 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
220
221 let instruction = close_region_instruction(region_id);
223 assert!(heartbeat_handler
224 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
225
226 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
228 region_id: RegionId::new(2048, 1),
229 flush_timeout: Some(Duration::from_secs(1)),
230 });
231 assert!(heartbeat_handler
232 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
233
234 let instruction = Instruction::UpgradeRegion(UpgradeRegion {
236 region_id,
237 ..Default::default()
238 });
239 assert!(
240 heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
241 );
242 }
243
244 fn close_region_instruction(region_id: RegionId) -> Instruction {
245 Instruction::CloseRegion(RegionIdent {
246 table_id: region_id.table_id(),
247 region_number: region_id.region_number(),
248 datanode_id: 2,
249 engine: MITO_ENGINE_NAME.to_string(),
250 })
251 }
252
253 fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
254 Instruction::OpenRegion(OpenRegion::new(
255 RegionIdent {
256 table_id: region_id.table_id(),
257 region_number: region_id.region_number(),
258 datanode_id: 2,
259 engine: MITO_ENGINE_NAME.to_string(),
260 },
261 path,
262 HashMap::new(),
263 HashMap::new(),
264 false,
265 ))
266 }
267
268 #[tokio::test]
269 async fn test_close_region() {
270 common_telemetry::init_default_ut_logging();
271
272 let mut region_server = mock_region_server();
273 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
274
275 let mut engine_env = TestEnv::with_prefix("close-region").await;
276 let engine = engine_env.create_engine(MitoConfig::default()).await;
277 region_server.register_engine(Arc::new(engine));
278 let region_id = RegionId::new(1024, 1);
279
280 let builder = CreateRequestBuilder::new();
281 let create_req = builder.build();
282 region_server
283 .handle_request(region_id, RegionRequest::Create(create_req))
284 .await
285 .unwrap();
286
287 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
288
289 for _ in 0..2 {
291 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
292 let instruction = close_region_instruction(region_id);
293
294 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
295 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
296 assert_matches!(control, HandleControl::Continue);
297
298 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
299
300 if let InstructionReply::CloseRegion(reply) = reply {
301 assert!(reply.result);
302 assert!(reply.error.is_none());
303 } else {
304 unreachable!()
305 }
306
307 assert_matches!(
308 region_server
309 .set_region_role(region_id, RegionRole::Leader)
310 .unwrap_err(),
311 error::Error::RegionNotFound { .. }
312 );
313 }
314 }
315
316 #[tokio::test]
317 async fn test_open_region_ok() {
318 common_telemetry::init_default_ut_logging();
319
320 let mut region_server = mock_region_server();
321 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
322
323 let mut engine_env = TestEnv::with_prefix("open-region").await;
324 let engine = engine_env.create_engine(MitoConfig::default()).await;
325 region_server.register_engine(Arc::new(engine));
326 let region_id = RegionId::new(1024, 1);
327
328 let builder = CreateRequestBuilder::new();
329 let mut create_req = builder.build();
330 let storage_path = "test";
331 create_req.table_dir = table_dir(storage_path, region_id.table_id());
332
333 region_server
334 .handle_request(region_id, RegionRequest::Create(create_req))
335 .await
336 .unwrap();
337
338 region_server
339 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
340 .await
341 .unwrap();
342 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
343
344 for _ in 0..2 {
346 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
347 let instruction = open_region_instruction(region_id, storage_path);
348
349 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
350 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
351 assert_matches!(control, HandleControl::Continue);
352
353 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
354
355 if let InstructionReply::OpenRegion(reply) = reply {
356 assert!(reply.result);
357 assert!(reply.error.is_none());
358 } else {
359 unreachable!()
360 }
361 }
362 }
363
364 #[tokio::test]
365 async fn test_open_not_exists_region() {
366 common_telemetry::init_default_ut_logging();
367
368 let mut region_server = mock_region_server();
369 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
370
371 let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
372 let engine = engine_env.create_engine(MitoConfig::default()).await;
373 region_server.register_engine(Arc::new(engine));
374 let region_id = RegionId::new(1024, 1);
375 let storage_path = "test";
376
377 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
378
379 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
380 let instruction = open_region_instruction(region_id, storage_path);
381
382 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
383 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
384 assert_matches!(control, HandleControl::Continue);
385
386 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
387
388 if let InstructionReply::OpenRegion(reply) = reply {
389 assert!(!reply.result);
390 assert!(reply.error.is_some());
391 } else {
392 unreachable!()
393 }
394 }
395
396 #[tokio::test]
397 async fn test_downgrade_region() {
398 common_telemetry::init_default_ut_logging();
399
400 let mut region_server = mock_region_server();
401 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
402
403 let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
404 let engine = engine_env.create_engine(MitoConfig::default()).await;
405 region_server.register_engine(Arc::new(engine));
406 let region_id = RegionId::new(1024, 1);
407
408 let builder = CreateRequestBuilder::new();
409 let mut create_req = builder.build();
410 let storage_path = "test";
411 create_req.table_dir = table_dir(storage_path, region_id.table_id());
412
413 region_server
414 .handle_request(region_id, RegionRequest::Create(create_req))
415 .await
416 .unwrap();
417
418 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
419
420 for _ in 0..2 {
422 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
423 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
424 region_id,
425 flush_timeout: Some(Duration::from_secs(1)),
426 });
427
428 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
429 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
430 assert_matches!(control, HandleControl::Continue);
431
432 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
433
434 if let InstructionReply::DowngradeRegion(reply) = reply {
435 assert!(reply.exists);
436 assert!(reply.error.is_none());
437 assert_eq!(reply.last_entry_id.unwrap(), 0);
438 } else {
439 unreachable!()
440 }
441 }
442
443 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
445 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
446 region_id: RegionId::new(2048, 1),
447 flush_timeout: Some(Duration::from_secs(1)),
448 });
449 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
450 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
451 assert_matches!(control, HandleControl::Continue);
452
453 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
454
455 if let InstructionReply::DowngradeRegion(reply) = reply {
456 assert!(!reply.exists);
457 assert!(reply.error.is_none());
458 assert!(reply.last_entry_id.is_none());
459 } else {
460 unreachable!()
461 }
462 }
463}