datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::RegionIdent;
22use common_telemetry::error;
23use futures::future::BoxFuture;
24use snafu::OptionExt;
25use store_api::storage::RegionId;
26
27mod close_region;
28mod downgrade_region;
29mod flush_region;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::task_tracker::TaskTracker;
34use crate::region_server::RegionServer;
35
36/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
37#[derive(Clone)]
38pub struct RegionHeartbeatResponseHandler {
39    region_server: RegionServer,
40    catchup_tasks: TaskTracker<()>,
41    downgrade_tasks: TaskTracker<()>,
42    flush_tasks: TaskTracker<()>,
43}
44
45/// Handler of the instruction.
46pub type InstructionHandler =
47    Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
48
49#[derive(Clone)]
50pub struct HandlerContext {
51    region_server: RegionServer,
52    catchup_tasks: TaskTracker<()>,
53    downgrade_tasks: TaskTracker<()>,
54    flush_tasks: TaskTracker<()>,
55}
56
57impl HandlerContext {
58    fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
59        RegionId::new(region_ident.table_id, region_ident.region_number)
60    }
61
62    #[cfg(test)]
63    pub fn new_for_test(region_server: RegionServer) -> Self {
64        Self {
65            region_server,
66            catchup_tasks: TaskTracker::new(),
67            downgrade_tasks: TaskTracker::new(),
68            flush_tasks: TaskTracker::new(),
69        }
70    }
71}
72
73impl RegionHeartbeatResponseHandler {
74    /// Returns the [RegionHeartbeatResponseHandler].
75    pub fn new(region_server: RegionServer) -> Self {
76        Self {
77            region_server,
78            catchup_tasks: TaskTracker::new(),
79            downgrade_tasks: TaskTracker::new(),
80            flush_tasks: TaskTracker::new(),
81        }
82    }
83
84    /// Builds the [InstructionHandler].
85    fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
86        match instruction {
87            Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
88                handler_context.handle_open_region_instruction(open_region)
89            })),
90            Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
91                handler_context.handle_close_region_instruction(close_region)
92            })),
93            Instruction::DowngradeRegion(downgrade_region) => {
94                Ok(Box::new(move |handler_context| {
95                    handler_context.handle_downgrade_region_instruction(downgrade_region)
96                }))
97            }
98            Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
99                handler_context.handle_upgrade_region_instruction(upgrade_region)
100            })),
101            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
102            Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
103                handler_context.handle_flush_regions_instruction(flush_regions)
104            })),
105            Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
106                handler_context.handle_flush_region_instruction(flush_region)
107            })),
108        }
109    }
110}
111
112#[async_trait]
113impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
114    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
115        matches!(
116            ctx.incoming_message.as_ref(),
117            Some((_, Instruction::OpenRegion { .. }))
118                | Some((_, Instruction::CloseRegion { .. }))
119                | Some((_, Instruction::DowngradeRegion { .. }))
120                | Some((_, Instruction::UpgradeRegion { .. }))
121                | Some((_, Instruction::FlushRegion { .. }))
122        )
123    }
124
125    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
126        let (meta, instruction) = ctx
127            .incoming_message
128            .take()
129            .context(InvalidHeartbeatResponseSnafu)?;
130
131        let mailbox = ctx.mailbox.clone();
132        let region_server = self.region_server.clone();
133        let catchup_tasks = self.catchup_tasks.clone();
134        let downgrade_tasks = self.downgrade_tasks.clone();
135        let flush_tasks = self.flush_tasks.clone();
136        let handler = Self::build_handler(instruction)?;
137        let _handle = common_runtime::spawn_global(async move {
138            let reply = handler(HandlerContext {
139                region_server,
140                catchup_tasks,
141                downgrade_tasks,
142                flush_tasks,
143            })
144            .await;
145
146            if let Some(reply) = reply {
147                if let Err(e) = mailbox.send((meta, reply)).await {
148                    error!(e; "Failed to send reply to mailbox");
149                }
150            }
151        });
152
153        Ok(HandleControl::Continue)
154    }
155}
156
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::region_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// A heartbeat mailbox together with the receiving end of its channel, so tests
    /// can observe the replies handlers send.
    pub struct HeartbeatResponseTestEnv {
        mailbox: MailboxRef,
        receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox backed by a channel with capacity 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Wraps `incoming_message` in a handler context wired to this env's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message meta attached to every instruction these tests send.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Identity of `region_id` as a mito region on datanode 2.
    fn mito_region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegion(mito_region_ident(region_id))
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        Instruction::OpenRegion(OpenRegion::new(
            mito_region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        ))
    }

    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        })
    }

    /// Runs `instruction` through `handler`, asserts that the handler chain continues,
    /// and returns the reply the spawned task posts to `env`'s mailbox.
    async fn handle_and_recv(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(mock_region_server());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let region_id = RegionId::new(1024, 1);

        let instructions = [
            open_region_instruction(region_id, "test"),
            close_region_instruction(region_id),
            downgrade_region_instruction(RegionId::new(2048, 1)),
            Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                last_entry_id: None,
                metadata_last_entry_id: None,
                replay_timeout: None,
                location_id: None,
            }),
        ];
        for instruction in instructions {
            let ctx = heartbeat_env.create_handler_ctx((test_meta(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region");
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Closing the region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;
            let InstructionReply::CloseRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region");
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.region_dir = region_dir(storage_path, region_id);

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Opening the region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;
            let InstructionReply::OpenRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let reply = handle_and_recv(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, "test"),
        )
        .await;

        let InstructionReply::OpenRegion(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region");
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let mut create_req = CreateRequestBuilder::new().build();
        create_req.region_dir = region_dir("test", region_id);

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading the region twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                downgrade_region_instruction(region_id),
            )
            .await;
            let InstructionReply::DowngradeRegion(reply) = reply else {
                unreachable!()
            };
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrading a region that does not exist reports it as missing, without error.
        let reply = handle_and_recv(
            &heartbeat_handler,
            &mut heartbeat_env,
            downgrade_region_instruction(RegionId::new(2048, 1)),
        )
        .await;
        let InstructionReply::DowngradeRegion(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }
}
469}