datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23
24mod close_region;
25mod downgrade_region;
26mod flush_region;
27mod open_region;
28mod upgrade_region;
29
30use crate::heartbeat::handler::close_region::CloseRegionsHandler;
31use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
32use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
33use crate::heartbeat::handler::open_region::OpenRegionsHandler;
34use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
35use crate::heartbeat::task_tracker::TaskTracker;
36use crate::region_server::RegionServer;
37
38/// The handler for [`Instruction`]s.
39#[derive(Clone)]
40pub struct RegionHeartbeatResponseHandler {
41    region_server: RegionServer,
42    catchup_tasks: TaskTracker<()>,
43    downgrade_tasks: TaskTracker<()>,
44    flush_tasks: TaskTracker<()>,
45    open_region_parallelism: usize,
46}
47
48#[async_trait::async_trait]
49pub trait InstructionHandler: Send + Sync {
50    async fn handle(
51        &self,
52        ctx: &HandlerContext,
53        instruction: Instruction,
54    ) -> Option<InstructionReply>;
55}
56
57#[derive(Clone)]
58pub struct HandlerContext {
59    region_server: RegionServer,
60    catchup_tasks: TaskTracker<()>,
61    downgrade_tasks: TaskTracker<()>,
62    flush_tasks: TaskTracker<()>,
63}
64
65impl HandlerContext {
66    #[cfg(test)]
67    pub fn new_for_test(region_server: RegionServer) -> Self {
68        Self {
69            region_server,
70            catchup_tasks: TaskTracker::new(),
71            downgrade_tasks: TaskTracker::new(),
72            flush_tasks: TaskTracker::new(),
73        }
74    }
75}
76
77impl RegionHeartbeatResponseHandler {
78    /// Returns the [RegionHeartbeatResponseHandler].
79    pub fn new(region_server: RegionServer) -> Self {
80        Self {
81            region_server,
82            catchup_tasks: TaskTracker::new(),
83            downgrade_tasks: TaskTracker::new(),
84            flush_tasks: TaskTracker::new(),
85            // Default to half of the number of CPUs.
86            open_region_parallelism: (num_cpus::get() / 2).max(1),
87        }
88    }
89
90    /// Sets the parallelism for opening regions.
91    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
92        self.open_region_parallelism = parallelism;
93        self
94    }
95
96    fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
97        match instruction {
98            Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
99            Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
100                open_region_parallelism: self.open_region_parallelism,
101            })),
102            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
103            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
104            Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
105            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
106        }
107    }
108}
109
110#[async_trait]
111impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
112    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
113        matches!(ctx.incoming_message.as_ref(), |Some((
114            _,
115            Instruction::DowngradeRegions { .. },
116        ))| Some((
117            _,
118            Instruction::UpgradeRegion { .. }
119        )) | Some((
120            _,
121            Instruction::FlushRegions { .. }
122        )) | Some((
123            _,
124            Instruction::OpenRegions { .. }
125        )) | Some((
126            _,
127            Instruction::CloseRegions { .. }
128        )))
129    }
130
131    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
132        let (meta, instruction) = ctx
133            .incoming_message
134            .take()
135            .context(InvalidHeartbeatResponseSnafu)?;
136
137        let mailbox = ctx.mailbox.clone();
138        let region_server = self.region_server.clone();
139        let catchup_tasks = self.catchup_tasks.clone();
140        let downgrade_tasks = self.downgrade_tasks.clone();
141        let flush_tasks = self.flush_tasks.clone();
142        let handler = self.build_handler(&instruction)?;
143        let _handle = common_runtime::spawn_global(async move {
144            let reply = handler
145                .handle(
146                    &HandlerContext {
147                        region_server,
148                        catchup_tasks,
149                        downgrade_tasks,
150                        flush_tasks,
151                    },
152                    instruction,
153                )
154                .await;
155
156            if let Some(reply) = reply
157                && let Err(e) = mailbox.send((meta, reply)).await
158            {
159                error!(e; "Failed to send reply to mailbox");
160            }
161        });
162
163        Ok(HandleControl::Continue)
164    }
165}
166
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// A mailbox plus the receiving end of its channel, letting tests observe
    /// the replies a heartbeat handler sends.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox backed by a bounded channel of capacity 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Wraps `incoming_message` in a handler context that replies through
        /// this environment's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message metadata attached to every instruction sent in these tests.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Identifies `region_id` as a mito region hosted on datanode 2.
    fn mito_region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![mito_region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let open_region = OpenRegion::new(
            mito_region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        );
        Instruction::OpenRegions(vec![open_region])
    }

    /// Downgrade instruction for a single region with a one-second flush timeout.
    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        }])
    }

    /// Creates a mock region server with a mito engine registered on it, plus a
    /// heartbeat handler sharing that server.
    ///
    /// The returned [`TestEnv`] owns the engine's environment; keep it bound
    /// for the whole test.
    async fn setup_mito_region_server(
        prefix: &'static str,
    ) -> (RegionServer, RegionHeartbeatResponseHandler, TestEnv) {
        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix(prefix).await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        (region_server, heartbeat_handler, engine_env)
    }

    /// Feeds `instruction` to `heartbeat_handler`, checks that handling
    /// continues, and waits for the reply delivered to the mailbox.
    async fn send_and_recv_reply(
        heartbeat_handler: &RegionHeartbeatResponseHandler,
        heartbeat_env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = heartbeat_env.create_handler_ctx((test_meta(), instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        reply
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let region_id = RegionId::new(1024, 1);

        let instructions = [
            open_region_instruction(region_id, "test"),
            close_region_instruction(region_id),
            downgrade_region_instruction(RegionId::new(2048, 1)),
            Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                ..Default::default()
            }),
        ];

        for instruction in instructions {
            let ctx = heartbeat_env.create_handler_ctx((test_meta(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let (region_server, heartbeat_handler, _engine_env) =
            setup_mito_region_server("close-region").await;
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Closing the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = send_and_recv_reply(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let (region_server, heartbeat_handler, _engine_env) =
            setup_mito_region_server("open-region").await;
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Opening the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = send_and_recv_reply(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let (_region_server, heartbeat_handler, _engine_env) =
            setup_mito_region_server("open-not-exists-region").await;
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let reply = send_and_recv_reply(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let (region_server, heartbeat_handler, _engine_env) =
            setup_mito_region_server("downgrade-region").await;
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = send_and_recv_reply(
                &heartbeat_handler,
                &mut heartbeat_env,
                downgrade_region_instruction(region_id),
            )
            .await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id, Some(0));
        }

        // A region that was never created is reported as non-existent.
        let reply = send_and_recv_reply(
            &heartbeat_handler,
            &mut heartbeat_env,
            downgrade_region_instruction(RegionId::new(2048, 1)),
        )
        .await;

        let replies = reply.expect_downgrade_regions_reply();
        let first = &replies[0];
        assert!(!first.exists);
        assert!(first.error.is_none());
        assert!(first.last_entry_id.is_none());
    }
}