datanode/heartbeat/
handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod close_region;
26mod downgrade_region;
27mod file_ref;
28mod flush_region;
29mod gc_worker;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::handler::close_region::CloseRegionsHandler;
34use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
35use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
36use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
37use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
38use crate::heartbeat::handler::open_region::OpenRegionsHandler;
39use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
40use crate::heartbeat::task_tracker::TaskTracker;
41use crate::region_server::RegionServer;
42
43/// The handler for [`Instruction`]s.
44#[derive(Clone)]
45pub struct RegionHeartbeatResponseHandler {
46    region_server: RegionServer,
47    downgrade_tasks: TaskTracker<()>,
48    flush_tasks: TaskTracker<()>,
49    open_region_parallelism: usize,
50    gc_tasks: TaskTracker<GcReport>,
51}
52
53#[async_trait::async_trait]
54pub trait InstructionHandler: Send + Sync {
55    type Instruction;
56    async fn handle(
57        &self,
58        ctx: &HandlerContext,
59        instruction: Self::Instruction,
60    ) -> Option<InstructionReply>;
61}
62
63#[derive(Clone)]
64pub struct HandlerContext {
65    region_server: RegionServer,
66    downgrade_tasks: TaskTracker<()>,
67    flush_tasks: TaskTracker<()>,
68    gc_tasks: TaskTracker<GcReport>,
69}
70
71impl HandlerContext {
72    #[cfg(test)]
73    pub fn new_for_test(region_server: RegionServer) -> Self {
74        Self {
75            region_server,
76            downgrade_tasks: TaskTracker::new(),
77            flush_tasks: TaskTracker::new(),
78            gc_tasks: TaskTracker::new(),
79        }
80    }
81}
82
83impl RegionHeartbeatResponseHandler {
84    /// Returns the [RegionHeartbeatResponseHandler].
85    pub fn new(region_server: RegionServer) -> Self {
86        Self {
87            region_server,
88            downgrade_tasks: TaskTracker::new(),
89            flush_tasks: TaskTracker::new(),
90            // Default to half of the number of CPUs.
91            open_region_parallelism: (num_cpus::get() / 2).max(1),
92            gc_tasks: TaskTracker::new(),
93        }
94    }
95
96    /// Sets the parallelism for opening regions.
97    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
98        self.open_region_parallelism = parallelism;
99        self
100    }
101
102    fn build_handler(
103        &self,
104        instruction: &Instruction,
105    ) -> MetaResult<Option<Box<InstructionHandlers>>> {
106        match instruction {
107            Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
108            Instruction::OpenRegions(_) => Ok(Some(Box::new(
109                OpenRegionsHandler {
110                    open_region_parallelism: self.open_region_parallelism,
111                }
112                .into(),
113            ))),
114            Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
115            Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
116            Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
117                UpgradeRegionsHandler {
118                    upgrade_region_parallelism: self.open_region_parallelism,
119                }
120                .into(),
121            ))),
122            Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
123            Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
124            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
125            Instruction::Suspend => Ok(None),
126        }
127    }
128}
129
130#[allow(clippy::enum_variant_names)]
131pub enum InstructionHandlers {
132    CloseRegions(CloseRegionsHandler),
133    OpenRegions(OpenRegionsHandler),
134    FlushRegions(FlushRegionsHandler),
135    DowngradeRegions(DowngradeRegionsHandler),
136    UpgradeRegions(UpgradeRegionsHandler),
137    GetFileRefs(GetFileRefsHandler),
138    GcRegions(GcRegionsHandler),
139}
140
141macro_rules! impl_from_handler {
142    ($($handler:ident => $variant:ident),*) => {
143        $(
144            impl From<$handler> for InstructionHandlers {
145                fn from(handler: $handler) -> Self {
146                    InstructionHandlers::$variant(handler)
147                }
148            }
149        )*
150    };
151}
152
153impl_from_handler!(
154    CloseRegionsHandler => CloseRegions,
155    OpenRegionsHandler => OpenRegions,
156    FlushRegionsHandler => FlushRegions,
157    DowngradeRegionsHandler => DowngradeRegions,
158    UpgradeRegionsHandler => UpgradeRegions,
159    GetFileRefsHandler => GetFileRefs,
160    GcRegionsHandler => GcRegions
161);
162
163macro_rules! dispatch_instr {
164    (
165        $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
166    ) => {
167        impl InstructionHandlers {
168            pub async fn handle(
169                &self,
170                ctx: &HandlerContext,
171                instruction: Instruction,
172            ) -> Option<InstructionReply> {
173                match (self, instruction) {
174                    $(
175                        (
176                            InstructionHandlers::$handler_variant(handler),
177                            Instruction::$instr_variant(instr),
178                        ) => handler.handle(ctx, instr).await,
179                    )*
180                    // Safety: must be used in pairs with `build_handler`.
181                    _ => unreachable!(),
182                }
183            }
184            /// Check whether this instruction is acceptable by any handler.
185            pub fn is_acceptable(instruction: &Instruction) -> bool {
186                matches!(
187                    instruction,
188                    $(
189                        Instruction::$instr_variant { .. }
190                    )|*
191                )
192            }
193        }
194    };
195}
196
197dispatch_instr!(
198    CloseRegions => CloseRegions,
199    OpenRegions => OpenRegions,
200    FlushRegions => FlushRegions,
201    DowngradeRegions => DowngradeRegions,
202    UpgradeRegions => UpgradeRegions,
203    GetFileRefs => GetFileRefs,
204    GcRegions => GcRegions,
205);
206
207#[async_trait]
208impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
209    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
210        if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
211            return InstructionHandlers::is_acceptable(instruction);
212        }
213        false
214    }
215
216    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
217        let (meta, instruction) = ctx
218            .incoming_message
219            .take()
220            .context(InvalidHeartbeatResponseSnafu)?;
221
222        let mailbox = ctx.mailbox.clone();
223        if let Some(handler) = self.build_handler(&instruction)? {
224            let context = HandlerContext {
225                region_server: self.region_server.clone(),
226                downgrade_tasks: self.downgrade_tasks.clone(),
227                flush_tasks: self.flush_tasks.clone(),
228                gc_tasks: self.gc_tasks.clone(),
229            };
230            let _handle = common_runtime::spawn_global(async move {
231                let reply = handler.handle(&context, instruction).await;
232                if let Some(reply) = reply
233                    && let Err(e) = mailbox.send((meta, reply)).await
234                {
235                    let error = e.to_string();
236                    let (meta, reply) = e.0;
237                    error!("Failed to send reply {reply} to {meta:?}: {error}");
238                }
239            });
240        }
241
242        Ok(HandleControl::Continue)
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Test fixture: a heartbeat mailbox plus the receiving end of the channel
    /// it was built from, so tests can read the replies handlers send.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates the fixture on top of a channel with capacity 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            let mailbox = Arc::new(HeartbeatMailbox::new(sender));

            Self { mailbox, receiver }
        }

        /// Wraps `incoming_message` in a handler context that shares this
        /// fixture's mailbox and carries a default response.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            let mailbox = self.mailbox.clone();

            HeartbeatResponseHandlerContext {
                mailbox,
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Feeds `instruction` to `handler`, checks that handling continues, and
    /// returns the reply received through the fixture's channel.
    async fn handle_instruction(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let mut ctx = env.create_handler_ctx((meta, instruction));

        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    /// Builds a downgrade instruction for `region_id` with a one-second flush
    /// timeout.
    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        }])
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        // Open, close, downgrade and upgrade instructions must all be accepted.
        let accepted = [
            open_region_instruction(region_id, storage_path),
            close_region_instruction(region_id),
            downgrade_region_instruction(RegionId::new(2048, 1)),
            Instruction::UpgradeRegions(vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }]),
        ];

        for instruction in accepted {
            let ctx = heartbeat_env.create_handler_ctx((meta.clone(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    /// Identifies `region_id` on datanode 2 with the mito engine.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let open_region = OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        );
        Instruction::OpenRegions(vec![open_region])
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Closing the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            // The region must be gone from the server after closing.
            let role_result = region_server.set_region_role(region_id, RegionRole::Leader);
            assert_matches!(
                role_result.unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        // Create the region, then close it so the instruction has to reopen it.
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Opening the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The region was never created, so opening it must report a failure.
        let reply = handle_instruction(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading the same region a second time must still succeed.
        for _ in 0..2 {
            let reply = handle_instruction(
                &heartbeat_handler,
                &mut heartbeat_env,
                downgrade_region_instruction(region_id),
            )
            .await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // Downgrades a not exists region.
        let reply = handle_instruction(
            &heartbeat_handler,
            &mut heartbeat_env,
            downgrade_region_instruction(RegionId::new(2048, 1)),
        )
        .await;

        let replies = reply.expect_downgrade_regions_reply();
        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}
556}