datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod apply_staging_manifest;
26mod close_region;
27mod downgrade_region;
28mod enter_staging;
29mod file_ref;
30mod flush_region;
31mod gc_worker;
32mod open_region;
33mod remap_manifest;
34mod sync_region;
35mod upgrade_region;
36
37use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
38use crate::heartbeat::handler::close_region::CloseRegionsHandler;
39use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
40use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
41use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
42use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
43use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
44use crate::heartbeat::handler::open_region::OpenRegionsHandler;
45use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
46use crate::heartbeat::handler::sync_region::SyncRegionHandler;
47use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
48use crate::heartbeat::task_tracker::TaskTracker;
49use crate::region_server::RegionServer;
50
51/// The handler for [`Instruction`]s.
52#[derive(Clone)]
53pub struct RegionHeartbeatResponseHandler {
54    region_server: RegionServer,
55    downgrade_tasks: TaskTracker<()>,
56    flush_tasks: TaskTracker<()>,
57    open_region_parallelism: usize,
58    gc_tasks: TaskTracker<GcReport>,
59}
60
61#[async_trait::async_trait]
62pub trait InstructionHandler: Send + Sync {
63    type Instruction;
64    async fn handle(
65        &self,
66        ctx: &HandlerContext,
67        instruction: Self::Instruction,
68    ) -> Option<InstructionReply>;
69}
70
71#[derive(Clone)]
72pub struct HandlerContext {
73    region_server: RegionServer,
74    downgrade_tasks: TaskTracker<()>,
75    flush_tasks: TaskTracker<()>,
76    gc_tasks: TaskTracker<GcReport>,
77}
78
79impl HandlerContext {
80    #[cfg(test)]
81    pub fn new_for_test(region_server: RegionServer) -> Self {
82        Self {
83            region_server,
84            downgrade_tasks: TaskTracker::new(),
85            flush_tasks: TaskTracker::new(),
86            gc_tasks: TaskTracker::new(),
87        }
88    }
89}
90
91impl RegionHeartbeatResponseHandler {
92    /// Returns the [RegionHeartbeatResponseHandler].
93    pub fn new(region_server: RegionServer) -> Self {
94        Self {
95            region_server,
96            downgrade_tasks: TaskTracker::new(),
97            flush_tasks: TaskTracker::new(),
98            // Default to half of the number of CPUs.
99            open_region_parallelism: (num_cpus::get() / 2).max(1),
100            gc_tasks: TaskTracker::new(),
101        }
102    }
103
104    /// Sets the parallelism for opening regions.
105    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
106        self.open_region_parallelism = parallelism;
107        self
108    }
109
110    fn build_handler(
111        &self,
112        instruction: &Instruction,
113    ) -> MetaResult<Option<Box<InstructionHandlers>>> {
114        match instruction {
115            Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
116            Instruction::OpenRegions(_) => Ok(Some(Box::new(
117                OpenRegionsHandler {
118                    open_region_parallelism: self.open_region_parallelism,
119                }
120                .into(),
121            ))),
122            Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
123            Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
124            Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
125                UpgradeRegionsHandler {
126                    upgrade_region_parallelism: self.open_region_parallelism,
127                }
128                .into(),
129            ))),
130            Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
131            Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
132            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
133            Instruction::Suspend => Ok(None),
134            Instruction::EnterStagingRegions(_) => {
135                Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
136            }
137            Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
138            Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
139            Instruction::ApplyStagingManifests(_) => {
140                Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
141            }
142        }
143    }
144}
145
146#[allow(clippy::enum_variant_names)]
147pub enum InstructionHandlers {
148    CloseRegions(CloseRegionsHandler),
149    OpenRegions(OpenRegionsHandler),
150    FlushRegions(FlushRegionsHandler),
151    DowngradeRegions(DowngradeRegionsHandler),
152    UpgradeRegions(UpgradeRegionsHandler),
153    GetFileRefs(GetFileRefsHandler),
154    GcRegions(GcRegionsHandler),
155    EnterStagingRegions(EnterStagingRegionsHandler),
156    SyncRegions(SyncRegionHandler),
157    RemapManifest(RemapManifestHandler),
158    ApplyStagingManifests(ApplyStagingManifestsHandler),
159}
160
/// Implements `From<Handler> for InstructionHandlers` for every `Handler => Variant` pair,
/// wrapping the handler in the named `InstructionHandlers` variant.
///
/// An optional trailing comma after the last pair is accepted, matching `dispatch_instr!`.
macro_rules! impl_from_handler {
    ($($handler:ident => $variant:ident),* $(,)?) => {
        $(
            impl From<$handler> for InstructionHandlers {
                fn from(handler: $handler) -> Self {
                    InstructionHandlers::$variant(handler)
                }
            }
        )*
    };
}
172
173impl_from_handler!(
174    CloseRegionsHandler => CloseRegions,
175    OpenRegionsHandler => OpenRegions,
176    FlushRegionsHandler => FlushRegions,
177    DowngradeRegionsHandler => DowngradeRegions,
178    UpgradeRegionsHandler => UpgradeRegions,
179    GetFileRefsHandler => GetFileRefs,
180    GcRegionsHandler => GcRegions,
181    EnterStagingRegionsHandler => EnterStagingRegions,
182    SyncRegionHandler => SyncRegions,
183    RemapManifestHandler => RemapManifest,
184    ApplyStagingManifestsHandler => ApplyStagingManifests
185);
186
/// Generates `InstructionHandlers::handle` and `InstructionHandlers::is_acceptable` from a
/// list of `InstructionVariant => HandlerVariant` pairs.
///
/// `handle` forwards the payload of each listed instruction to its paired handler;
/// `is_acceptable` reports whether an instruction variant appears in the list.
macro_rules! dispatch_instr {
    ( $( $instr:ident => $handler:ident ),* $(,)? ) => {
        impl InstructionHandlers {
            /// Runs the wrapped handler on the payload carried by `instruction`.
            ///
            /// # Panics
            ///
            /// Panics when the handler variant is not the one paired with the instruction
            /// variant. `self` must come from `build_handler` for this same instruction.
            pub async fn handle(
                &self,
                ctx: &HandlerContext,
                instruction: Instruction,
            ) -> Option<InstructionReply> {
                match (self, instruction) {
                    $(
                        (Self::$handler(inner), Instruction::$instr(payload)) => {
                            inner.handle(ctx, payload).await
                        }
                    )*
                    // Invariant (not memory safety): `build_handler` always pairs an
                    // instruction with its own handler, so a mismatch is a bug.
                    _ => unreachable!(),
                }
            }

            /// Returns `true` if `instruction` is one of the variants dispatched here.
            pub fn is_acceptable(instruction: &Instruction) -> bool {
                match instruction {
                    $( Instruction::$instr { .. } => true, )*
                    _ => false,
                }
            }
        }
    };
}
220
221dispatch_instr!(
222    CloseRegions => CloseRegions,
223    OpenRegions => OpenRegions,
224    FlushRegions => FlushRegions,
225    DowngradeRegions => DowngradeRegions,
226    UpgradeRegions => UpgradeRegions,
227    GetFileRefs => GetFileRefs,
228    GcRegions => GcRegions,
229    EnterStagingRegions => EnterStagingRegions,
230    SyncRegions => SyncRegions,
231    RemapManifest => RemapManifest,
232    ApplyStagingManifests => ApplyStagingManifests,
233);
234
235#[async_trait]
236impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
237    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
238        if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
239            return InstructionHandlers::is_acceptable(instruction);
240        }
241        false
242    }
243
244    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
245        let (meta, instruction) = ctx
246            .incoming_message
247            .take()
248            .context(InvalidHeartbeatResponseSnafu)?;
249
250        let mailbox = ctx.mailbox.clone();
251        if let Some(handler) = self.build_handler(&instruction)? {
252            let context = HandlerContext {
253                region_server: self.region_server.clone(),
254                downgrade_tasks: self.downgrade_tasks.clone(),
255                flush_tasks: self.flush_tasks.clone(),
256                gc_tasks: self.gc_tasks.clone(),
257            };
258            let _handle = common_runtime::spawn_global(async move {
259                let reply = handler.handle(&context, instruction).await;
260                if let Some(reply) = reply
261                    && let Err(e) = mailbox.send((meta, reply)).await
262                {
263                    let error = e.to_string();
264                    let (meta, reply) = e.0;
265                    error!("Failed to send reply {reply} to {meta:?}: {error}");
266                }
267            });
268        }
269
270        Ok(HandleControl::Continue)
271    }
272}
273
// Unit tests for `RegionHeartbeatResponseHandler`: instruction acceptance, plus close /
// open / downgrade round trips against a mock region server backed by a mito engine.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{
        DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
    };
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Test harness pairing a heartbeat mailbox with the receiving end of its channel, so a
    /// test can observe the replies that handlers send back.
    pub struct HeartbeatResponseTestEnv {
        /// Mailbox placed into every context built by `create_handler_ctx`.
        pub(crate) mailbox: MailboxRef,
        /// Receiving end of the channel the mailbox was created with; replies show up here.
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates the harness around a bounded channel of capacity 8.
        pub fn new() -> Self {
            let (tx, rx) = mpsc::channel(8);
            let mailbox = Arc::new(HeartbeatMailbox::new(tx));

            HeartbeatResponseTestEnv {
                mailbox,
                receiver: rx,
            }
        }

        /// Builds a handler context carrying `incoming_message`, a default response and this
        /// harness's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Every instruction kind exercised below must be accepted by the handler.
    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        // Open region
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let instruction = open_region_instruction(region_id, storage_path);
        assert!(
            heartbeat_handler
                .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
        );

        // Close region
        let instruction = close_region_instruction(region_id);
        assert!(
            heartbeat_handler
                .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
        );

        // Downgrade region
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        assert!(
            heartbeat_handler
                .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
        );

        // Upgrade region
        let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id,
            ..Default::default()
        }]);
        assert!(
            heartbeat_handler
                .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
        );

        // Enter staging region
        let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion {
            region_id,
            partition_expr: "".to_string(),
        }]);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
        );
    }

    /// Builds a `CloseRegions` instruction for `region_id` on datanode 2 using the mito engine.
    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }])
    }

    /// Builds an `OpenRegions` instruction for `region_id` under storage `path`, with empty
    /// option maps and the trailing boolean flag set to `false`.
    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        Instruction::OpenRegions(vec![OpenRegion::new(
            RegionIdent {
                table_id: region_id.table_id(),
                region_number: region_id.region_number(),
                datanode_id: 2,
                engine: MITO_ENGINE_NAME.to_string(),
            },
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        )])
    }

    /// Closing an existing region succeeds, and closing it again also succeeds; afterwards
    /// the region server no longer knows the region.
    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to close it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = close_region_instruction(region_id);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            // The handler runs in a spawned task; wait for its reply on the mailbox channel.
            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            if let InstructionReply::CloseRegions(reply) = reply {
                assert!(reply.result);
                assert!(reply.error.is_none());
            } else {
                unreachable!()
            }

            // The closed region must be gone from the region server.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    /// Re-opening a created-then-closed region succeeds, including a second open.
    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        let storage_path = "test";
        // Create under the same table dir the open instruction will point at.
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to open it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = open_region_instruction(region_id, storage_path);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            if let InstructionReply::OpenRegions(reply) = reply {
                assert!(reply.result);
                assert!(reply.error.is_none());
            } else {
                unreachable!()
            }
        }
    }

    /// Opening a region that was never created yields a failed reply carrying an error.
    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let instruction = open_region_instruction(region_id, storage_path);

        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

        if let InstructionReply::OpenRegions(reply) = reply {
            assert!(!reply.result);
            assert!(reply.error.is_some());
        } else {
            unreachable!()
        }
    }

    /// Downgrading an existing region (twice) reports it as existing with last entry id 0;
    /// downgrading an unknown region reports `exists == false` without an error.
    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        let storage_path = "test";
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to downgrade it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            let reply = &reply.expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrades a not exists region.
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

        let reply = reply.expect_downgrade_regions_reply();
        assert!(!reply[0].exists);
        assert!(reply[0].error.is_none());
        assert!(reply[0].last_entry_id.is_none());
    }
}