1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23
24mod close_region;
25mod downgrade_region;
26mod flush_region;
27mod open_region;
28mod upgrade_region;
29
30use crate::heartbeat::handler::close_region::CloseRegionsHandler;
31use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
32use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
33use crate::heartbeat::handler::open_region::OpenRegionsHandler;
34use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
35use crate::heartbeat::task_tracker::TaskTracker;
36use crate::region_server::RegionServer;
37
38#[derive(Clone)]
40pub struct RegionHeartbeatResponseHandler {
41 region_server: RegionServer,
42 catchup_tasks: TaskTracker<()>,
43 downgrade_tasks: TaskTracker<()>,
44 flush_tasks: TaskTracker<()>,
45 open_region_parallelism: usize,
46}
47
48#[async_trait::async_trait]
49pub trait InstructionHandler: Send + Sync {
50 async fn handle(
51 &self,
52 ctx: &HandlerContext,
53 instruction: Instruction,
54 ) -> Option<InstructionReply>;
55}
56
57#[derive(Clone)]
58pub struct HandlerContext {
59 region_server: RegionServer,
60 catchup_tasks: TaskTracker<()>,
61 downgrade_tasks: TaskTracker<()>,
62 flush_tasks: TaskTracker<()>,
63}
64
65impl HandlerContext {
66 #[cfg(test)]
67 pub fn new_for_test(region_server: RegionServer) -> Self {
68 Self {
69 region_server,
70 catchup_tasks: TaskTracker::new(),
71 downgrade_tasks: TaskTracker::new(),
72 flush_tasks: TaskTracker::new(),
73 }
74 }
75}
76
77impl RegionHeartbeatResponseHandler {
78 pub fn new(region_server: RegionServer) -> Self {
80 Self {
81 region_server,
82 catchup_tasks: TaskTracker::new(),
83 downgrade_tasks: TaskTracker::new(),
84 flush_tasks: TaskTracker::new(),
85 open_region_parallelism: (num_cpus::get() / 2).max(1),
87 }
88 }
89
90 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
92 self.open_region_parallelism = parallelism;
93 self
94 }
95
96 fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
97 match instruction {
98 Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
99 Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
100 open_region_parallelism: self.open_region_parallelism,
101 })),
102 Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
103 Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
104 Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
105 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
106 }
107 }
108}
109
110#[async_trait]
111impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
112 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
113 matches!(ctx.incoming_message.as_ref(), |Some((
114 _,
115 Instruction::DowngradeRegions { .. },
116 ))| Some((
117 _,
118 Instruction::UpgradeRegion { .. }
119 )) | Some((
120 _,
121 Instruction::FlushRegions { .. }
122 )) | Some((
123 _,
124 Instruction::OpenRegions { .. }
125 )) | Some((
126 _,
127 Instruction::CloseRegions { .. }
128 )))
129 }
130
131 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
132 let (meta, instruction) = ctx
133 .incoming_message
134 .take()
135 .context(InvalidHeartbeatResponseSnafu)?;
136
137 let mailbox = ctx.mailbox.clone();
138 let region_server = self.region_server.clone();
139 let catchup_tasks = self.catchup_tasks.clone();
140 let downgrade_tasks = self.downgrade_tasks.clone();
141 let flush_tasks = self.flush_tasks.clone();
142 let handler = self.build_handler(&instruction)?;
143 let _handle = common_runtime::spawn_global(async move {
144 let reply = handler
145 .handle(
146 &HandlerContext {
147 region_server,
148 catchup_tasks,
149 downgrade_tasks,
150 flush_tasks,
151 },
152 instruction,
153 )
154 .await;
155
156 if let Some(reply) = reply
157 && let Err(e) = mailbox.send((meta, reply)).await
158 {
159 error!(e; "Failed to send reply to mailbox");
160 }
161 });
162
163 Ok(HandleControl::Continue)
164 }
165}
166
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Test fixture pairing a [`HeartbeatMailbox`] with the receiving half of
    /// the channel the mailbox was built from.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates the fixture on top of a channel with a capacity of 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);

            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Wraps `incoming_message` in a handler context that shares this
        /// fixture's mailbox and carries a default response.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Feeds `instruction` to `handler`, checks that it answers with
    /// [`HandleControl::Continue`] and returns the reply received on the
    /// fixture's channel.
    async fn send_instruction(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let mut ctx = env.create_handler_ctx((meta, instruction));

        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    /// Builds a close instruction for the single region `region_id`.
    fn close_region_instruction(region_id: RegionId) -> Instruction {
        let ident = RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        };

        Instruction::CloseRegions(vec![ident])
    }

    /// Builds an open instruction for the single region `region_id` stored
    /// under `path`.
    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let ident = RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        };

        Instruction::OpenRegions(vec![OpenRegion::new(
            ident,
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        )])
    }

    /// Open, close, downgrade and upgrade instructions must all be accepted.
    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(mock_region_server());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        let region_id = RegionId::new(1024, 1);
        let accepted = [
            open_region_instruction(region_id, "test"),
            close_region_instruction(region_id),
            Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id: RegionId::new(2048, 1),
                flush_timeout: Some(Duration::from_secs(1)),
            }]),
            Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                ..Default::default()
            }),
        ];

        for instruction in accepted {
            let ctx = heartbeat_env.create_handler_ctx((meta.clone(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    /// Closing a region succeeds, and closing it a second time succeeds too.
    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        for _ in 0..2 {
            let reply = send_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            // The region server no longer knows the region once it is closed.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    /// Opening a created-then-closed region succeeds, and opening it again
    /// succeeds too.
    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        for _ in 0..2 {
            let reply = send_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    /// Opening a region that was never created reports a failure with an error.
    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        let reply = send_instruction(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    /// Downgrading an existing region reports it as existing (twice in a
    /// row), while downgrading an unknown region reports it as missing.
    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Existing region: the same instruction is sent twice.
        for _ in 0..2 {
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);
            let replies = send_instruction(&heartbeat_handler, &mut heartbeat_env, instruction)
                .await
                .expect_downgrade_regions_reply();

            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // Unknown region.
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let replies = send_instruction(&heartbeat_handler, &mut heartbeat_env, instruction)
            .await
            .expect_downgrade_regions_reply();

        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}