datanode/heartbeat/handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::RegionIdent;
22use common_telemetry::error;
23use futures::future::BoxFuture;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26
27mod close_region;
28mod downgrade_region;
29mod flush_region;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::task_tracker::TaskTracker;
34use crate::region_server::RegionServer;
35
36/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
37#[derive(Clone)]
38pub struct RegionHeartbeatResponseHandler {
39    region_server: RegionServer,
40    catchup_tasks: TaskTracker<()>,
41    downgrade_tasks: TaskTracker<()>,
42    flush_tasks: TaskTracker<()>,
43}
44
45/// Handler of the instruction.
46pub type InstructionHandler =
47    Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
48
49#[derive(Clone)]
50pub struct HandlerContext {
51    region_server: RegionServer,
52    catchup_tasks: TaskTracker<()>,
53    downgrade_tasks: TaskTracker<()>,
54    flush_tasks: TaskTracker<()>,
55}
56
57impl HandlerContext {
58    fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
59        RegionId::new(region_ident.table_id, region_ident.region_number)
60    }
61
62    #[cfg(test)]
63    pub fn new_for_test(region_server: RegionServer) -> Self {
64        Self {
65            region_server,
66            catchup_tasks: TaskTracker::new(),
67            downgrade_tasks: TaskTracker::new(),
68            flush_tasks: TaskTracker::new(),
69        }
70    }
71}
72
73impl RegionHeartbeatResponseHandler {
74    /// Returns the [RegionHeartbeatResponseHandler].
75    pub fn new(region_server: RegionServer) -> Self {
76        Self {
77            region_server,
78            catchup_tasks: TaskTracker::new(),
79            downgrade_tasks: TaskTracker::new(),
80            flush_tasks: TaskTracker::new(),
81        }
82    }
83
84    /// Builds the [InstructionHandler].
85    fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
86        match instruction {
87            Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
88                handler_context.handle_open_region_instruction(open_region)
89            })),
90            Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
91                handler_context.handle_close_region_instruction(close_region)
92            })),
93            Instruction::DowngradeRegion(downgrade_region) => {
94                Ok(Box::new(move |handler_context| {
95                    handler_context.handle_downgrade_region_instruction(downgrade_region)
96                }))
97            }
98            Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
99                handler_context.handle_upgrade_region_instruction(upgrade_region)
100            })),
101            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
102            Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
103                handler_context.handle_flush_regions_instruction(flush_regions)
104            })),
105        }
106    }
107}
108
109#[async_trait]
110impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
111    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
112        matches!(
113            ctx.incoming_message.as_ref(),
114            Some((_, Instruction::OpenRegion { .. }))
115                | Some((_, Instruction::CloseRegion { .. }))
116                | Some((_, Instruction::DowngradeRegion { .. }))
117                | Some((_, Instruction::UpgradeRegion { .. }))
118                | Some((_, Instruction::FlushRegions { .. }))
119        )
120    }
121
122    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
123        let (meta, instruction) = ctx
124            .incoming_message
125            .take()
126            .context(InvalidHeartbeatResponseSnafu)?;
127
128        let mailbox = ctx.mailbox.clone();
129        let region_server = self.region_server.clone();
130        let catchup_tasks = self.catchup_tasks.clone();
131        let downgrade_tasks = self.downgrade_tasks.clone();
132        let flush_tasks = self.flush_tasks.clone();
133        let handler = Self::build_handler(instruction)?;
134        let _handle = common_runtime::spawn_global(async move {
135            let reply = handler(HandlerContext {
136                region_server,
137                catchup_tasks,
138                downgrade_tasks,
139                flush_tasks,
140            })
141            .await;
142
143            if let Some(reply) = reply {
144                if let Err(e) = mailbox.send((meta, reply)).await {
145                    error!(e; "Failed to send reply to mailbox");
146                }
147            }
148        });
149
150        Ok(HandleControl::Continue)
151    }
152}
153
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// A mailbox together with the receiving end of its channel, so tests can
    /// observe the replies a handler sends.
    pub struct HeartbeatResponseTestEnv {
        mailbox: MailboxRef,
        receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        pub fn new() -> Self {
            let (tx, receiver) = mpsc::channel(8);
            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(tx)),
                receiver,
            }
        }

        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Metadata attached to every instruction sent in these tests.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Creates a mock region server with a freshly created mito engine registered.
    ///
    /// The returned [TestEnv] is kept alive by the caller for the whole test
    /// (presumably it owns the engine's temporary data directory).
    async fn region_server_with_engine(prefix: &str) -> (RegionServer, TestEnv) {
        let mut region_server = mock_region_server();
        let mut engine_env = TestEnv::with_prefix(prefix).await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        (region_server, engine_env)
    }

    /// Feeds `instruction` through `handler`, checks it asks to continue, and
    /// waits for the reply it sends to the mailbox.
    async fn handle_instruction(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    /// The [RegionIdent] of `region_id` on datanode 2, served by the mito engine.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegion(region_ident(region_id))
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        Instruction::OpenRegion(OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        ))
    }

    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        })
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let region_id = RegionId::new(1024, 1);

        let instructions = [
            // Open region
            open_region_instruction(region_id, "test"),
            // Close region
            close_region_instruction(region_id),
            // Downgrade region
            downgrade_region_instruction(RegionId::new(2048, 1)),
            // Upgrade region
            Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                ..Default::default()
            }),
        ];

        for instruction in instructions {
            let ctx = heartbeat_env.create_handler_ctx((test_meta(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let (region_server, _engine_env) = region_server_with_engine("close-region").await;
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Closing the same region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let (region_server, _engine_env) = region_server_with_engine("open-region").await;
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Opening the same region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let (region_server, _engine_env) =
            region_server_with_engine("open-not-exists-region").await;
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server);
        let region_id = RegionId::new(1024, 1);

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        let reply = handle_instruction(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, "test"),
        )
        .await;

        let InstructionReply::OpenRegion(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let (region_server, _engine_env) = region_server_with_engine("downgrade-region").await;
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let region_id = RegionId::new(1024, 1);

        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir("test", region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading the same region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                downgrade_region_instruction(region_id),
            )
            .await;

            let InstructionReply::DowngradeRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrading a region that does not exist reports it as missing, without error.
        let reply = handle_instruction(
            &heartbeat_handler,
            &mut heartbeat_env,
            downgrade_region_instruction(RegionId::new(2048, 1)),
        )
        .await;

        let InstructionReply::DowngradeRegion(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }
}