1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::RegionIdent;
22use common_telemetry::error;
23use futures::future::BoxFuture;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26
27mod close_region;
28mod downgrade_region;
29mod flush_region;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::task_tracker::TaskTracker;
34use crate::region_server::RegionServer;
35
/// Heartbeat response handler for region-level instructions.
///
/// It owns a [`RegionServer`] handle and three [`TaskTracker`]s. All four are
/// cloned into a fresh [`HandlerContext`] every time an instruction is handled.
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
    // The region server passed to `new`; handed to every spawned instruction handler.
    region_server: RegionServer,
    // NOTE(review): presumably tracks in-flight catch-up work started by the
    // upgrade-region handler — confirm in the `upgrade_region` module.
    catchup_tasks: TaskTracker<()>,
    // NOTE(review): presumably tracks in-flight downgrade work — confirm in the
    // `downgrade_region` module.
    downgrade_tasks: TaskTracker<()>,
    // NOTE(review): presumably tracks in-flight flush work — confirm in the
    // `flush_region` module.
    flush_tasks: TaskTracker<()>,
}
44
/// A boxed, one-shot handler for a single instruction.
///
/// Calling it consumes a [`HandlerContext`] and yields a `'static` boxed future
/// that resolves to an optional [`InstructionReply`]. The heartbeat handler in
/// this file only sends a reply to the mailbox when the future resolves to `Some`.
pub type InstructionHandler =
    Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
48
49#[derive(Clone)]
50pub struct HandlerContext {
51 region_server: RegionServer,
52 catchup_tasks: TaskTracker<()>,
53 downgrade_tasks: TaskTracker<()>,
54 flush_tasks: TaskTracker<()>,
55}
56
57impl HandlerContext {
58 fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
59 RegionId::new(region_ident.table_id, region_ident.region_number)
60 }
61
62 #[cfg(test)]
63 pub fn new_for_test(region_server: RegionServer) -> Self {
64 Self {
65 region_server,
66 catchup_tasks: TaskTracker::new(),
67 downgrade_tasks: TaskTracker::new(),
68 flush_tasks: TaskTracker::new(),
69 }
70 }
71}
72
73impl RegionHeartbeatResponseHandler {
74 pub fn new(region_server: RegionServer) -> Self {
76 Self {
77 region_server,
78 catchup_tasks: TaskTracker::new(),
79 downgrade_tasks: TaskTracker::new(),
80 flush_tasks: TaskTracker::new(),
81 }
82 }
83
84 fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
86 match instruction {
87 Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
88 handler_context.handle_open_region_instruction(open_region)
89 })),
90 Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
91 handler_context.handle_close_region_instruction(close_region)
92 })),
93 Instruction::DowngradeRegion(downgrade_region) => {
94 Ok(Box::new(move |handler_context| {
95 handler_context.handle_downgrade_region_instruction(downgrade_region)
96 }))
97 }
98 Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
99 handler_context.handle_upgrade_region_instruction(upgrade_region)
100 })),
101 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
102 Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
103 handler_context.handle_flush_regions_instruction(flush_regions)
104 })),
105 Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
106 handler_context.handle_flush_region_instruction(flush_region)
107 })),
108 }
109 }
110}
111
112#[async_trait]
113impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
114 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
115 matches!(
116 ctx.incoming_message.as_ref(),
117 Some((_, Instruction::OpenRegion { .. }))
118 | Some((_, Instruction::CloseRegion { .. }))
119 | Some((_, Instruction::DowngradeRegion { .. }))
120 | Some((_, Instruction::UpgradeRegion { .. }))
121 | Some((_, Instruction::FlushRegion { .. }))
122 )
123 }
124
125 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
126 let (meta, instruction) = ctx
127 .incoming_message
128 .take()
129 .context(InvalidHeartbeatResponseSnafu)?;
130
131 let mailbox = ctx.mailbox.clone();
132 let region_server = self.region_server.clone();
133 let catchup_tasks = self.catchup_tasks.clone();
134 let downgrade_tasks = self.downgrade_tasks.clone();
135 let flush_tasks = self.flush_tasks.clone();
136 let handler = Self::build_handler(instruction)?;
137 let _handle = common_runtime::spawn_global(async move {
138 let reply = handler(HandlerContext {
139 region_server,
140 catchup_tasks,
141 downgrade_tasks,
142 flush_tasks,
143 })
144 .await;
145
146 if let Some(reply) = reply {
147 if let Err(e) = mailbox.send((meta, reply)).await {
148 error!(e; "Failed to send reply to mailbox");
149 }
150 }
151 });
152
153 Ok(HandleControl::Continue)
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use std::assert_matches::assert_matches;
160 use std::collections::HashMap;
161 use std::sync::Arc;
162 use std::time::Duration;
163
164 use common_meta::heartbeat::mailbox::{
165 HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
166 };
167 use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
168 use mito2::config::MitoConfig;
169 use mito2::engine::MITO_ENGINE_NAME;
170 use mito2::test_util::{CreateRequestBuilder, TestEnv};
171 use store_api::path_utils::region_dir;
172 use store_api::region_engine::RegionRole;
173 use store_api::region_request::{RegionCloseRequest, RegionRequest};
174 use store_api::storage::RegionId;
175 use tokio::sync::mpsc::{self, Receiver};
176
177 use super::*;
178 use crate::error;
179 use crate::tests::mock_region_server;
180
181 pub struct HeartbeatResponseTestEnv {
182 mailbox: MailboxRef,
183 receiver: Receiver<(MessageMeta, InstructionReply)>,
184 }
185
186 impl HeartbeatResponseTestEnv {
187 pub fn new() -> Self {
188 let (tx, rx) = mpsc::channel(8);
189 let mailbox = Arc::new(HeartbeatMailbox::new(tx));
190
191 HeartbeatResponseTestEnv {
192 mailbox,
193 receiver: rx,
194 }
195 }
196
197 pub fn create_handler_ctx(
198 &self,
199 incoming_message: IncomingMessage,
200 ) -> HeartbeatResponseHandlerContext {
201 HeartbeatResponseHandlerContext {
202 mailbox: self.mailbox.clone(),
203 response: Default::default(),
204 incoming_message: Some(incoming_message),
205 }
206 }
207 }
208
209 #[test]
210 fn test_is_acceptable() {
211 common_telemetry::init_default_ut_logging();
212 let region_server = mock_region_server();
213 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
214 let heartbeat_env = HeartbeatResponseTestEnv::new();
215 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
216
217 let region_id = RegionId::new(1024, 1);
219 let storage_path = "test";
220 let instruction = open_region_instruction(region_id, storage_path);
221 assert!(heartbeat_handler
222 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
223
224 let instruction = close_region_instruction(region_id);
226 assert!(heartbeat_handler
227 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
228
229 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
231 region_id: RegionId::new(2048, 1),
232 flush_timeout: Some(Duration::from_secs(1)),
233 });
234 assert!(heartbeat_handler
235 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
236
237 let instruction = Instruction::UpgradeRegion(UpgradeRegion {
239 region_id,
240 last_entry_id: None,
241 metadata_last_entry_id: None,
242 replay_timeout: None,
243 location_id: None,
244 });
245 assert!(
246 heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
247 );
248 }
249
250 fn close_region_instruction(region_id: RegionId) -> Instruction {
251 Instruction::CloseRegion(RegionIdent {
252 table_id: region_id.table_id(),
253 region_number: region_id.region_number(),
254 datanode_id: 2,
255 engine: MITO_ENGINE_NAME.to_string(),
256 })
257 }
258
259 fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
260 Instruction::OpenRegion(OpenRegion::new(
261 RegionIdent {
262 table_id: region_id.table_id(),
263 region_number: region_id.region_number(),
264 datanode_id: 2,
265 engine: MITO_ENGINE_NAME.to_string(),
266 },
267 path,
268 HashMap::new(),
269 HashMap::new(),
270 false,
271 ))
272 }
273
274 #[tokio::test]
275 async fn test_close_region() {
276 common_telemetry::init_default_ut_logging();
277
278 let mut region_server = mock_region_server();
279 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
280
281 let mut engine_env = TestEnv::with_prefix("close-region");
282 let engine = engine_env.create_engine(MitoConfig::default()).await;
283 region_server.register_engine(Arc::new(engine));
284 let region_id = RegionId::new(1024, 1);
285
286 let builder = CreateRequestBuilder::new();
287 let create_req = builder.build();
288 region_server
289 .handle_request(region_id, RegionRequest::Create(create_req))
290 .await
291 .unwrap();
292
293 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
294
295 for _ in 0..2 {
297 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
298 let instruction = close_region_instruction(region_id);
299
300 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
301 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
302 assert_matches!(control, HandleControl::Continue);
303
304 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
305
306 if let InstructionReply::CloseRegion(reply) = reply {
307 assert!(reply.result);
308 assert!(reply.error.is_none());
309 } else {
310 unreachable!()
311 }
312
313 assert_matches!(
314 region_server
315 .set_region_role(region_id, RegionRole::Leader)
316 .unwrap_err(),
317 error::Error::RegionNotFound { .. }
318 );
319 }
320 }
321
322 #[tokio::test]
323 async fn test_open_region_ok() {
324 common_telemetry::init_default_ut_logging();
325
326 let mut region_server = mock_region_server();
327 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
328
329 let mut engine_env = TestEnv::with_prefix("open-region");
330 let engine = engine_env.create_engine(MitoConfig::default()).await;
331 region_server.register_engine(Arc::new(engine));
332 let region_id = RegionId::new(1024, 1);
333
334 let builder = CreateRequestBuilder::new();
335 let mut create_req = builder.build();
336 let storage_path = "test";
337 create_req.region_dir = region_dir(storage_path, region_id);
338
339 region_server
340 .handle_request(region_id, RegionRequest::Create(create_req))
341 .await
342 .unwrap();
343
344 region_server
345 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
346 .await
347 .unwrap();
348 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
349
350 for _ in 0..2 {
352 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
353 let instruction = open_region_instruction(region_id, storage_path);
354
355 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
356 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
357 assert_matches!(control, HandleControl::Continue);
358
359 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
360
361 if let InstructionReply::OpenRegion(reply) = reply {
362 assert!(reply.result);
363 assert!(reply.error.is_none());
364 } else {
365 unreachable!()
366 }
367 }
368 }
369
370 #[tokio::test]
371 async fn test_open_not_exists_region() {
372 common_telemetry::init_default_ut_logging();
373
374 let mut region_server = mock_region_server();
375 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
376
377 let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
378 let engine = engine_env.create_engine(MitoConfig::default()).await;
379 region_server.register_engine(Arc::new(engine));
380 let region_id = RegionId::new(1024, 1);
381 let storage_path = "test";
382
383 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
384
385 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
386 let instruction = open_region_instruction(region_id, storage_path);
387
388 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
389 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
390 assert_matches!(control, HandleControl::Continue);
391
392 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
393
394 if let InstructionReply::OpenRegion(reply) = reply {
395 assert!(!reply.result);
396 assert!(reply.error.is_some());
397 } else {
398 unreachable!()
399 }
400 }
401
402 #[tokio::test]
403 async fn test_downgrade_region() {
404 common_telemetry::init_default_ut_logging();
405
406 let mut region_server = mock_region_server();
407 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
408
409 let mut engine_env = TestEnv::with_prefix("downgrade-region");
410 let engine = engine_env.create_engine(MitoConfig::default()).await;
411 region_server.register_engine(Arc::new(engine));
412 let region_id = RegionId::new(1024, 1);
413
414 let builder = CreateRequestBuilder::new();
415 let mut create_req = builder.build();
416 let storage_path = "test";
417 create_req.region_dir = region_dir(storage_path, region_id);
418
419 region_server
420 .handle_request(region_id, RegionRequest::Create(create_req))
421 .await
422 .unwrap();
423
424 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
425
426 for _ in 0..2 {
428 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
429 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
430 region_id,
431 flush_timeout: Some(Duration::from_secs(1)),
432 });
433
434 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
435 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
436 assert_matches!(control, HandleControl::Continue);
437
438 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
439
440 if let InstructionReply::DowngradeRegion(reply) = reply {
441 assert!(reply.exists);
442 assert!(reply.error.is_none());
443 assert_eq!(reply.last_entry_id.unwrap(), 0);
444 } else {
445 unreachable!()
446 }
447 }
448
449 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
451 let instruction = Instruction::DowngradeRegion(DowngradeRegion {
452 region_id: RegionId::new(2048, 1),
453 flush_timeout: Some(Duration::from_secs(1)),
454 });
455 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
456 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
457 assert_matches!(control, HandleControl::Continue);
458
459 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
460
461 if let InstructionReply::DowngradeRegion(reply) = reply {
462 assert!(!reply.exists);
463 assert!(reply.error.is_none());
464 assert!(reply.last_entry_id.is_none());
465 } else {
466 unreachable!()
467 }
468 }
469}