datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod close_region;
26mod downgrade_region;
27mod file_ref;
28mod flush_region;
29mod gc_worker;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::handler::close_region::CloseRegionsHandler;
34use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
35use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
36use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
37use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
38use crate::heartbeat::handler::open_region::OpenRegionsHandler;
39use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
40use crate::heartbeat::task_tracker::TaskTracker;
41use crate::region_server::RegionServer;
42
43/// The handler for [`Instruction`]s.
44#[derive(Clone)]
45pub struct RegionHeartbeatResponseHandler {
46    region_server: RegionServer,
47    downgrade_tasks: TaskTracker<()>,
48    flush_tasks: TaskTracker<()>,
49    open_region_parallelism: usize,
50    gc_tasks: TaskTracker<GcReport>,
51}
52
53#[async_trait::async_trait]
54pub trait InstructionHandler: Send + Sync {
55    type Instruction;
56    async fn handle(
57        &self,
58        ctx: &HandlerContext,
59        instruction: Self::Instruction,
60    ) -> Option<InstructionReply>;
61}
62
63#[derive(Clone)]
64pub struct HandlerContext {
65    region_server: RegionServer,
66    downgrade_tasks: TaskTracker<()>,
67    flush_tasks: TaskTracker<()>,
68    gc_tasks: TaskTracker<GcReport>,
69}
70
71impl HandlerContext {
72    #[cfg(test)]
73    pub fn new_for_test(region_server: RegionServer) -> Self {
74        Self {
75            region_server,
76            downgrade_tasks: TaskTracker::new(),
77            flush_tasks: TaskTracker::new(),
78            gc_tasks: TaskTracker::new(),
79        }
80    }
81}
82
83impl RegionHeartbeatResponseHandler {
84    /// Returns the [RegionHeartbeatResponseHandler].
85    pub fn new(region_server: RegionServer) -> Self {
86        Self {
87            region_server,
88            downgrade_tasks: TaskTracker::new(),
89            flush_tasks: TaskTracker::new(),
90            // Default to half of the number of CPUs.
91            open_region_parallelism: (num_cpus::get() / 2).max(1),
92            gc_tasks: TaskTracker::new(),
93        }
94    }
95
96    /// Sets the parallelism for opening regions.
97    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
98        self.open_region_parallelism = parallelism;
99        self
100    }
101
102    fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
103        match instruction {
104            Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
105            Instruction::OpenRegions(_) => Ok(Box::new(
106                OpenRegionsHandler {
107                    open_region_parallelism: self.open_region_parallelism,
108                }
109                .into(),
110            )),
111            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
112            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
113            Instruction::UpgradeRegions(_) => Ok(Box::new(
114                UpgradeRegionsHandler {
115                    upgrade_region_parallelism: self.open_region_parallelism,
116                }
117                .into(),
118            )),
119            Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
120            Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
121            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
122        }
123    }
124}
125
126#[allow(clippy::enum_variant_names)]
127pub enum InstructionHandlers {
128    CloseRegions(CloseRegionsHandler),
129    OpenRegions(OpenRegionsHandler),
130    FlushRegions(FlushRegionsHandler),
131    DowngradeRegions(DowngradeRegionsHandler),
132    UpgradeRegions(UpgradeRegionsHandler),
133    GetFileRefs(GetFileRefsHandler),
134    GcRegions(GcRegionsHandler),
135}
136
137macro_rules! impl_from_handler {
138    ($($handler:ident => $variant:ident),*) => {
139        $(
140            impl From<$handler> for InstructionHandlers {
141                fn from(handler: $handler) -> Self {
142                    InstructionHandlers::$variant(handler)
143                }
144            }
145        )*
146    };
147}
148
149impl_from_handler!(
150    CloseRegionsHandler => CloseRegions,
151    OpenRegionsHandler => OpenRegions,
152    FlushRegionsHandler => FlushRegions,
153    DowngradeRegionsHandler => DowngradeRegions,
154    UpgradeRegionsHandler => UpgradeRegions,
155    GetFileRefsHandler => GetFileRefs,
156    GcRegionsHandler => GcRegions
157);
158
159macro_rules! dispatch_instr {
160    (
161        $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
162    ) => {
163        impl InstructionHandlers {
164            pub async fn handle(
165                &self,
166                ctx: &HandlerContext,
167                instruction: Instruction,
168            ) -> Option<InstructionReply> {
169                match (self, instruction) {
170                    $(
171                        (
172                            InstructionHandlers::$handler_variant(handler),
173                            Instruction::$instr_variant(instr),
174                        ) => handler.handle(ctx, instr).await,
175                    )*
176                    // Safety: must be used in pairs with `build_handler`.
177                    _ => unreachable!(),
178                }
179            }
180            /// Check whether this instruction is acceptable by any handler.
181            pub fn is_acceptable(instruction: &Instruction) -> bool {
182                matches!(
183                    instruction,
184                    $(
185                        Instruction::$instr_variant { .. }
186                    )|*
187                )
188            }
189        }
190    };
191}
192
193dispatch_instr!(
194    CloseRegions => CloseRegions,
195    OpenRegions => OpenRegions,
196    FlushRegions => FlushRegions,
197    DowngradeRegions => DowngradeRegions,
198    UpgradeRegions => UpgradeRegions,
199    GetFileRefs => GetFileRefs,
200    GcRegions => GcRegions,
201);
202
203#[async_trait]
204impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
205    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
206        if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
207            return InstructionHandlers::is_acceptable(instruction);
208        }
209        false
210    }
211
212    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
213        let (meta, instruction) = ctx
214            .incoming_message
215            .take()
216            .context(InvalidHeartbeatResponseSnafu)?;
217
218        let mailbox = ctx.mailbox.clone();
219        let region_server = self.region_server.clone();
220        let downgrade_tasks = self.downgrade_tasks.clone();
221        let flush_tasks = self.flush_tasks.clone();
222        let gc_tasks = self.gc_tasks.clone();
223        let handler = self.build_handler(&instruction)?;
224        let _handle = common_runtime::spawn_global(async move {
225            let reply = handler
226                .handle(
227                    &HandlerContext {
228                        region_server,
229                        downgrade_tasks,
230                        flush_tasks,
231                        gc_tasks,
232                    },
233                    instruction,
234                )
235                .await;
236
237            if let Some(reply) = reply
238                && let Err(e) = mailbox.send((meta, reply)).await
239            {
240                error!(e; "Failed to send reply to mailbox");
241            }
242        });
243
244        Ok(HandleControl::Continue)
245    }
246}
247
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// A mailbox together with the receiving end of its channel, so tests can
    /// observe the replies handlers send.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox backed by a channel with a capacity of 8 replies.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            let mailbox = Arc::new(HeartbeatMailbox::new(sender));

            Self { mailbox, receiver }
        }

        /// Wraps `incoming_message` in a handler context that uses this
        /// environment's mailbox and a default response.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Feeds `instruction` to `handler` and returns the reply that arrives on the
    /// environment's receiver.
    async fn send_instruction(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        let mut ctx = env.create_handler_ctx((meta, instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let env = HeartbeatResponseTestEnv::new();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let instructions = [
            // Open region
            open_region_instruction(region_id, storage_path),
            // Close region
            close_region_instruction(region_id),
            // Downgrade region
            Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id: RegionId::new(2048, 1),
                flush_timeout: Some(Duration::from_secs(1)),
            }]),
            // Upgrade region
            Instruction::UpgradeRegions(vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }]),
        ];

        for instruction in instructions {
            let ctx = env.create_handler_ctx((meta.clone(), instruction));
            assert!(handler.is_acceptable(&ctx));
        }
    }

    /// Identifies `region_id` as a mito region on datanode 2.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let open_region = OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        );
        Instruction::OpenRegions(vec![open_region])
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to close it twice.
        for _ in 0..2 {
            let reply =
                send_instruction(&handler, &mut env, close_region_instruction(region_id)).await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            // The region is gone, so its role can no longer be changed.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        let mut env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to open it twice.
        for _ in 0..2 {
            let instruction = open_region_instruction(region_id, storage_path);
            let reply = send_instruction(&handler, &mut env, instruction).await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut env = HeartbeatResponseTestEnv::new();

        let instruction = open_region_instruction(region_id, storage_path);
        let reply = send_instruction(&handler, &mut env, instruction).await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to downgrade it twice.
        for _ in 0..2 {
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);
            let reply = send_instruction(&handler, &mut env, instruction).await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // Downgrades a not exists region.
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let reply = send_instruction(&handler, &mut env, instruction).await;

        let replies = reply.expect_downgrade_regions_reply();
        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}
558}